Overview

The chain module provides a fluent API for composing nodes using the pipe operator (|).

Classes

ChainableNode

Base class for nodes that support chaining.
from nadoo_flow import ChainableNode, NodeResult

class MyNode(ChainableNode):
    async def execute(self, node_context, workflow_context):
        # Implementation: process node_context.input_data and return a NodeResult
        return NodeResult(success=True, output=node_context.input_data)

Methods

__or__
Enables the pipe operator for chaining.
def __or__(self, other: ChainableNode) -> NodeChain
Parameters:
  • other: Another chainable node
Returns:
  • NodeChain: A chain containing both nodes
Example:
chain = NodeA() | NodeB() | NodeC()
run
Execute the node with input data.
async def run(input_data: Dict[str, Any]) -> Dict[str, Any]
Parameters:
  • input_data: Input data dictionary
Returns:
  • Output data dictionary
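Example (node is any ChainableNode instance, such as MyNode above):
result = await node.run({"text": "hello"})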
stream
Stream execution events.
async def stream(input_data: Dict[str, Any]) -> AsyncIterator[StreamEvent]
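Example (a minimal consumption sketch; the fields on StreamEvent are not documented in this section, so each event is printed as-is):
node = MyNode()
async for event in node.stream({"text": "hello"}):
    print(event)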

NodeChain

Container for chained nodes.
from nadoo_flow import NodeChain

chain = NodeChain([node1, node2, node3])

Parameters

  • nodes (List[ChainableNode], required): List of nodes to chain

Methods

add_node
Add a node to the chain.
def add_node(node: ChainableNode) -> None
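Example (NodeA through NodeC stand in for any ChainableNode subclasses):
chain = NodeChain([NodeA(), NodeB()])
chain.add_node(NodeC())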
execute
Execute all nodes in sequence.
async def execute(
    node_context: NodeContext,
    workflow_context: WorkflowContext
) -> NodeResult
run
Run the chain with input data.
async def run(input_data: Dict[str, Any]) -> Dict[str, Any]
Example:
chain = NodeChain([ProcessNode(), OutputNode()])
result = await chain.run({"data": "input"})

FunctionNode

Wrap a function as a chainable node.
from nadoo_flow import FunctionNode

def process_data(data):
    return {"result": data["value"] * 2}

node = FunctionNode(process_data)

Parameters

  • func (Callable, required): Function to wrap
  • node_id (str, optional): Node identifier
  • name (str, optional): Node name

Supported Function Types

Synchronous Functions
def sync_func(data: Dict) -> Dict:
    return {"processed": data}

node = FunctionNode(sync_func)
Asynchronous Functions
async def async_func(data: Dict) -> Dict:
    await asyncio.sleep(1)
    return {"processed": data}

node = FunctionNode(async_func)
Lambda Functions
node = FunctionNode(lambda x: {"doubled": x["value"] * 2})

PassthroughNode

Identity node that passes data through unchanged.
from nadoo_flow import PassthroughNode

node = PassthroughNode(node_id="passthrough")

Use Cases

  • Debugging chains
  • Placeholder nodes
  • Data inspection points
Example:
chain = (
    InputNode()
    | PassthroughNode()  # Inspect data here
    | ProcessNode()
    | PassthroughNode()  # And here
    | OutputNode()
)

TransformNode

Generic transformation node.
from nadoo_flow import TransformNode

node = TransformNode(
    transform_fn=lambda x: {"transformed": x},
    node_id="transformer"
)

Parameters

  • transform_fn (Callable, required): Transformation function
  • node_id (str, optional): Node identifier
  • validate_fn (Callable, optional): Validation function (see the sketch below)
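The exact contract of validate_fn is not documented in this section; the sketch below assumes it receives the input data and returns True when the data is acceptable. Check the source for the precise behavior.
node = TransformNode(
    transform_fn=lambda x: {"normalized": x["value"] / 100},
    validate_fn=lambda x: isinstance(x.get("value"), (int, float)),  # assumed contract: True means valid
    node_id="normalizer"
)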

Utility Functions

create_chain

Create a chain from a list of nodes.
from nadoo_flow import create_chain

chain = create_chain([
    NodeA(),
    NodeB(),
    NodeC()
])

chain_from_config

Create a chain from configuration.
from nadoo_flow import chain_from_config

config = [
    {"type": "input", "id": "input_1"},
    {"type": "process", "id": "process_1"},
    {"type": "output", "id": "output_1"}
]

chain = chain_from_config(config)

Composition Patterns

Simple Chain

from nadoo_flow import ChainableNode, NodeResult

class UpperNode(ChainableNode):
    async def execute(self, node_context, workflow_context):
        text = node_context.input_data.get("text", "")
        return NodeResult(
            success=True,
            output={"text": text.upper()}
        )

class ReverseNode(ChainableNode):
    async def execute(self, node_context, workflow_context):
        text = node_context.input_data.get("text", "")
        return NodeResult(
            success=True,
            output={"text": text[::-1]}
        )

# Create chain
chain = UpperNode() | ReverseNode()

# Execute
result = await chain.run({"text": "hello"})
print(result)  # {"text": "OLLEH"}

Function Chain

from nadoo_flow import FunctionNode

# Define functions
def add_prefix(data):
    return {"text": f"PREFIX_{data['text']}"}

def add_suffix(data):
    return {"text": f"{data['text']}_SUFFIX"}

def to_upper(data):
    return {"text": data["text"].upper()}

# Create chain from functions
chain = (
    FunctionNode(add_prefix)
    | FunctionNode(to_upper)
    | FunctionNode(add_suffix)
)

# Execute
result = await chain.run({"text": "hello"})
print(result)  # {"text": "PREFIX_HELLO_SUFFIX"}

Mixed Chain

# Mix different node types
chain = (
    InputValidationNode()
    | FunctionNode(lambda x: {"data": x["data"] * 2})
    | ProcessingNode()
    | PassthroughNode()  # For debugging
    | OutputFormatterNode()
)

Conditional Chain

class ConditionalChain(ChainableNode):
    def __init__(self, condition_fn, true_chain, false_chain):
        super().__init__()
        self.condition_fn = condition_fn
        self.true_chain = true_chain
        self.false_chain = false_chain

    async def execute(self, node_context, workflow_context):
        if self.condition_fn(node_context.input_data):
            return await self.true_chain.execute(
                node_context, workflow_context
            )
        else:
            return await self.false_chain.execute(
                node_context, workflow_context
            )

# Usage
chain = ConditionalChain(
    condition_fn=lambda x: x.get("premium", False),
    true_chain=PremiumProcessor() | PremiumOutput(),
    false_chain=StandardProcessor() | StandardOutput()
)

Dynamic Chain Building

def build_chain(steps):
    """Build chain dynamically from steps"""
    chain = None

    for step in steps:
        if step["type"] == "transform":
            node = FunctionNode(step["function"])
        elif step["type"] == "validate":
            node = ValidationNode(step["schema"])
        else:
            node = CustomNode(step["config"])

        if chain is None:
            chain = node
        else:
            chain = chain | node

    return chain

# Usage
steps = [
    {"type": "transform", "function": clean_data},
    {"type": "validate", "schema": data_schema},
    {"type": "custom", "config": {"param": "value"}}
]

chain = build_chain(steps)

Advanced Features

Chain Inspection

from nadoo_flow import NodeChain

chain = NodeA() | NodeB() | NodeC()

# Inspect chain
print(f"Chain length: {len(chain.nodes)}")
print(f"Node IDs: {[n.node_id for n in chain.nodes]}")

# Modify chain
chain.nodes.insert(1, DebugNode())  # Insert debug node

Chain Cloning

import copy

original_chain = NodeA() | NodeB()
cloned_chain = copy.deepcopy(original_chain)

# Modify cloned chain without affecting original
cloned_chain.nodes.append(NodeC())

Chain Composition

# Create sub-chains
validation_chain = SchemaValidator() | DataCleaner()
processing_chain = Transformer() | Aggregator()
output_chain = Formatter() | Writer()

# Compose into larger chain
full_chain = validation_chain | processing_chain | output_chain

Error Handling

Chain Error Propagation

class ErrorHandlingChain(NodeChain):
    async def execute(self, node_context, workflow_context):
        for i, node in enumerate(self.nodes):
            try:
                result = await node.execute(node_context, workflow_context)

                if not result.success:
                    # Stop chain on failure
                    return result

                # Pass output to next node
                node_context.input_data = result.output

            except Exception as e:
                return NodeResult(
                    success=False,
                    error=f"Chain failed at node {i}: {e}"
                )

        return NodeResult(
            success=True,
            output=node_context.input_data
        )

Performance Optimization

Lazy Chain Evaluation

class LazyChain(NodeChain):
    """Evaluate nodes only when needed"""

    async def execute(self, node_context, workflow_context):
        for node in self.nodes:
            # Check if should continue
            if self._should_stop(node_context):
                break

            result = await node.execute(node_context, workflow_context)
            node_context.input_data = result.output

        return NodeResult(
            success=True,
            output=node_context.input_data
        )

    def _should_stop(self, context):
        # Custom stop condition
        return context.input_data.get("stop", False)

Best Practices

Each chain should have a single, clear purpose; break complex workflows into sub-chains.
Annotate functions wrapped by FunctionNode so the data contract between nodes is explicit:
from typing import Dict, Any

def transform(data: Dict[str, Any]) -> Dict[str, Any]:
    return {"result": data}

node = FunctionNode(transform)
Always handle potential errors in chain execution to prevent cascading failures.
Test each chain component separately before testing the full chain.
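For example, a single node can be exercised in isolation before it is composed into a chain (a sketch assuming pytest with the pytest-asyncio plugin; the node and expected output follow the run examples earlier on this page):
import pytest
from nadoo_flow import FunctionNode

def double(data):
    return {"value": data["value"] * 2}

@pytest.mark.asyncio
async def test_double_node_in_isolation():
    node = FunctionNode(double)
    result = await node.run({"value": 21})
    assert result == {"value": 42}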

See Also