Overview

The resilience module provides retry and fallback mechanisms for building fault-tolerant workflows. It includes exponential backoff with jitter for retries and sequential fallback patterns for graceful degradation.

Classes

RetryPolicy

Configuration for retry behavior.
from nadoo_flow import RetryPolicy

policy = RetryPolicy(
    max_attempts=5,
    initial_delay=1.0,
    max_delay=60.0,
    exponential_base=2.0,
    jitter=1.0,
    retry_on_exceptions=(TimeoutError, ConnectionError)
)

Parameters

  • max_attempts (int, default: 3): Maximum number of retry attempts
  • initial_delay (float, default: 1.0): Initial delay in seconds before the first retry
  • max_delay (float, default: 60.0): Maximum delay in seconds (caps exponential backoff)
  • exponential_base (float, default: 2.0): Base for exponential backoff (2.0 = double each time)
  • jitter (float, default: 1.0): Random jitter range (0 to jitter seconds) added to each delay
  • retry_on_exceptions (tuple[Type[Exception], ...], default: (Exception,)): Exception types to retry on
  • retry_on_status (tuple[str, ...], default: ()): Node status codes to retry on

Delay Calculation

The delay before each retry is calculated as (attempt numbers the retry attempts, starting at 1):
delay = min(initial_delay * exponential_base ^ (attempt - 1), max_delay) + random(0, jitter)
Example delays with the default delay parameters (initial_delay=1.0, exponential_base=2.0, jitter=1.0):
  • Attempt 1: 1.0s + jitter
  • Attempt 2: 2.0s + jitter
  • Attempt 3: 4.0s + jitter
  • Attempt 4: 8.0s + jitter
  • Attempt 5: 16.0s + jitter
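
For intuition, the formula can be reproduced with a small standalone helper (an illustrative sketch, not the library's internal code):
import random

def backoff_delay(
    attempt: int,
    initial_delay: float = 1.0,
    exponential_base: float = 2.0,
    max_delay: float = 60.0,
    jitter: float = 1.0,
) -> float:
    # Documented formula: capped exponential backoff plus random jitter (attempt is 1-based)
    base = min(initial_delay * exponential_base ** (attempt - 1), max_delay)
    return base + random.uniform(0, jitter)

# backoff_delay(3) is roughly 4.0s plus up to 1s of jitter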

RetryableNode

Base class for nodes with automatic retry logic.
from nadoo_flow import RetryableNode, RetryPolicy, NodeResult

class MyLLMNode(RetryableNode):
    def __init__(self):
        super().__init__(
            node_id="llm",
            node_type="llm",
            name="LLM Node",
            config={},
            retry_policy=RetryPolicy(
                max_attempts=5,
                retry_on_exceptions=(TimeoutError, ConnectionError)
            )
        )

    async def _execute_with_retry(self, node_context, workflow_context):
        # Your node logic here (without retry logic)
        response = await call_llm_api()
        return NodeResult(success=True, output={"response": response})

Constructor Parameters

  • node_id (str, required): Unique node identifier
  • node_type (str, required): Node type identifier
  • name (str, required): Human-readable node name
  • config (dict[str, Any], required): Node configuration
  • retry_policy (RetryPolicy | None, optional): Retry policy (defaults to RetryPolicy())

Methods

execute
Main execution method with retry logic (do not override).
async def execute(
    node_context: NodeContext,
    workflow_context: WorkflowContext
) -> NodeResult
This method handles all retry logic automatically. It calls _execute_with_retry() internally.
_execute_with_retry
Override this method with your node logic (without retry handling).
async def _execute_with_retry(
    node_context: NodeContext,
    workflow_context: WorkflowContext
) -> NodeResult
Returns:
  • NodeResult - Your node’s execution result
Note: You should implement pure business logic here. The retry mechanism is handled by the parent class.

Retry Metadata

When retries occur, the returned NodeResult includes metadata:
{
    "retry_info": {
        "total_attempts": 3,
        "retry_delays": [1.2, 2.4],
        "errors": ["Timeout", "Connection refused"]
    }
}
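
Even when the node eventually succeeds, this metadata stays on the result, so callers can see how many attempts were needed:
result = await retryable_node.execute(node_context, workflow_context)
if result.success and "retry_info" in result.metadata:
    info = result.metadata["retry_info"]
    print(f"Succeeded after {info['total_attempts']} attempts (delays: {info['retry_delays']})")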

FallbackNode

Node that tries multiple fallback options sequentially.
from nadoo_flow import FallbackNode

fallback = FallbackNode(
    node_id="llm_fallback",
    nodes=[
        GPT4Node(),      # Try first
        ClaudeNode(),    # Try second if first fails
        LlamaNode()      # Try third if both fail
    ],
    handle_exceptions=(TimeoutError, ConnectionError),
    pass_error_context=True
)

result = await fallback.execute(node_context, workflow_context)

Constructor Parameters

  • node_id (str, required): Unique node identifier
  • nodes (list[BaseNode], required): List of nodes to try, in order
  • handle_exceptions (tuple[Type[Exception], ...], default: (Exception,)): Exceptions to handle with fallback
  • pass_error_context (bool, default: True): Pass previous errors to the next node
  • name (str, default: "Fallback"): Node name
  • config (dict[str, Any] | None, default: None): Node configuration

Methods

execute
Execute nodes sequentially until one succeeds.
async def execute(
    node_context: NodeContext,
    workflow_context: WorkflowContext
) -> NodeResult
Behavior:
  1. Try first node
  2. If successful, return result
  3. If failed, try next node
  4. Repeat until success or all nodes exhausted
  5. Return last error if all fail
Returns:
  • NodeResult - Result from first successful node, or error if all fail
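
Conceptually, this is equivalent to the following loop, where fallback_nodes stands for the configured node list (a simplified sketch, not the actual implementation):
result = None
for node in fallback_nodes:
    result = await node.execute(node_context, workflow_context)
    if result.success:
        break  # first success wins; fallback metadata is attached to the result
# if every node failed, result carries the last error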

Fallback Metadata

The returned NodeResult includes fallback information:
{
    "fallback_info": {
        "attempted_nodes": [
            {"node_id": "gpt4", "node_type": "llm", "attempt_index": 0},
            {"node_id": "claude", "node_type": "llm", "attempt_index": 1}
        ],
        "successful_node": "claude",
        "fallback_index": 1,
        "total_attempts": 2,
        "errors": ["Timeout error"]
    }
}
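
On a successful run, the same metadata shows which option actually served the request:
result = await fallback.execute(node_context, workflow_context)
if result.success:
    info = result.metadata.get("fallback_info", {})
    print(f"Served by {info.get('successful_node')} (fallback index {info.get('fallback_index')})")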

Usage Patterns

Basic Retry

from nadoo_flow import RetryableNode, RetryPolicy, NodeResult

class APICallNode(RetryableNode):
    def __init__(self):
        super().__init__(
            node_id="api_call",
            node_type="api",
            name="External API Call",
            config={"url": "https://api.example.com"},
            retry_policy=RetryPolicy(max_attempts=3)
        )

    async def _execute_with_retry(self, node_context, workflow_context):
        import httpx

        async with httpx.AsyncClient() as client:
            response = await client.get(self.config["url"])
            response.raise_for_status()

        return NodeResult(
            success=True,
            output={"data": response.json()}
        )

# Use in workflow
node = APICallNode()
result = await node.execute(node_context, workflow_context)

Custom Retry Exceptions

import httpx

from nadoo_flow import RetryableNode, RetryPolicy

# Only retry on specific exceptions
policy = RetryPolicy(
    max_attempts=5,
    retry_on_exceptions=(TimeoutError, ConnectionError, httpx.HTTPStatusError)
)

class MyNode(RetryableNode):
    def __init__(self):
        super().__init__(
            node_id="my_node",
            node_type="api",
            name="My Node",
            config={},
            retry_policy=policy
        )

Retry on Status Codes

from nadoo_flow import NodeResult, RetryableNode, RetryPolicy

policy = RetryPolicy(
    max_attempts=3,
    retry_on_status=("rate_limit", "timeout", "server_error")
)

class MyNode(RetryableNode):
    # Pass retry_policy=policy to super().__init__() as in the earlier examples.

    async def _execute_with_retry(self, node_context, workflow_context):
        try:
            result = await api_call()  # placeholder for your API call
            return NodeResult(success=True, output=result)
        except RateLimitError:  # placeholder: your client's rate-limit exception
            # A failed result whose status code matches retry_on_status is retried
            return NodeResult(
                success=False,
                error="Rate limited",
                metadata={"status_code": "rate_limit"}
            )

Multi-Model Fallback

from nadoo_flow import BaseNode, FallbackNode, NodeResult  # BaseNode import path assumed

class GPT4Node(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Call GPT-4
        return NodeResult(success=True, output={"model": "gpt-4", "text": "..."})

class ClaudeNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Call Claude
        return NodeResult(success=True, output={"model": "claude", "text": "..."})

class LocalLLMNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Call local model
        return NodeResult(success=True, output={"model": "llama", "text": "..."})

# Create fallback chain
fallback = FallbackNode(
    node_id="llm_with_fallback",
    nodes=[
        GPT4Node(),      # Best quality, try first
        ClaudeNode(),    # Good quality, try second
        LocalLLMNode()   # Always available, try last
    ]
)

# Use in workflow
result = await fallback.execute(node_context, workflow_context)
print(f"Used model: {result.output['model']}")

Combining Retry and Fallback

from nadoo_flow import RetryableNode, FallbackNode, RetryPolicy, NodeResult

class RetryableGPT4(RetryableNode):
    def __init__(self):
        super().__init__(
            node_id="gpt4_retry",
            node_type="llm",
            name="GPT-4 with Retry",
            config={},
            retry_policy=RetryPolicy(max_attempts=3)
        )

    async def _execute_with_retry(self, node_context, workflow_context):
        # GPT-4 logic
        return NodeResult(success=True, output={})

class RetryableClaude(RetryableNode):
    def __init__(self):
        super().__init__(
            node_id="claude_retry",
            node_type="llm",
            name="Claude with Retry",
            config={},
            retry_policy=RetryPolicy(max_attempts=3)
        )

    async def _execute_with_retry(self, node_context, workflow_context):
        # Claude logic
        return NodeResult(success=True, output={})

# Each model retries 3 times, then falls back to next
resilient_llm = FallbackNode(
    node_id="resilient_llm",
    nodes=[
        RetryableGPT4(),
        RetryableClaude(),
        LocalLLMNode()
    ]
)

Error Context Propagation

import logging

from nadoo_flow import BaseNode, FallbackNode, NodeResult  # BaseNode import path assumed

logger = logging.getLogger(__name__)

class SecondaryNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Access errors from the options that already failed
        previous_errors = node_context.metadata.get("previous_errors", [])

        if previous_errors:
            logger.info(f"Primary failed with: {previous_errors[0]['error']}")
            # Adjust strategy based on previous failures

        return NodeResult(success=True, output={})

fallback = FallbackNode(
    node_id="smart_fallback",
    nodes=[PrimaryNode(), SecondaryNode()],  # PrimaryNode: a placeholder primary option
    pass_error_context=True
)

Error Handling

Retry Exhaustion

When all retry attempts fail:
result = await retryable_node.execute(node_context, workflow_context)

if not result.success:
    retry_info = result.metadata.get("retry_info", {})
    print(f"Failed after {retry_info['total_attempts']} attempts")
    print(f"Errors: {retry_info['errors']}")

All Fallbacks Failed

When all fallback nodes fail:
result = await fallback_node.execute(node_context, workflow_context)

if not result.success:
    fallback_info = result.metadata.get("fallback_info", {})
    all_errors = result.metadata.get("all_errors", [])

    print(f"Tried {len(fallback_info['attempted_nodes'])} nodes")
    for error in all_errors:
        print(f"  - {error['node_id']}: {error['error']}")

Non-Retryable Exceptions

Exceptions not in retry_on_exceptions cause immediate failure:
policy = RetryPolicy(
    retry_on_exceptions=(TimeoutError,)  # Only retry timeouts
)

class MyNode(RetryableNode):
    async def _execute_with_retry(self, node_context, workflow_context):
        # ValueError will NOT be retried - immediate failure
        raise ValueError("Invalid input")

Best Practices

Don’t retry indefinitely. Set reasonable limits:
# Good: Reasonable limits
RetryPolicy(max_attempts=3)  # For transient errors
RetryPolicy(max_attempts=5)  # For flaky APIs

# Bad: Too many retries
RetryPolicy(max_attempts=100)  # Will delay failures too long
Jitter prevents all instances from retrying simultaneously:
RetryPolicy(
    initial_delay=1.0,
    jitter=1.0  # Adds 0-1s random delay
)
Only retry exceptions that are transient:
# Good: Only transient errors
RetryPolicy(
    retry_on_exceptions=(TimeoutError, ConnectionError, HTTPStatusError)
)

# Bad: Retrying logic errors
RetryPolicy(
    retry_on_exceptions=(Exception,)  # Includes ValueError, etc
)
Put best options first, cheapest/most reliable last:
FallbackNode(
    nodes=[
        PremiumAPINode(),      # Best quality
        StandardAPINode(),     # Good quality
        LocalCachedNode()      # Always works
    ]
)
Use both retry (for transient issues) and fallback (for persistent failures):
# Each option retries, then falls back
FallbackNode(nodes=[
    RetryableNode(...),  # Retries 3x
    RetryableNode(...),  # Then tries this, retries 3x
    AlwaysWorksNode()    # Final fallback
])
Track retry patterns to optimize policies:
result = await node.execute(node_context, workflow_context)
if "retry_info" in result.metadata:
    attempts = result.metadata["retry_info"]["total_attempts"]
    if attempts > 1:
        logger.warning(f"Node required {attempts} attempts")

See Also