Overview
Node chaining is the fundamental pattern in Flow Core for building workflows. The pipe operator (|) connects nodes sequentially, passing each node's output dictionary to the next node as input.
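Under the hood, this style of chaining is typically implemented by overloading Python's __or__ operator. The following is a minimal illustrative sketch of the mechanism, not Flow Core's actual source:

# Illustrative sketch only; Flow Core's real internals may differ.
class Node:
    def __or__(self, other):
        # `a | b` builds a two-node pipeline
        return Pipeline(self, other)

class Pipeline(Node):
    def __init__(self, first, second):
        self.first = first
        self.second = second

    async def execute(self, data):
        # The first node's output becomes the second node's input
        return await self.second.execute(await self.first.execute(data))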
Basic Chaining
Simple Sequential Chain
from nadoo_flow import FunctionNode, LLMNode

# Create a simple chain
workflow = (
    FunctionNode(lambda x: {"text": x["input"].upper()})
    | LLMNode(model="gpt-3.5-turbo")
    | FunctionNode(lambda x: {"formatted": f"Result: {x['content']}"})
)

# Execute the chain
result = await workflow.execute({"input": "hello world"})
print(result["formatted"])  # Result: [LLM response]
Multi-Step Processing
from nadoo_flow import ChainableNode, FunctionNode

class DataProcessor(ChainableNode):
    """Custom node for data processing"""

    async def execute(self, data):
        # Process the data
        processed = data["value"] * 2
        return {"processed": processed}

class Validator(ChainableNode):
    """Validate processed data"""

    async def execute(self, data):
        if data["processed"] < 0:
            raise ValueError("Invalid data")
        return {"valid": True, "data": data["processed"]}

# Chain multiple custom nodes
pipeline = (
    DataProcessor()
    | Validator()
    | FunctionNode(lambda x: {"result": x["data"] ** 2})
)

result = await pipeline.execute({"value": 5})
print(result)  # {"result": 100}
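Each node receives the previous node's return value, so the dictionary keys form the contract between steps. A negative input demonstrates the Validator stage failing fast:

# -5 * 2 = -10, so Validator raises before the final squaring step
try:
    await pipeline.execute({"value": -5})
except ValueError as e:
    print(e)  # Invalid data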
Advanced Chaining Patterns
Conditional Chaining
from nadoo_flow import ConditionalNode, FunctionNode, LLMNode

class Router(ConditionalNode):
    """Route based on conditions"""

    def get_condition(self, data):
        if data["type"] == "text":
            return "text_path"
        elif data["type"] == "image":
            return "image_path"
        else:
            return "default_path"

# Define different processing paths
text_processor = LLMNode(model="gpt-4")
image_processor = FunctionNode(lambda x: {"description": "Image processed"})
default_processor = FunctionNode(lambda x: {"error": "Unknown type"})

# Create conditional chain
router = Router()
router.add_path("text_path", text_processor)
router.add_path("image_path", image_processor)
router.add_path("default_path", default_processor)

# Execute with different inputs
text_result = await router.execute({"type": "text", "content": "Hello"})
image_result = await router.execute({"type": "image", "url": "image.jpg"})
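The dispatch logic itself lives inside ConditionalNode; conceptually it resolves get_condition() to a registered path and delegates to it, roughly like this hypothetical sketch (not the library's source):

# Hypothetical illustration of conditional dispatch, not nadoo_flow code
class SimpleConditional:
    def __init__(self):
        self.paths = {}

    def add_path(self, name, node):
        self.paths[name] = node

    async def execute(self, data):
        # Subclasses supply get_condition(), as Router does above
        return await self.paths[self.get_condition(data)].execute(data)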
Parallel Chains with Merge
from nadoo_flow import FunctionNode, LLMNode, MergeNode, ParallelNode

# Create parallel processing chains
summarizer = LLMNode(
    model="gpt-3.5-turbo",
    system_prompt="Summarize the text"
)
sentiment_analyzer = LLMNode(
    model="gpt-3.5-turbo",
    system_prompt="Analyze sentiment: positive, negative, or neutral"
)
keyword_extractor = FunctionNode(
    lambda x: {"keywords": x["text"].split()[:5]}
)

# Combine parallel chains
workflow = (
    FunctionNode(lambda x: {"text": x["input"]})
    | ParallelNode([
        summarizer,
        sentiment_analyzer,
        keyword_extractor
    ])
    | MergeNode(merge_strategy="combine")
    | FunctionNode(lambda x: {
        "analysis": {
            "summary": x[0]["content"],
            "sentiment": x[1]["content"],
            "keywords": x[2]["keywords"]
        }
    })
)

result = await workflow.execute({
    "input": "This is a great product! I love using it every day."
})
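ParallelNode runs every branch on the same input, and with merge_strategy="combine" the results arrive as a list in branch order, which is why the final FunctionNode indexes x[0] through x[2]. The fan-out resembles this hypothetical sketch (not the library's implementation):

import asyncio

# Hypothetical illustration of concurrent fan-out, not nadoo_flow code
class SimpleParallel:
    def __init__(self, nodes):
        self.nodes = nodes

    async def execute(self, data):
        # Every branch gets the same input; results keep branch order
        return await asyncio.gather(*(n.execute(data) for n in self.nodes))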
Real-World Example: Document Processing Pipeline
from nadoo_flow import (
    ChainableNode,
    LLMNode,
    FunctionNode,
    ConditionalNode,
    ErrorHandlerNode
)
from datetime import datetime, timezone
import hashlib
import json

class DocumentLoader(ChainableNode):
    """Load and validate document"""

    async def execute(self, data):
        try:
            with open(data["file_path"], "r") as f:
                content = f.read()
            return {
                "content": content,
                "length": len(content),
                "hash": hashlib.md5(content.encode()).hexdigest()
            }
        except FileNotFoundError:
            return {"error": "File not found", "content": ""}

class TextChunker(ChainableNode):
    """Split text into overlapping chunks"""

    def __init__(self, chunk_size=1000, overlap=100):
        super().__init__()
        self.chunk_size = chunk_size
        self.overlap = overlap

    async def execute(self, data):
        text = data["content"]
        chunks = []
        for i in range(0, len(text), self.chunk_size - self.overlap):
            chunk = text[i:i + self.chunk_size]
            chunks.append({
                "text": chunk,
                "index": len(chunks),
                "start": i,
                "end": min(i + self.chunk_size, len(text))
            })
        return {"chunks": chunks, "total": len(chunks)}

class ChunkProcessor(ChainableNode):
    """Process each chunk with an LLM"""

    def __init__(self, llm_model="gpt-3.5-turbo"):
        super().__init__()
        self.llm = LLMNode(
            model=llm_model,
            system_prompt="Extract key information from this text chunk"
        )

    async def execute(self, data):
        processed_chunks = []
        for chunk in data["chunks"]:
            result = await self.llm.execute({"content": chunk["text"]})
            processed_chunks.append({
                "index": chunk["index"],
                "extracted": result["content"],
                "original_length": len(chunk["text"])
            })
        return {"processed": processed_chunks}

class ResultAggregator(ChainableNode):
    """Aggregate results from all chunks"""

    async def execute(self, data):
        all_extracted = [chunk["extracted"] for chunk in data["processed"]]
        # Combine all extracted information
        combined = "\n".join(all_extracted)
        return {
            "final_result": combined,
            "chunks_processed": len(data["processed"]),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

# Build the complete pipeline
document_pipeline = (
    DocumentLoader()
    | ErrorHandlerNode(
        fallback={"content": "", "error": "Loading failed"}
    )
    | TextChunker(chunk_size=500, overlap=50)
    | ChunkProcessor(llm_model="gpt-4")
    | ResultAggregator()
    | FunctionNode(lambda x: {
        "success": True,
        "result": x["final_result"],
        "metadata": {
            "chunks": x["chunks_processed"],
            "processed_at": x["timestamp"]
        }
    })
)

# Execute the pipeline
result = await document_pipeline.execute({
    "file_path": "/path/to/document.txt"
})
print(json.dumps(result, indent=2))
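The chunking arithmetic is easy to verify in isolation: with chunk_size=500 and overlap=50 the window advances 450 characters per step, so consecutive chunks share 50 characters. A quick standalone check, with no LLM calls involved:

import asyncio

chunker = TextChunker(chunk_size=500, overlap=50)
out = asyncio.run(chunker.execute({"content": "x" * 1200}))
print(out["total"])  # 3 chunks: [0:500], [450:950], [900:1200]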
Chain Composition Patterns
Reusable Chain Components
# Define reusable chain components
class PreProcessor(ChainableNode):
    """Standardize input format"""

    async def execute(self, data):
        return {
            "normalized": data.get("text", "").lower().strip(),
            "original": data.get("text", "")
        }

class PostProcessor(ChainableNode):
    """Format output"""

    async def execute(self, data):
        return {
            "result": data.get("content", ""),
            "confidence": data.get("confidence", 1.0),
            "formatted": True
        }

# Create reusable chain segments
input_chain = PreProcessor() | FunctionNode(lambda x: {"text": x["normalized"]})
output_chain = PostProcessor() | FunctionNode(lambda x: {"final": x["result"]})

# Compose different workflows using the same components
classification_workflow = (
    input_chain
    | LLMNode(model="gpt-3.5-turbo", system_prompt="Classify this text")
    | output_chain
)
summarization_workflow = (
    input_chain
    | LLMNode(model="gpt-3.5-turbo", system_prompt="Summarize this text")
    | output_chain
)
translation_workflow = (
    input_chain
    | LLMNode(model="gpt-3.5-turbo", system_prompt="Translate to Spanish")
    | output_chain
)
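Because every workflow shares the same pre- and post-processing segments, callers see a uniform input and output shape:

# All three workflows accept {"text": ...} and return {"final": ...}
result = await classification_workflow.execute({"text": "  Breaking NEWS  "})
print(result["final"])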
Dynamic Chain Building
def build_dynamic_chain(steps: list):
    """Build a chain dynamically from configuration"""
    chain = None
    for step in steps:
        if step["type"] == "function":
            node = FunctionNode(step["func"])
        elif step["type"] == "llm":
            node = LLMNode(
                model=step.get("model", "gpt-3.5-turbo"),
                system_prompt=step.get("prompt", "")
            )
        elif step["type"] == "custom":
            node = step["node_class"](**step.get("kwargs", {}))
        else:
            continue
        if chain is None:
            chain = node
        else:
            chain = chain | node
    return chain

# Configuration-driven chain
config = [
    {"type": "function", "func": lambda x: {"text": x["input"]}},
    {"type": "llm", "model": "gpt-4", "prompt": "Analyze this"},
    {"type": "function", "func": lambda x: {"output": x["content"]}}
]

dynamic_chain = build_dynamic_chain(config)
result = await dynamic_chain.execute({"input": "Test text"})
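The "custom" step type lets configuration reference node classes directly, for example reusing DataProcessor from the multi-step example above:

config = [
    {"type": "custom", "node_class": DataProcessor},
    {"type": "function", "func": lambda x: {"output": x["processed"]}}
]

pipeline = build_dynamic_chain(config)
result = await pipeline.execute({"value": 21})  # {"output": 42}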
Error Handling in Chains
from nadoo_flow import ErrorHandlerNode, RetryNode

# Chain with comprehensive error handling
robust_chain = (
    FunctionNode(lambda x: {"value": x["input"]})
    | ErrorHandlerNode(
        node=FunctionNode(lambda x: 1 / 0),  # Will raise ZeroDivisionError
        fallback={"value": 0, "error": "Division by zero"}
    )
    | RetryNode(
        node=LLMNode(model="gpt-3.5-turbo"),
        max_retries=3,
        backoff_factor=2.0
    )
    | FunctionNode(lambda x: {"result": x.get("content", "No result")})
)

# Execute with error handling
result = await robust_chain.execute({"input": 10})
print(result)  # Errors are handled gracefully
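Conceptually, an error-handling wrapper of this kind catches exceptions from the wrapped node and substitutes the fallback payload; a hypothetical sketch (not the library's source):

# Hypothetical illustration of fallback handling, not nadoo_flow code
class SimpleErrorHandler:
    def __init__(self, node, fallback):
        self.node = node
        self.fallback = fallback

    async def execute(self, data):
        try:
            return await self.node.execute(data)
        except Exception:
            # Swallow the error and return a copy of the fallback payload
            return dict(self.fallback)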
Performance Tips
Optimize Chain Length
- Combine simple operations into single nodes
- Avoid unnecessary intermediate nodes
- Use parallel processing where possible
Memory Management
- Clean up large data between nodes
- Use streaming for large datasets
- Implement pagination for batch processing
Async Optimization
- Use async/await properly in custom nodes
- Batch API calls when possible (see the sketch below)
- Implement connection pooling
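As an example of the batching tip, a hypothetical helper (not part of nadoo_flow) can fan out many payloads through one node while a semaphore bounds concurrency:

import asyncio

# Hypothetical helper, not part of nadoo_flow
async def run_batched(node, payloads, max_concurrent=8):
    sem = asyncio.Semaphore(max_concurrent)

    async def run_one(payload):
        async with sem:
            return await node.execute(payload)

    # Fan out all payloads; results keep input order
    return await asyncio.gather(*(run_one(p) for p in payloads))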