## Overview

Flow Core's batch processing capabilities enable efficient parallel execution of nodes over multiple inputs, with built-in concurrency control, error handling, and result aggregation.

## Core Components
### BatchProcessor

Process multiple inputs through a single node with controlled concurrency:
```python
from nadoo_flow import BatchProcessor

# Create a batch processor
processor = BatchProcessor(
    node=my_llm_node,
    max_concurrency=10,      # Max parallel executions
    continue_on_error=True   # Continue if some items fail
)

# Process all inputs
inputs = [
    {"prompt": "Translate to Korean: Hello"},
    {"prompt": "Translate to Spanish: Hello"},
    {"prompt": "Translate to French: Hello"}
]
results = await processor.batch(inputs)

# Process and get results as they complete
async for result in processor.batch_as_completed(inputs):
    print(f"Completed {result.index}: {result.success}")
    if result.success:
        print(f"Output: {result.output}")
```
### BatchResult

Each item in a batch produces an individual result:
```python
from dataclasses import dataclass

@dataclass
class BatchResult:
    index: int                    # Item index in the batch
    input_data: dict              # Original input
    success: bool                 # Success/failure flag
    output: dict | None           # Output if successful
    error: str | None             # Error message if failed
    execution_time: float | None  # Time taken (seconds)
```
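For example, failures can be reported straight from these fields (a minimal sketch, reusing the `results` list from the BatchProcessor example above):

```python
# Report failures and cumulative execution time from BatchResult fields
for r in results:
    if not r.success:
        print(f"Item {r.index} failed after {r.execution_time or 0:.2f}s: {r.error}")

succeeded = sum(1 for r in results if r.success)
total_time = sum(r.execution_time or 0 for r in results)
print(f"{succeeded}/{len(results)} succeeded, {total_time:.2f}s cumulative node time")
```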
## Map/Reduce Operations

### MapNode

Apply a node to each item in a list:
```python
from nadoo_flow import MapNode, NodeContext

# Translate multiple texts
map_node = MapNode(
    node_id="batch_translate",
    child_node=translation_node,
    input_key="texts",          # Key holding the input list
    output_key="translations",  # Key holding the output list
    max_concurrency=5           # Process 5 items at a time
)

# Usage
result = await map_node.execute(
    NodeContext(input_data={
        "texts": ["Hello", "World", "AI"]
    }),
    workflow_context
)
# result.output == {"translations": ["안녕하세요", "세계", "AI"]}
```
### FilterNode

Filter items based on a condition:
```python
from nadoo_flow import FilterNode, NodeContext

# Keep only high-confidence results
filter_node = FilterNode(
    node_id="filter_confident",
    filter_fn=lambda x: x["confidence"] > 0.8,
    input_key="predictions",
    output_key="confident_predictions"
)

# Usage
result = await filter_node.execute(
    NodeContext(input_data={
        "predictions": [
            {"text": "A", "confidence": 0.9},
            {"text": "B", "confidence": 0.6},
            {"text": "C", "confidence": 0.95}
        ]
    }),
    workflow_context
)
# result.output == {"confident_predictions": [{"text": "A", ...}, {"text": "C", ...}]}
```
### ReduceNode

Reduce a list to a single value:
```python
from nadoo_flow import ReduceNode

# Sum scores
reduce_node = ReduceNode(
    node_id="sum_scores",
    reduce_fn=lambda acc, x: acc + x["score"],
    initial_value=0,
    input_key="results",
    output_key="total_score"
)

# Group items by category
aggregate_node = ReduceNode(
    node_id="aggregate",
    reduce_fn=lambda acc, x: {
        **acc,
        x["category"]: acc.get(x["category"], []) + [x["item"]]
    },
    initial_value={},
    input_key="items",
    output_key="grouped"
)
```
## Batch Execution Patterns

### Pattern 1: Parallel Document Processing
```python
from nadoo_flow import BatchProcessor, BaseNode, NodeResult

class DocumentProcessor(BaseNode):
    async def execute(self, node_context, workflow_context):
        doc = node_context.input_data["document"]

        # Extract text
        text = await self.extract_text(doc)

        # Generate summary
        summary = await self.summarize(text)

        # Extract entities
        entities = await self.extract_entities(text)

        return NodeResult(
            success=True,
            output={
                "summary": summary,
                "entities": entities,
                "word_count": len(text.split())
            }
        )

# Process multiple documents
processor = BatchProcessor(
    node=DocumentProcessor("doc_processor"),
    max_concurrency=5  # Process 5 documents at once
)

documents = [
    {"document": "path/to/doc1.pdf"},
    {"document": "path/to/doc2.pdf"},
    # ... hundreds more
]

results = await processor.batch(documents)

# Analyze results
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
print(f"Processed: {len(successful)}/{len(documents)}")
```
### Pattern 2: Multi-Model Inference
```python
from nadoo_flow import MapNode, ReduceNode, ParallelNode, ParallelStrategy

# Run multiple models in parallel for each prompt
multi_model = MapNode(
    node_id="multi_model",
    child_node=ParallelNode(
        nodes=[
            GPT4Node("gpt4"),
            ClaudeNode("claude"),
            GeminiNode("gemini")
        ],
        strategy=ParallelStrategy.ALL_SETTLED
    ),
    input_key="prompts",
    output_key="model_outputs",
    max_concurrency=3
)

# Aggregate the per-prompt model outputs
consensus = ReduceNode(
    node_id="find_consensus",
    reduce_fn=lambda acc, outputs: {
        "consensus": most_common_answer(outputs),
        "confidence": calculate_agreement(outputs),
        "all_answers": outputs
    },
    input_key="model_outputs",
    output_key="final_answer"
)

# Pipeline
pipeline = multi_model | consensus
```
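The reduce step assumes two helper functions that the snippet does not define. Here is a minimal sketch of what they might look like, treating each entry in `outputs` as a dict with an `"answer"` field (both the helpers and that field layout are assumptions, not part of nadoo_flow):

```python
from collections import Counter

def most_common_answer(outputs: list[dict]) -> str:
    """Return the answer produced by the most models."""
    counts = Counter(o["answer"] for o in outputs)
    return counts.most_common(1)[0][0]

def calculate_agreement(outputs: list[dict]) -> float:
    """Fraction of models that agree with the majority answer."""
    counts = Counter(o["answer"] for o in outputs)
    return counts.most_common(1)[0][1] / len(outputs)
```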
### Pattern 3: Rate-Limited API Calls
```python
from nadoo_flow import BatchProcessor
import asyncio

class RateLimitedProcessor(BatchProcessor):
    def __init__(self, node, requests_per_second=10):
        super().__init__(
            node=node,
            max_concurrency=requests_per_second
        )
        self.delay = 1.0 / requests_per_second

    async def batch_as_completed(self, inputs):
        """Process with rate limiting"""
        for i, input_data in enumerate(inputs):
            # Wait between requests to respect the rate limit
            if i > 0:
                await asyncio.sleep(self.delay)
            result = await self.process_single(input_data, i)
            yield result

# Use with API calls
api_processor = RateLimitedProcessor(
    node=APICallNode("api"),
    requests_per_second=10
)

async for result in api_processor.batch_as_completed(api_requests):
    print(f"API call {result.index}: {result.success}")
```
## Advanced Features

### Chunked Processing

Process large datasets in chunks:
```python
from nadoo_flow import BatchProcessor

async def process_large_dataset(data, chunk_size=100):
    processor = BatchProcessor(
        node=processing_node,
        max_concurrency=10
    )
    all_results = []

    # Process in chunks
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        results = await processor.batch(chunk)
        all_results.extend(results)

        # Progress update
        print(f"Processed {min(i + chunk_size, len(data))}/{len(data)}")

    return all_results
```
### Dynamic Batching

Adjust batch size based on measured performance:
```python
import time

from nadoo_flow import BatchProcessor

class DynamicBatchProcessor:
    def __init__(self, node, target_time=1.0):
        self.node = node
        self.target_time = target_time
        self.batch_size = 10  # Starting size

    async def process(self, items):
        results = []
        while items:
            processor = BatchProcessor(
                node=self.node,
                max_concurrency=self.batch_size
            )
            batch, items = items[:self.batch_size], items[self.batch_size:]

            start_time = time.time()
            results.extend(await processor.batch(batch))
            elapsed = time.time() - start_time

            # Grow the batch when we finish early, shrink it when we overrun
            if elapsed < self.target_time * 0.8:
                self.batch_size = min(self.batch_size * 2, 100)
            elif elapsed > self.target_time * 1.2:
                self.batch_size = max(self.batch_size // 2, 1)
        return results
```
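Usage mirrors a plain `BatchProcessor`; a short sketch, where `processing_node` and `items` stand in for your own node and inputs:

```python
dynamic = DynamicBatchProcessor(node=processing_node, target_time=1.0)
results = await dynamic.process(items)
print(f"Finished with batch size {dynamic.batch_size}")
```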
### Error Recovery

Handle and retry failed items:
```python
import asyncio

async def batch_with_retry(processor, inputs, max_retries=3):
    results = []
    retry_items = []

    # Initial batch
    batch_results = await processor.batch(inputs)
    for result in batch_results:
        if result.success:
            results.append(result)
        else:
            retry_items.append((result.input_data, result.index, 0))

    # Retry failed items until they succeed or exhaust their retries
    while retry_items:
        retry_batch = [
            (input_data, original_index, retry_count + 1)
            for input_data, original_index, retry_count in retry_items
            if retry_count < max_retries
        ]
        if not retry_batch:
            break

        # Exponential backoff
        await asyncio.sleep(2 ** retry_batch[0][2])

        retry_results = await processor.batch([item[0] for item in retry_batch])
        new_retry_items = []
        for i, result in enumerate(retry_results):
            if result.success:
                result.index = retry_batch[i][1]  # Restore the original index
                results.append(result)
            else:
                new_retry_items.append(retry_batch[i])
        retry_items = new_retry_items

    return sorted(results, key=lambda r: r.index)
```
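Calling it looks like this (a sketch, reusing `processor` and `inputs` from the earlier examples):

```python
results = await batch_with_retry(processor, inputs, max_retries=3)
print(f"{len(results)}/{len(inputs)} items eventually succeeded")
```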
## Performance Optimization

### Concurrency Tuning
```python
import multiprocessing

# CPU-bound tasks: match CPU cores
processor = BatchProcessor(
    node=cpu_intensive_node,
    max_concurrency=multiprocessing.cpu_count()
)

# I/O-bound tasks: higher concurrency
processor = BatchProcessor(
    node=api_call_node,
    max_concurrency=50  # Many concurrent I/O operations
)

# Memory-intensive tasks: limit concurrency
processor = BatchProcessor(
    node=large_model_node,
    max_concurrency=2  # Avoid OOM
)
```
### Result Streaming

Process results as they arrive:
```python
async def stream_batch_results(processor, inputs):
    """Stream results to the client as they complete"""
    async for result in processor.batch_as_completed(inputs):
        if result.success:
            # Send to the client immediately
            yield {
                "index": result.index,
                "data": result.output,
                "status": "completed"
            }
        else:
            yield {
                "index": result.index,
                "error": result.error,
                "status": "failed"
            }
```
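Since `stream_batch_results` is an async generator, the consumer simply drains it with `async for`; a minimal sketch, reusing `processor` and `inputs` from earlier:

```python
async for event in stream_batch_results(processor, inputs):
    if event["status"] == "completed":
        print(f"item {event['index']} -> {event['data']}")
    else:
        print(f"item {event['index']} failed: {event['error']}")
```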
## Best Practices

### Set Appropriate Concurrency

Consider resource constraints and API limits:

- CPU-bound: `cpu_count()`
- I/O-bound: 10-100
- Memory-intensive: 1-5
- Rate-limited APIs: follow the provider's limits

### Handle Partial Failures

Use `continue_on_error=True` and inspect the results:

```python
results = await processor.batch(inputs)
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
```

### Monitor Progress

Use `batch_as_completed` for long-running batches:

```python
total = len(inputs)
completed = 0
async for result in processor.batch_as_completed(inputs):
    completed += 1
    print(f"Progress: {completed}/{total}")
```

### Implement Retry Logic

Retry failed items with exponential backoff:

```python
for attempt in range(max_retries):
    if attempt > 0:
        await asyncio.sleep(2 ** attempt)
    # ... retry the failed items here
```
## Complete Example
```python
import asyncio

from nadoo_flow import (
    BatchProcessor, MapNode, FilterNode, ReduceNode,
    BaseNode, NodeResult, NodeContext, WorkflowContext
)

class SentimentAnalyzer(BaseNode):
    """Analyze sentiment of text"""
    async def execute(self, node_context, workflow_context):
        text = node_context.input_data["text"]

        # Simulate sentiment analysis
        sentiment = await self.analyze_sentiment(text)
        score = await self.calculate_score(text)

        return NodeResult(
            success=True,
            output={
                "text": text,
                "sentiment": sentiment,
                "score": score
            }
        )

async def batch_sentiment_analysis():
    # Sample texts
    texts = [
        "I love this product!",
        "Terrible experience.",
        "It's okay, nothing special.",
        # ... hundreds more
    ]

    # Create a batch processor
    processor = BatchProcessor(
        node=SentimentAnalyzer("analyzer"),
        max_concurrency=10,
        continue_on_error=True
    )

    # Process all texts
    results = await processor.batch([
        {"text": text} for text in texts
    ])

    # Analyze results
    positive = [r for r in results if r.success and r.output["sentiment"] == "positive"]
    negative = [r for r in results if r.success and r.output["sentiment"] == "negative"]
    neutral = [r for r in results if r.success and r.output["sentiment"] == "neutral"]

    print(f"Positive: {len(positive)}")
    print(f"Negative: {len(negative)}")
    print(f"Neutral: {len(neutral)}")

    # Calculate the average score
    scores = [r.output["score"] for r in results if r.success]
    avg_score = sum(scores) / len(scores) if scores else 0
    print(f"Average score: {avg_score:.2f}")

# Run
asyncio.run(batch_sentiment_analysis())
```