Overview
Flow Core provides comprehensive resilience patterns to handle failures gracefully, including retry logic, circuit breakers, fallback strategies, and error recovery mechanisms.
Retry Patterns
Basic Retry
```python
from nadoo_flow import RetryNode, RetryConfig

# Configure retry behavior
config = RetryConfig(
    max_attempts=3,
    initial_delay=1.0,
    max_delay=60.0,
    exponential_base=2.0,
    jitter=True
)

# Wrap node with retry
retry_node = RetryNode(
    node=UnreliableAPINode(),
    config=config
)

result = await retry_node.run(input_data)
```
Custom Retry Logic
```python
from nadoo_flow import BaseNode, NodeResult
import asyncio
import logging
import random

logger = logging.getLogger(__name__)


class CustomRetryNode(BaseNode):
    """Node with custom retry logic"""

    def __init__(self, max_retries=3, retry_on=None):
        super().__init__(node_id="custom_retry")
        self.max_retries = max_retries
        self.retry_on = retry_on or [ConnectionError, TimeoutError]

    async def execute(self, node_context, workflow_context):
        last_error = None

        for attempt in range(self.max_retries):
            try:
                # Attempt the operation
                result = await self._perform_operation(node_context)
                return NodeResult(success=True, output=result)
            except Exception as e:
                last_error = e

                # Check whether this error warrants another attempt
                if not self._should_retry(e, attempt):
                    return NodeResult(
                        success=False,
                        error=f"Non-retryable error: {e}"
                    )

                # Wait before the next attempt
                delay = self._calculate_delay(attempt)
                logger.info(f"Retry {attempt + 1}/{self.max_retries} after {delay:.1f}s")
                await asyncio.sleep(delay)

        return NodeResult(
            success=False,
            error=f"Max retries exceeded. Last error: {last_error}"
        )

    def _should_retry(self, error, attempt):
        """Determine whether the error is retryable and attempts remain"""
        if attempt >= self.max_retries - 1:
            return False
        return any(isinstance(error, exc_type) for exc_type in self.retry_on)

    def _calculate_delay(self, attempt):
        """Calculate delay with exponential backoff"""
        base_delay = 1.0
        max_delay = 60.0
        delay = min(base_delay * (2 ** attempt), max_delay)
        # Add jitter to avoid synchronized retries
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter
```
Circuit Breaker
Circuit Breaker Implementation
```python
from enum import Enum
from datetime import datetime, timedelta


class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing recovery


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open"""


class CircuitBreaker:
    """Circuit breaker pattern implementation"""

    def __init__(
        self,
        failure_threshold=5,
        recovery_timeout=60,
        expected_exception=Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        """Execute function through the circuit breaker"""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self):
        """Check if enough time has passed to probe for recovery"""
        if not self.last_failure_time:
            return True
        return (
            datetime.now() - self.last_failure_time
            > timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
```
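As a minimal usage sketch, the breaker can wrap any awaitable call directly. Here `fetch_quote` is a hypothetical coroutine standing in for an unreliable dependency, not part of Flow Core:

```python
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

async def get_quote_safely():
    try:
        # fetch_quote() is a hypothetical coroutine hitting a flaky service
        return await breaker.call(fetch_quote)
    except CircuitBreakerOpenError:
        # Fail fast while the dependency is given time to recover
        return None
```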
Circuit Breaker Node
```python
class CircuitBreakerNode(BaseNode):
    """Node with circuit breaker protection"""

    def __init__(self, protected_node, breaker_config=None):
        super().__init__(node_id="circuit_breaker")
        self.protected_node = protected_node
        self.circuit_breaker = CircuitBreaker(**(breaker_config or {}))

    async def execute(self, node_context, workflow_context):
        try:
            result = await self.circuit_breaker.call(
                self.protected_node.execute,
                node_context,
                workflow_context
            )
            return result
        except CircuitBreakerOpenError:
            # Circuit is open, use fallback
            return await self._fallback(node_context, workflow_context)

    async def _fallback(self, node_context, workflow_context):
        """Fallback when circuit is open"""
        return NodeResult(
            success=False,
            error="Service temporarily unavailable",
            metadata={"circuit_state": "open"}
        )
```
Fallback Strategies
Fallback Node
```python
# Flow Core ships a FallbackNode (from nadoo_flow import FallbackNode);
# the class below shows an equivalent implementation.
class FallbackNode(BaseNode):
    """Execute fallback on primary failure"""

    def __init__(self, primary, fallback):
        super().__init__(node_id="fallback")
        self.primary = primary
        self.fallback = fallback

    async def execute(self, node_context, workflow_context):
        # Try the primary node first
        primary_result = await self.primary.execute(
            node_context, workflow_context
        )
        if primary_result.success:
            return primary_result

        # Log the primary failure before falling back
        logger.warning(
            f"Primary failed: {primary_result.error}, using fallback"
        )

        # Execute the fallback node
        fallback_result = await self.fallback.execute(
            node_context, workflow_context
        )

        # Mark the result as coming from the fallback path
        fallback_result.metadata["fallback_used"] = True
        fallback_result.metadata["primary_error"] = primary_result.error
        return fallback_result
```
Multiple Fallbacks
```python
class MultiFallbackNode(BaseNode):
    """Try multiple fallback options"""

    def __init__(self, *nodes):
        super().__init__(node_id="multi_fallback")
        self.nodes = nodes

    async def execute(self, node_context, workflow_context):
        errors = []

        for i, node in enumerate(self.nodes):
            try:
                result = await node.execute(node_context, workflow_context)
                if result.success:
                    result.metadata["fallback_level"] = i
                    return result
                errors.append(f"Node {i}: {result.error}")
            except Exception as e:
                errors.append(f"Node {i}: {str(e)}")

        # All failed
        return NodeResult(
            success=False,
            error=f"All fallbacks failed: {'; '.join(errors)}"
        )
```
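A typical composition might look like the sketch below. `PrimaryAPINode`, `SecondaryAPINode`, and `StaticDefaultNode` are hypothetical placeholders, and the `.run()` entry point is assumed to behave as in the basic retry example above:

```python
# Ordered from most preferred to least preferred option
resilient_lookup = MultiFallbackNode(
    PrimaryAPINode(),      # hypothetical: live API
    SecondaryAPINode(),    # hypothetical: backup provider
    StaticDefaultNode()    # hypothetical: hard-coded safe default
)

result = await resilient_lookup.run(input_data)
```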
Cache Fallback
```python
import json
import time


class CacheFallbackNode(BaseNode):
    """Use cached result as fallback"""

    def __init__(self, node, cache_ttl=3600):
        super().__init__(node_id="cache_fallback")
        self.node = node
        self.cache_ttl = cache_ttl
        self.cache = {}

    async def execute(self, node_context, workflow_context):
        cache_key = self._get_cache_key(node_context.input_data)

        try:
            # Try to execute the wrapped node
            result = await self.node.execute(node_context, workflow_context)
            if result.success:
                # Update cache with the latest good result
                self.cache[cache_key] = {
                    "result": result,
                    "timestamp": time.time()
                }
                return result
        except Exception as e:
            logger.error(f"Node execution failed: {e}")

        # Check cache for a usable fallback
        if cache_key in self.cache:
            cached = self.cache[cache_key]
            age = time.time() - cached["timestamp"]
            if age <= self.cache_ttl:
                logger.info("Using cached result as fallback")
                cached_result = cached["result"]
                cached_result.metadata["from_cache"] = True
                cached_result.metadata["cache_age"] = age
                return cached_result

        return NodeResult(
            success=False,
            error="No cached fallback available"
        )

    def _get_cache_key(self, input_data):
        """Derive a deterministic cache key from the input data"""
        # Simple default; adapt to the shape of your input data
        return json.dumps(input_data, sort_keys=True, default=str)
```
Timeout Handling
Timeout Node
```python
class TimeoutNode(BaseNode):
    """Execute with timeout"""

    def __init__(self, node, timeout=30):
        super().__init__(node_id="timeout")
        self.node = node
        self.timeout = timeout

    async def execute(self, node_context, workflow_context):
        try:
            result = await asyncio.wait_for(
                self.node.execute(node_context, workflow_context),
                timeout=self.timeout
            )
            return result
        except asyncio.TimeoutError:
            return NodeResult(
                success=False,
                error=f"Operation timed out after {self.timeout}s"
            )
```
Adaptive Timeout
```python
class AdaptiveTimeoutNode(BaseNode):
    """Adjust timeout based on performance"""

    def __init__(self, node, initial_timeout=30):
        super().__init__(node_id="adaptive_timeout")
        self.node = node
        self.initial_timeout = initial_timeout
        self.execution_times = []
        self.timeout_multiplier = 2.0

    async def execute(self, node_context, workflow_context):
        # Calculate adaptive timeout
        timeout = self._calculate_timeout()

        try:
            start_time = time.time()
            result = await asyncio.wait_for(
                self.node.execute(node_context, workflow_context),
                timeout=timeout
            )

            # Record execution time
            execution_time = time.time() - start_time
            self.execution_times.append(execution_time)

            # Keep only recent times
            self.execution_times = self.execution_times[-10:]

            return result
        except asyncio.TimeoutError:
            return NodeResult(
                success=False,
                error=f"Timeout after {timeout}s (adaptive)"
            )

    def _calculate_timeout(self):
        """Calculate timeout based on history"""
        if not self.execution_times:
            return self.initial_timeout

        # Use 95th percentile of recent times
        import numpy as np
        p95 = np.percentile(self.execution_times, 95)
        return max(
            self.initial_timeout,
            p95 * self.timeout_multiplier
        )
```
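If you prefer not to pull in numpy for a single percentile, a standard-library variant is a straightforward substitution. This is a sketch, not part of the Flow Core API:

```python
import statistics


def percentile_timeout(execution_times, initial_timeout=30, multiplier=2.0):
    """95th-percentile timeout using only the standard library"""
    if len(execution_times) < 2:
        # statistics.quantiles needs at least two data points
        return initial_timeout
    # n=20 buckets -> the last cut point (index 18) is the 95th percentile
    p95 = statistics.quantiles(execution_times, n=20)[18]
    return max(initial_timeout, p95 * multiplier)
```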
Error Recovery
Self-Healing Node
```python
class SelfHealingNode(BaseNode):
    """Node that attempts to fix errors"""

    # ValidationError, ResourceError, and NetworkError are application-specific
    # exceptions raised by the primary execution path.

    async def execute(self, node_context, workflow_context):
        try:
            return await self._primary_execution(node_context)
        except ValidationError as e:
            # Attempt to fix validation errors and retry once
            fixed_input = await self._fix_validation(
                node_context.input_data, e
            )
            node_context.input_data = fixed_input
            return await self._primary_execution(node_context)
        except ResourceError as e:
            # Attempt to acquire resources and retry
            await self._acquire_resources(e)
            return await self._primary_execution(node_context)
        except NetworkError:
            # Wait and retry with a different endpoint
            await asyncio.sleep(5)
            return await self._execute_with_backup_endpoint(node_context)

    async def _fix_validation(self, data, error):
        """Attempt to fix validation errors"""
        # Example: add missing required fields with defaults
        if "missing_field" in str(error):
            data["missing_field"] = self._get_default_value()
        return data

    async def _acquire_resources(self, error):
        """Try to acquire needed resources"""
        # Example: clear cache, close idle connections
        await self._cleanup_resources()
```
Bulkhead Pattern
Resource Isolation
```python
class BulkheadNode(BaseNode):
    """Isolate resources to prevent cascading failures"""

    def __init__(self, node, max_concurrent=10, queue_size=100):
        super().__init__(node_id="bulkhead")
        self.node = node
        self.queue_size = queue_size
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.pending_count = 0   # requests admitted but not yet finished
        self.rejected_count = 0

    async def execute(self, node_context, workflow_context):
        # Reject new work when every slot is busy and the waiting room is full,
        # instead of letting requests pile up without bound
        if self.semaphore.locked() and self.pending_count >= self.queue_size:
            self.rejected_count += 1
            return NodeResult(
                success=False,
                error="Bulkhead full - request rejected",
                metadata={"rejected_count": self.rejected_count}
            )

        self.pending_count += 1
        try:
            # Execute with a bounded number of concurrent slots
            async with self.semaphore:
                return await self.node.execute(node_context, workflow_context)
        finally:
            self.pending_count -= 1
```
Health Checks
Health Check Node
```python
class HealthCheckNode(BaseNode):
    """Periodic health checks"""

    def __init__(self, node, health_check_interval=60):
        super().__init__(node_id="health_check")
        self.node = node
        self.health_check_interval = health_check_interval
        self.is_healthy = True
        self.last_check = None
        self._check_task = None

    def _start_health_check(self):
        """Start background health check (requires a running event loop)"""
        if self._check_task is None:
            self._check_task = asyncio.create_task(self._health_check_loop())

    async def _health_check_loop(self):
        """Periodic health check"""
        while True:
            await asyncio.sleep(self.health_check_interval)
            self.is_healthy = await self._check_health()
            self.last_check = datetime.now()

    async def _check_health(self):
        """Perform health check"""
        try:
            # Simple ping or test query
            test_context = NodeContext("health_check")
            test_context.input_data = {"test": True}
            result = await asyncio.wait_for(
                self.node.execute(test_context, WorkflowContext("test")),
                timeout=5
            )
            return result.success
        except Exception:
            return False

    async def execute(self, node_context, workflow_context):
        # Start the background loop lazily, once an event loop is running
        self._start_health_check()

        if not self.is_healthy:
            return NodeResult(
                success=False,
                error="Node unhealthy",
                metadata={
                    "last_healthy": self.last_check,
                    "health_status": "unhealthy"
                }
            )
        return await self.node.execute(node_context, workflow_context)
```
Resilience Composition
Combining Patterns
```python
def create_resilient_node(node):
    """Apply multiple resilience patterns"""
    # Add retry
    node = RetryNode(
        node=node,
        config=RetryConfig(max_attempts=3)
    )

    # Add circuit breaker
    node = CircuitBreakerNode(
        protected_node=node,
        breaker_config={"failure_threshold": 5}
    )

    # Add timeout
    node = TimeoutNode(
        node=node,
        timeout=30
    )

    # Add fallback
    fallback = CachedResultNode()
    node = FallbackNode(
        primary=node,
        fallback=fallback
    )

    # Add bulkhead
    node = BulkheadNode(
        node=node,
        max_concurrent=20
    )

    return node


# Usage
resilient_node = create_resilient_node(APINode())
result = await resilient_node.run(input_data)
```
Best Practices
Choose Appropriate Patterns
Select resilience patterns based on failure modes:
- Transient failures: Retry
- Cascading failures: Circuit Breaker
- Resource exhaustion: Bulkhead
- Slow responses: Timeout
Monitor and Alert
Track resilience metrics:
```python
metrics = {
    "retry_count": retry_node.retry_count,
    "circuit_state": breaker.state,
    "fallback_usage": fallback_count
}
```
Test Failure Scenarios
Regularly test resilience patterns with chaos engineering.
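One lightweight way to do this, sketched here rather than taken from the Flow Core API, is a fault-injection wrapper that fails a configurable fraction of calls so retry and circuit-breaker behavior can be exercised in tests:

```python
import random


class FaultInjectionNode(BaseNode):
    """Test helper that randomly fails to exercise resilience wrappers"""

    def __init__(self, node, failure_rate=0.3):
        super().__init__(node_id="fault_injection")
        self.node = node
        self.failure_rate = failure_rate

    async def execute(self, node_context, workflow_context):
        # Inject a synthetic failure for a fraction of calls
        if random.random() < self.failure_rate:
            raise ConnectionError("Injected fault for resilience testing")
        return await self.node.execute(node_context, workflow_context)


# Example: verify that the composed resilience stack absorbs injected faults
flaky = FaultInjectionNode(APINode(), failure_rate=0.5)
resilient_node = create_resilient_node(flaky)
```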
Document Recovery Behavior
Clearly document how each node handles and recovers from failures.
Set Reasonable Limits
Configure timeouts, retries, and circuit breakers with realistic values.
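As a rough starting point, keep the budgets consistent with each other: the total retry time should fit inside any outer timeout. The numbers below are illustrative, not Flow Core defaults:

```python
# Illustrative values; tune against observed latency and failure data
resilience_settings = {
    "timeout_seconds": 10,  # a bit above the observed p99 latency
    "retry": {"max_attempts": 3, "initial_delay": 0.5, "max_delay": 5.0},
    "circuit_breaker": {"failure_threshold": 5, "recovery_timeout": 30},
    "bulkhead": {"max_concurrent": 20},
}
```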