Overview

Node chaining is the fundamental pattern in Flow Core for building workflows. The pipe operator (|) connects nodes sequentially, passing each node's output dict as the next node's input.
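
Under the hood, the pipe operator is ordinary operator overloading: each node implements __or__, which returns a composite node that runs the left-hand node and feeds its output to the right-hand node. The sketch below is a simplified, illustrative model of this pattern, not Flow Core's actual implementation:

# Illustrative sketch only -- a simplified model of pipe chaining,
# not Flow Core's actual implementation
class Node:
    async def execute(self, data: dict) -> dict:
        raise NotImplementedError

    def __or__(self, other: "Node") -> "Node":
        # `a | b` builds a composite node that runs a, then b
        return _Sequence(self, other)

class _Sequence(Node):
    def __init__(self, left: Node, right: Node):
        self.left = left
        self.right = right

    async def execute(self, data: dict) -> dict:
        # The left node's output dict becomes the right node's input
        return await self.right.execute(await self.left.execute(data))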

Basic Chaining

Simple Sequential Chain

from nadoo_flow import FunctionNode, LLMNode

# Create a simple chain
workflow = (
    FunctionNode(lambda x: {"text": x["input"].upper()})
    | LLMNode(model="gpt-3.5-turbo")
    | FunctionNode(lambda x: {"formatted": f"Result: {x['content']}"})
)

# Execute the chain
result = await workflow.execute({"input": "hello world"})
print(result["formatted"])  # Result: [LLM response]
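
The examples on this page use top-level await, which works in an async REPL or notebook. In a regular script, wrap the call in an entry point:

import asyncio

async def main():
    # Reuses the `workflow` chain defined above
    result = await workflow.execute({"input": "hello world"})
    print(result["formatted"])

asyncio.run(main())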

Multi-Step Processing

from nadoo_flow import ChainableNode, FunctionNode

class DataProcessor(ChainableNode):
    """Custom node for data processing"""

    async def execute(self, data):
        # Process the data
        processed = data["value"] * 2
        return {"processed": processed}

class Validator(ChainableNode):
    """Validate processed data"""

    async def execute(self, data):
        if data["processed"] < 0:
            raise ValueError("Invalid data")
        return {"valid": True, "data": data["processed"]}

# Chain multiple custom nodes
pipeline = (
    DataProcessor()
    | Validator()
    | FunctionNode(lambda x: {"result": x["data"] ** 2})
)

result = await pipeline.execute({"value": 5})
print(result)  # {"result": 100}

Advanced Chaining Patterns

Conditional Chaining

from nadoo_flow import ConditionalNode, FunctionNode, LLMNode

class Router(ConditionalNode):
    """Route based on conditions"""

    def get_condition(self, data):
        # Use .get() so a missing "type" key routes to the default path
        # instead of raising a KeyError
        node_type = data.get("type")
        if node_type == "text":
            return "text_path"
        elif node_type == "image":
            return "image_path"
        else:
            return "default_path"

# Define different processing paths
text_processor = LLMNode(model="gpt-4")
image_processor = FunctionNode(lambda x: {"description": "Image processed"})
default_processor = FunctionNode(lambda x: {"error": "Unknown type"})

# Create conditional chain
router = Router()
router.add_path("text_path", text_processor)
router.add_path("image_path", image_processor)
router.add_path("default_path", default_processor)

# Execute with different inputs
text_result = await router.execute({"type": "text", "content": "Hello"})
image_result = await router.execute({"type": "image", "url": "image.jpg"})
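
Because a composed chain is itself a node, a routed path can be a full sub-chain rather than a single node. A brief sketch, assuming add_path accepts any chainable node, as above:

# A routed path can be a chain, not just a single node
router.add_path(
    "text_path",
    FunctionNode(lambda x: {"content": x["content"].strip()})
    | LLMNode(model="gpt-4")
)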

Parallel Chains with Merge

from nadoo_flow import ParallelNode, MergeNode, FunctionNode, LLMNode

# Create parallel processing chains
summarizer = LLMNode(
    model="gpt-3.5-turbo",
    system_prompt="Summarize the text"
)

sentiment_analyzer = LLMNode(
    model="gpt-3.5-turbo",
    system_prompt="Analyze sentiment: positive, negative, or neutral"
)

keyword_extractor = FunctionNode(
    lambda x: {"keywords": x["text"].split()[:5]}
)

# Combine parallel chains
workflow = (
    FunctionNode(lambda x: {"text": x["input"]})
    | ParallelNode([
        summarizer,
        sentiment_analyzer,
        keyword_extractor
    ])
    | MergeNode(merge_strategy="combine")
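    # "combine" merges the parallel outputs into a list ordered as declared
    # above, so the formatter below can index them positionally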
    | FunctionNode(lambda x: {
        "analysis": {
            "summary": x[0]["content"],
            "sentiment": x[1]["content"],
            "keywords": x[2]["keywords"]
        }
    })
)

result = await workflow.execute({
    "input": "This is a great product! I love using it every day."
})

Real-World Example: Document Processing Pipeline

from nadoo_flow import (
    ChainableNode,
    LLMNode,
    FunctionNode,
    ConditionalNode,
    ErrorHandlerNode
)
import hashlib
import json
from datetime import datetime, timezone

class DocumentLoader(ChainableNode):
    """Load and validate document"""

    async def execute(self, data):
        try:
            with open(data["file_path"], 'r', encoding='utf-8') as f:
                content = f.read()

            return {
                "content": content,
                "length": len(content),
                "hash": hashlib.md5(content.encode()).hexdigest()
            }
        except FileNotFoundError:
            return {"error": "File not found", "content": ""}

class TextChunker(ChainableNode):
    """Split text into chunks"""

    def __init__(self, chunk_size=1000, overlap=100):
        super().__init__()
        if overlap >= chunk_size:
            # Guard against a zero or negative step in the chunking loop
            raise ValueError("overlap must be smaller than chunk_size")
        self.chunk_size = chunk_size
        self.overlap = overlap

    async def execute(self, data):
        text = data["content"]
        chunks = []

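        # Step by chunk_size - overlap so consecutive chunks share `overlap` characters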
        for i in range(0, len(text), self.chunk_size - self.overlap):
            chunk = text[i:i + self.chunk_size]
            chunks.append({
                "text": chunk,
                "index": len(chunks),
                "start": i,
                "end": min(i + self.chunk_size, len(text))
            })

        return {"chunks": chunks, "total": len(chunks)}

class ChunkProcessor(ChainableNode):
    """Process each chunk"""

    def __init__(self, llm_model="gpt-3.5-turbo"):
        super().__init__()
        self.llm = LLMNode(
            model=llm_model,
            system_prompt="Extract key information from this text chunk"
        )

    async def execute(self, data):
        processed_chunks = []

        for chunk in data["chunks"]:
            result = await self.llm.execute({"content": chunk["text"]})
            processed_chunks.append({
                "index": chunk["index"],
                "extracted": result["content"],
                "original_length": len(chunk["text"])
            })

        return {"processed": processed_chunks}

class ResultAggregator(ChainableNode):
    """Aggregate results from all chunks"""

    async def execute(self, data):
        all_extracted = []

        for chunk in data["processed"]:
            all_extracted.append(chunk["extracted"])

        # Combine all extracted information
        combined = "\n".join(all_extracted)

        return {
            "final_result": combined,
            "chunks_processed": len(data["processed"]),
            "timestamp": "2024-01-01T00:00:00Z"
        }

# Build the complete pipeline
document_pipeline = (
    DocumentLoader()
    | ErrorHandlerNode(
        fallback={"content": "", "error": "Loading failed"}
    )
    | TextChunker(chunk_size=500, overlap=50)
    | ChunkProcessor(llm_model="gpt-4")
    | ResultAggregator()
    | FunctionNode(lambda x: {
        "success": True,
        "result": x["final_result"],
        "metadata": {
            "chunks": x["chunks_processed"],
            "processed_at": x["timestamp"]
        }
    })
)

# Execute the pipeline
result = await document_pipeline.execute({
    "file_path": "/path/to/document.txt"
})

print(json.dumps(result, indent=2))

Chain Composition Patterns

Reusable Chain Components

from nadoo_flow import ChainableNode, FunctionNode, LLMNode

# Define reusable chain components
class PreProcessor(ChainableNode):
    """Standardize input format"""

    async def execute(self, data):
        return {
            "normalized": data.get("text", "").lower().strip(),
            "original": data.get("text", "")
        }

class PostProcessor(ChainableNode):
    """Format output"""

    async def execute(self, data):
        return {
            "result": data.get("content", ""),
            "confidence": data.get("confidence", 1.0),
            "formatted": True
        }

# Create reusable chain segments
input_chain = PreProcessor() | FunctionNode(lambda x: {"text": x["normalized"]})
output_chain = PostProcessor() | FunctionNode(lambda x: {"final": x["result"]})

# Compose different workflows using the same components
classification_workflow = (
    input_chain
    | LLMNode(model="gpt-3.5-turbo", system_prompt="Classify this text")
    | output_chain
)

summarization_workflow = (
    input_chain
    | LLMNode(model="gpt-3.5-turbo", system_prompt="Summarize this text")
    | output_chain
)

translation_workflow = (
    input_chain
    | LLMNode(model="gpt-3.5-turbo", system_prompt="Translate to Spanish")
    | output_chain
)

Dynamic Chain Building

def build_dynamic_chain(steps: list):
    """Build a chain dynamically from configuration"""

    chain = None

    for step in steps:
        if step["type"] == "function":
            node = FunctionNode(step["func"])
        elif step["type"] == "llm":
            node = LLMNode(
                model=step.get("model", "gpt-3.5-turbo"),
                system_prompt=step.get("prompt", "")
            )
        elif step["type"] == "custom":
            node = step["node_class"](**step.get("kwargs", {}))
        else:
            continue

        if chain is None:
            chain = node
        else:
            chain = chain | node

    if chain is None:
        # Fail loudly instead of returning None, which would break .execute()
        raise ValueError("Configuration produced no valid steps")
    return chain

# Configuration-driven chain
config = [
    {"type": "function", "func": lambda x: {"text": x["input"]}},
    {"type": "llm", "model": "gpt-4", "prompt": "Analyze this"},
    {"type": "function", "func": lambda x: {"output": x["content"]}}
]

dynamic_chain = build_dynamic_chain(config)
result = await dynamic_chain.execute({"input": "Test text"})

Error Handling in Chains

from nadoo_flow import ErrorHandlerNode, RetryNode, FunctionNode, LLMNode

# Chain with comprehensive error handling
robust_chain = (
    FunctionNode(lambda x: {"value": x["input"]})
    | ErrorHandlerNode(
        node=FunctionNode(lambda x: 1/0),  # Will raise error
        fallback={"value": 0, "error": "Division by zero"}
    )
    | RetryNode(
        node=LLMNode(model="gpt-3.5-turbo"),
        max_retries=3,
        backoff_factor=2.0
    )
    | FunctionNode(lambda x: {"result": x.get("content", "No result")})
)

# Execute with error handling
result = await robust_chain.execute({"input": 10})
print(result)  # Will handle errors gracefully

Performance Tips

  • Combine simple operations into single nodes
  • Avoid unnecessary intermediate nodes
  • Use parallel processing where possible
  • Clean up large data between nodes
  • Use streaming for large datasets
  • Implement pagination for batch processing
  • Use async/await properly in custom nodes (see the sketch after this list)
  • Batch API calls when possible
  • Implement connection pooling
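
As an illustration of the async and batching tips above, the sketch below shows a custom node that issues its per-item calls concurrently with asyncio.gather instead of awaiting them one at a time. The call_api coroutine is a hypothetical stand-in for a real API client:

import asyncio

from nadoo_flow import ChainableNode

async def call_api(item: str) -> str:
    # Hypothetical stand-in for a real API client call
    await asyncio.sleep(0.1)
    return item.upper()

class BatchedProcessor(ChainableNode):
    """Process items concurrently rather than one by one."""

    async def execute(self, data):
        # asyncio.gather schedules all calls on the event loop at once,
        # so total latency is roughly one call instead of len(items) calls
        results = await asyncio.gather(
            *(call_api(item) for item in data["items"])
        )
        return {"results": list(results)}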

Next Steps