Skip to main content

Overview

Flow Core provides comprehensive resilience patterns to handle failures gracefully, including retry logic, circuit breakers, fallback strategies, and error recovery mechanisms.

Retry Patterns

Basic Retry

from nadoo_flow import RetryNode, RetryConfig

# Configure retry behavior
config = RetryConfig(
    max_attempts=3,
    initial_delay=1.0,
    max_delay=60.0,
    exponential_base=2.0,
    jitter=True
)

# Wrap node with retry
retry_node = RetryNode(
    node=UnreliableAPINode(),
    config=config
)

result = await retry_node.run(input_data)

Custom Retry Logic

from nadoo_flow import BaseNode, NodeResult
import asyncio

class CustomRetryNode(BaseNode):
    """Node with custom retry logic"""

    def __init__(self, max_retries=3, retry_on=None):
        super().__init__(node_id="custom_retry")
        self.max_retries = max_retries
        self.retry_on = retry_on or [ConnectionError, TimeoutError]

    async def execute(self, node_context, workflow_context):
        last_error = None

        for attempt in range(self.max_retries):
            try:
                # Attempt operation
                result = await self._perform_operation(node_context)
                return NodeResult(success=True, output=result)

            except Exception as e:
                last_error = e

                # Check if should retry
                if not self._should_retry(e, attempt):
                    return NodeResult(
                        success=False,
                        error=f"Non-retryable error: {e}"
                    )

                # Calculate delay
                delay = self._calculate_delay(attempt)
                logger.info(f"Retry {attempt + 1}/{self.max_retries} after {delay}s")
                await asyncio.sleep(delay)

        return NodeResult(
            success=False,
            error=f"Max retries exceeded. Last error: {last_error}"
        )

    def _should_retry(self, error, attempt):
        """Determine if error is retryable"""
        if attempt >= self.max_retries - 1:
            return False

        return any(isinstance(error, exc_type) for exc_type in self.retry_on)

    def _calculate_delay(self, attempt):
        """Calculate delay with exponential backoff"""
        base_delay = 1.0
        max_delay = 60.0
        delay = min(base_delay * (2 ** attempt), max_delay)

        # Add jitter
        import random
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter

Circuit Breaker

Circuit Breaker Implementation

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"      # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    """Circuit breaker pattern implementation"""

    def __init__(
        self,
        failure_threshold=5,
        recovery_timeout=60,
        expected_exception=Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        """Execute function through circuit breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except self.expected_exception as e:
            self._on_failure()
            raise e

    def _should_attempt_reset(self):
        """Check if enough time has passed to retry"""
        if not self.last_failure_time:
            return True

        return (
            datetime.now() - self.last_failure_time
            > timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED

        self.failure_count = 0
        self.last_failure_time = None

    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

Circuit Breaker Node

class CircuitBreakerNode(BaseNode):
    """Node with circuit breaker protection"""

    def __init__(self, protected_node, breaker_config=None):
        super().__init__(node_id="circuit_breaker")
        self.protected_node = protected_node
        self.circuit_breaker = CircuitBreaker(**(breaker_config or {}))

    async def execute(self, node_context, workflow_context):
        try:
            result = await self.circuit_breaker.call(
                self.protected_node.execute,
                node_context,
                workflow_context
            )
            return result

        except CircuitBreakerOpenError:
            # Circuit is open, use fallback
            return await self._fallback(node_context, workflow_context)

    async def _fallback(self, node_context, workflow_context):
        """Fallback when circuit is open"""
        return NodeResult(
            success=False,
            error="Service temporarily unavailable",
            metadata={"circuit_state": "open"}
        )

Fallback Strategies

Fallback Node

from nadoo_flow import FallbackNode

class FallbackNode(BaseNode):
    """Execute fallback on primary failure"""

    def __init__(self, primary, fallback):
        super().__init__(node_id="fallback")
        self.primary = primary
        self.fallback = fallback

    async def execute(self, node_context, workflow_context):
        # Try primary
        primary_result = await self.primary.execute(
            node_context, workflow_context
        )

        if primary_result.success:
            return primary_result

        # Log primary failure
        logger.warning(
            f"Primary failed: {primary_result.error}, using fallback"
        )

        # Execute fallback
        fallback_result = await self.fallback.execute(
            node_context, workflow_context
        )

        # Mark as fallback result
        fallback_result.metadata["fallback_used"] = True
        fallback_result.metadata["primary_error"] = primary_result.error

        return fallback_result

Multiple Fallbacks

class MultiFallbackNode(BaseNode):
    """Try multiple fallback options"""

    def __init__(self, *nodes):
        super().__init__(node_id="multi_fallback")
        self.nodes = nodes

    async def execute(self, node_context, workflow_context):
        errors = []

        for i, node in enumerate(self.nodes):
            try:
                result = await node.execute(node_context, workflow_context)

                if result.success:
                    result.metadata["fallback_level"] = i
                    return result

                errors.append(f"Node {i}: {result.error}")

            except Exception as e:
                errors.append(f"Node {i}: {str(e)}")

        # All failed
        return NodeResult(
            success=False,
            error=f"All fallbacks failed: {'; '.join(errors)}"
        )

Cache Fallback

class CacheFallbackNode(BaseNode):
    """Use cached result as fallback"""

    def __init__(self, node, cache_ttl=3600):
        super().__init__(node_id="cache_fallback")
        self.node = node
        self.cache_ttl = cache_ttl
        self.cache = {}

    async def execute(self, node_context, workflow_context):
        cache_key = self._get_cache_key(node_context.input_data)

        try:
            # Try to execute
            result = await self.node.execute(node_context, workflow_context)

            if result.success:
                # Update cache
                self.cache[cache_key] = {
                    "result": result,
                    "timestamp": time.time()
                }
                return result

        except Exception as e:
            logger.error(f"Node execution failed: {e}")

        # Check cache for fallback
        if cache_key in self.cache:
            cached = self.cache[cache_key]
            age = time.time() - cached["timestamp"]

            if age <= self.cache_ttl:
                logger.info("Using cached result as fallback")
                cached_result = cached["result"]
                cached_result.metadata["from_cache"] = True
                cached_result.metadata["cache_age"] = age
                return cached_result

        return NodeResult(
            success=False,
            error="No cached fallback available"
        )

Timeout Handling

Timeout Node

class TimeoutNode(BaseNode):
    """Execute with timeout"""

    def __init__(self, node, timeout=30):
        super().__init__(node_id="timeout")
        self.node = node
        self.timeout = timeout

    async def execute(self, node_context, workflow_context):
        try:
            result = await asyncio.wait_for(
                self.node.execute(node_context, workflow_context),
                timeout=self.timeout
            )
            return result

        except asyncio.TimeoutError:
            return NodeResult(
                success=False,
                error=f"Operation timed out after {self.timeout}s"
            )

Adaptive Timeout

class AdaptiveTimeoutNode(BaseNode):
    """Adjust timeout based on performance"""

    def __init__(self, node, initial_timeout=30):
        super().__init__(node_id="adaptive_timeout")
        self.node = node
        self.initial_timeout = initial_timeout
        self.execution_times = []
        self.timeout_multiplier = 2.0

    async def execute(self, node_context, workflow_context):
        # Calculate adaptive timeout
        timeout = self._calculate_timeout()

        try:
            start_time = time.time()

            result = await asyncio.wait_for(
                self.node.execute(node_context, workflow_context),
                timeout=timeout
            )

            # Record execution time
            execution_time = time.time() - start_time
            self.execution_times.append(execution_time)

            # Keep only recent times
            self.execution_times = self.execution_times[-10:]

            return result

        except asyncio.TimeoutError:
            return NodeResult(
                success=False,
                error=f"Timeout after {timeout}s (adaptive)"
            )

    def _calculate_timeout(self):
        """Calculate timeout based on history"""
        if not self.execution_times:
            return self.initial_timeout

        # Use 95th percentile of recent times
        import numpy as np
        p95 = np.percentile(self.execution_times, 95)
        return max(
            self.initial_timeout,
            p95 * self.timeout_multiplier
        )

Error Recovery

Self-Healing Node

class SelfHealingNode(BaseNode):
    """Node that attempts to fix errors"""

    async def execute(self, node_context, workflow_context):
        try:
            return await self._primary_execution(node_context)

        except ValidationError as e:
            # Attempt to fix validation errors
            fixed_input = await self._fix_validation(
                node_context.input_data, e
            )
            node_context.input_data = fixed_input
            return await self._primary_execution(node_context)

        except ResourceError as e:
            # Attempt to acquire resources
            await self._acquire_resources(e)
            return await self._primary_execution(node_context)

        except NetworkError:
            # Wait and retry with different endpoint
            await asyncio.sleep(5)
            return await self._execute_with_backup_endpoint(node_context)

    async def _fix_validation(self, data, error):
        """Attempt to fix validation errors"""
        # Example: Add missing required fields with defaults
        if "missing_field" in str(error):
            data["missing_field"] = self._get_default_value()
        return data

    async def _acquire_resources(self, error):
        """Try to acquire needed resources"""
        # Example: Clear cache, close idle connections
        await self._cleanup_resources()

Bulkhead Pattern

Resource Isolation

class BulkheadNode(BaseNode):
    """Isolate resources to prevent cascading failures"""

    def __init__(self, node, max_concurrent=10, queue_size=100):
        super().__init__(node_id="bulkhead")
        self.node = node
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.rejected_count = 0

    async def execute(self, node_context, workflow_context):
        # Check if can acquire resource
        if self.semaphore._value == 0 and self.queue.full():
            self.rejected_count += 1
            return NodeResult(
                success=False,
                error="Bulkhead full - request rejected",
                metadata={"rejected_count": self.rejected_count}
            )

        # Queue request
        await self.queue.put(node_context)

        try:
            async with self.semaphore:
                # Get from queue
                context = await self.queue.get()

                # Execute with resource
                result = await self.node.execute(
                    context, workflow_context
                )

                return result

        finally:
            self.queue.task_done()

Health Checks

Health Check Node

class HealthCheckNode(BaseNode):
    """Periodic health checks"""

    def __init__(self, node, health_check_interval=60):
        super().__init__(node_id="health_check")
        self.node = node
        self.health_check_interval = health_check_interval
        self.is_healthy = True
        self.last_check = None
        self._start_health_check()

    def _start_health_check(self):
        """Start background health check"""
        asyncio.create_task(self._health_check_loop())

    async def _health_check_loop(self):
        """Periodic health check"""
        while True:
            await asyncio.sleep(self.health_check_interval)
            self.is_healthy = await self._check_health()
            self.last_check = datetime.now()

    async def _check_health(self):
        """Perform health check"""
        try:
            # Simple ping or test query
            test_context = NodeContext("health_check")
            test_context.input_data = {"test": True}

            result = await asyncio.wait_for(
                self.node.execute(test_context, WorkflowContext("test")),
                timeout=5
            )

            return result.success

        except Exception:
            return False

    async def execute(self, node_context, workflow_context):
        if not self.is_healthy:
            return NodeResult(
                success=False,
                error="Node unhealthy",
                metadata={
                    "last_healthy": self.last_check,
                    "health_status": "unhealthy"
                }
            )

        return await self.node.execute(node_context, workflow_context)

Resilience Composition

Combining Patterns

def create_resilient_node(node):
    """Apply multiple resilience patterns"""

    # Add retry
    node = RetryNode(
        node=node,
        config=RetryConfig(max_attempts=3)
    )

    # Add circuit breaker
    node = CircuitBreakerNode(
        protected_node=node,
        breaker_config={"failure_threshold": 5}
    )

    # Add timeout
    node = TimeoutNode(
        node=node,
        timeout=30
    )

    # Add fallback
    fallback = CachedResultNode()
    node = FallbackNode(
        primary=node,
        fallback=fallback
    )

    # Add bulkhead
    node = BulkheadNode(
        node=node,
        max_concurrent=20
    )

    return node

# Usage
resilient_node = create_resilient_node(APINode())
result = await resilient_node.run(input_data)

Best Practices

Select resilience patterns based on failure modes:
  • Transient failures: Retry
  • Cascading failures: Circuit Breaker
  • Resource exhaustion: Bulkhead
  • Slow responses: Timeout
Track resilience metrics:
metrics = {
    "retry_count": retry_node.retry_count,
    "circuit_state": breaker.state,
    "fallback_usage": fallback_count
}
Regularly test resilience patterns with chaos engineering.
Clearly document how each node handles and recovers from failures.
Configure timeouts, retries, and circuit breakers with realistic values.

Next Steps