Overview

Flow Core’s batch processing capabilities enable efficient parallel execution of nodes over multiple inputs, with built-in concurrency control, error handling, and result aggregation.

Core Components

BatchProcessor

Process multiple inputs through a single node with controlled concurrency:
from nadoo_flow import BatchProcessor

# Create batch processor
processor = BatchProcessor(
    node=my_llm_node,
    max_concurrency=10,      # Max parallel executions
    continue_on_error=True   # Continue if some fail
)

# Inputs to process
inputs = [
    {"prompt": "Translate to Korean: Hello"},
    {"prompt": "Translate to Spanish: Hello"},
    {"prompt": "Translate to French: Hello"}
]

# Process all inputs at once
results = await processor.batch(inputs)

# Or consume results as they complete
async for result in processor.batch_as_completed(inputs):
    print(f"Completed {result.index}: {result.success}")
    if result.success:
        print(f"Output: {result.output}")

BatchResult

Individual batch item result:
@dataclass
class BatchResult:
    index: int                    # Item index in batch
    input_data: dict              # Original input
    success: bool                 # Success/failure
    output: dict | None           # Output if successful
    error: str | None             # Error message if failed
    execution_time: float | None  # Time taken (seconds)
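
These fields make post-run reporting straightforward. A minimal sketch, assuming results came from the processor above:
# Report failures and timing from BatchResult fields
failed = [r for r in results if not r.success]
for r in failed:
    print(f"Item {r.index} failed: {r.error}")

times = [r.execution_time for r in results if r.execution_time is not None]
if times:
    print(f"Average execution time: {sum(times) / len(times):.2f}s")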

Map/Reduce Operations

MapNode

Apply a node to each item in a list:
from nadoo_flow import MapNode

# Translate multiple texts
map_node = MapNode(
    node_id="batch_translate",
    child_node=translation_node,
    input_key="texts",        # Key of the input list
    output_key="translations", # Key for the output list
    max_concurrency=5          # Process 5 items at a time
)

# Usage
result = await map_node.execute(
    NodeContext(input_data={
        "texts": ["Hello", "World", "AI"]
    }),
    workflow_context
)
# result.output = {"translations": ["안녕하세요", "세계", "AI"]}
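
Conceptually, MapNode runs the child node over each list item with at most max_concurrency executions in flight, collecting outputs in input order. A plain-asyncio sketch of that core loop (illustrative only, not the library's actual implementation):
import asyncio

async def bounded_map(fn, items, max_concurrency=5):
    """Async map with a concurrency cap, preserving input order."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run(item):
        async with semaphore:
            return await fn(item)

    # gather returns results in input order, regardless of completion order
    return await asyncio.gather(*(run(item) for item in items))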

FilterNode

Filter items based on a condition:
from nadoo_flow import FilterNode

# Filter high-confidence results
filter_node = FilterNode(
    node_id="filter_confident",
    filter_fn=lambda x: x["confidence"] > 0.8,
    input_key="predictions",
    output_key="confident_predictions"
)

# Usage
result = await filter_node.execute(
    NodeContext(input_data={
        "predictions": [
            {"text": "A", "confidence": 0.9},
            {"text": "B", "confidence": 0.6},
            {"text": "C", "confidence": 0.95}
        ]
    }),
    workflow_context
)
# result.output = {"confident_predictions": [{"text": "A", ...}, {"text": "C", ...}]}

ReduceNode

Reduce a list to a single value:
from nadoo_flow import ReduceNode

# Sum scores
reduce_node = ReduceNode(
    node_id="sum_scores",
    reduce_fn=lambda acc, x: acc + x["score"],
    initial_value=0,
    input_key="results",
    output_key="total_score"
)

# Aggregate responses
aggregate_node = ReduceNode(
    node_id="aggregate",
    reduce_fn=lambda acc, x: {
        **acc,
        x["category"]: acc.get(x["category"], []) + [x["item"]]
    },
    initial_value={},
    input_key="items",
    output_key="grouped"
)
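
The grouping reduce is just a fold; running the same lambda through functools.reduce shows the shape of the result:
from functools import reduce

items = [
    {"category": "fruit", "item": "apple"},
    {"category": "veg", "item": "carrot"},
    {"category": "fruit", "item": "pear"},
]

group_fn = lambda acc, x: {
    **acc,
    x["category"]: acc.get(x["category"], []) + [x["item"]]
}
print(reduce(group_fn, items, {}))
# {'fruit': ['apple', 'pear'], 'veg': ['carrot']}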

Batch Execution Patterns

Pattern 1: Parallel Document Processing

from nadoo_flow import BatchProcessor, BaseNode, NodeResult

class DocumentProcessor(BaseNode):
    async def execute(self, node_context, workflow_context):
        doc = node_context.input_data["document"]

        # Extract text
        text = await self.extract_text(doc)

        # Generate summary
        summary = await self.summarize(text)

        # Extract entities
        entities = await self.extract_entities(text)

        return NodeResult(
            success=True,
            output={
                "summary": summary,
                "entities": entities,
                "word_count": len(text.split())
            }
        )

# Process multiple documents
processor = BatchProcessor(
    node=DocumentProcessor("doc_processor"),
    max_concurrency=5  # Process 5 documents at once
)

documents = [
    {"document": "path/to/doc1.pdf"},
    {"document": "path/to/doc2.pdf"},
    # ... hundreds more
]

results = await processor.batch(documents)

# Analyze results
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
print(f"Processed: {len(successful)}/{len(documents)}")

Pattern 2: Multi-Model Inference

from nadoo_flow import MapNode, ParallelNode, ParallelStrategy, ReduceNode

# Run multiple models in parallel
multi_model = MapNode(
    node_id="multi_model",
    child_node=ParallelNode(
        nodes=[
            GPT4Node("gpt4"),
            ClaudeNode("claude"),
            GeminiNode("gemini")
        ],
        strategy=ParallelStrategy.ALL_SETTLED
    ),
    input_key="prompts",
    output_key="model_outputs",
    max_concurrency=3
)

# Aggregate each prompt's model outputs into a consensus entry
consensus = ReduceNode(
    node_id="find_consensus",
    reduce_fn=lambda acc, outputs: acc + [{
        "consensus": most_common_answer(outputs),
        "confidence": calculate_agreement(outputs),
        "all_answers": outputs
    }],
    initial_value=[],
    input_key="model_outputs",
    output_key="final_answer"
)

# Pipeline
pipeline = multi_model | consensus

Pattern 3: Rate-Limited API Calls

from nadoo_flow import BatchProcessor
import asyncio

class RateLimitedProcessor(BatchProcessor):
    def __init__(self, node, requests_per_second=10):
        super().__init__(
            node=node,
            max_concurrency=requests_per_second
        )
        self.delay = 1.0 / requests_per_second

    async def batch_as_completed(self, inputs):
        """Process sequentially, spacing requests by the rate-limit delay.

        Each item is awaited before the next starts, so throughput is
        bounded by per-request latency as well as the delay.
        """
        for i, input_data in enumerate(inputs):
            # Space out requests to respect the rate limit
            if i > 0:
                await asyncio.sleep(self.delay)

            result = await self.process_single(input_data, i)
            yield result

# Use with API calls
api_processor = RateLimitedProcessor(
    node=APICallNode("api"),
    requests_per_second=10
)

async for result in api_processor.batch_as_completed(api_requests):
    print(f"API call {result.index}: {result.success}")
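
If requests should actually be issued at the target rate rather than serialized, a variant can launch one task per interval and yield results in completion order (a sketch, assuming process_single is safe to run concurrently):
import asyncio

async def paced_batch(processor, inputs, requests_per_second=10):
    """Start one request per interval; yield results as they finish."""
    delay = 1.0 / requests_per_second
    tasks = []
    for i, input_data in enumerate(inputs):
        if i > 0:
            await asyncio.sleep(delay)
        tasks.append(asyncio.create_task(processor.process_single(input_data, i)))
    for finished in asyncio.as_completed(tasks):
        yield await finished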

Advanced Features

Chunked Processing

Process large datasets in chunks:
from nadoo_flow import BatchProcessor

async def process_large_dataset(node, data, chunk_size=100):
    processor = BatchProcessor(
        node=node,
        max_concurrency=10
    )

    all_results = []

    # Process in chunks
    for i in range(0, len(data), chunk_size):
        chunk = data[i:i + chunk_size]
        results = await processor.batch(chunk)
        all_results.extend(results)

        # Progress update
        print(f"Processed {min(i + chunk_size, len(data))}/{len(data)}")

    return all_results
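
Hypothetical usage, assuming records is a list of input dicts and my_node is any configured node:
all_results = await process_large_dataset(my_node, records, chunk_size=200)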

Dynamic Batching

Adjust batch size based on performance:
import time

class DynamicBatchProcessor:
    def __init__(self, node, target_time=1.0):
        self.node = node
        self.target_time = target_time
        self.batch_size = 10  # Starting chunk size

    async def process(self, items):
        results = []
        i = 0
        while i < len(items):
            processor = BatchProcessor(
                node=self.node,
                max_concurrency=self.batch_size
            )
            chunk = items[i:i + self.batch_size]

            start_time = time.time()
            results.extend(await processor.batch(chunk))
            elapsed = time.time() - start_time
            i += len(chunk)

            # Adjust the chunk size for the next iteration
            if elapsed < self.target_time * 0.8:
                self.batch_size = min(self.batch_size * 2, 100)
            elif elapsed > self.target_time * 1.2:
                self.batch_size = max(self.batch_size // 2, 1)

        return results
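
Hypothetical usage, assuming my_node is any configured node:
dynamic = DynamicBatchProcessor(my_node, target_time=1.0)
results = await dynamic.process(items)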

Error Recovery

Handle and retry failed items:
import asyncio

async def batch_with_retry(processor, inputs, max_retries=3):
    results = []
    retry_items = []

    # Initial batch
    batch_results = await processor.batch(inputs)

    for result in batch_results:
        if result.success:
            results.append(result)
        else:
            retry_items.append((result.input_data, result.index, 0))

    # Retry failed items until they succeed or exhaust max_retries
    while retry_items:
        retry_batch = [
            (input_data, original_index, retry_count + 1)
            for input_data, original_index, retry_count in retry_items
            if retry_count < max_retries
        ]

        if not retry_batch:
            break  # Remaining items have exhausted their retries

        # Exponential backoff based on the current attempt number
        await asyncio.sleep(2 ** retry_batch[0][2])

        retry_results = await processor.batch([item[0] for item in retry_batch])

        new_retry_items = []
        for i, result in enumerate(retry_results):
            if result.success:
                result.index = retry_batch[i][1]  # Restore original index
                results.append(result)
            else:
                new_retry_items.append(retry_batch[i])

        retry_items = new_retry_items

    # Items that still fail after max_retries are dropped from the output
    return sorted(results, key=lambda r: r.index)
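
Called like a plain batch, for example:
results = await batch_with_retry(processor, inputs, max_retries=3)
print(f"Succeeded: {len(results)}/{len(inputs)}")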

Performance Optimization

Concurrency Tuning

# CPU-bound tasks: match CPU cores
import multiprocessing
processor = BatchProcessor(
    node=cpu_intensive_node,
    max_concurrency=multiprocessing.cpu_count()
)

# I/O-bound tasks: higher concurrency
processor = BatchProcessor(
    node=api_call_node,
    max_concurrency=50  # Many concurrent I/O operations
)

# Memory-intensive: limit concurrency
processor = BatchProcessor(
    node=large_model_node,
    max_concurrency=2  # Avoid OOM
)
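
One caveat: in an asyncio-based processor, raising max_concurrency only helps when the node actually awaits something. Pure-Python CPU work inside a node should be offloaded to a process pool, e.g. (a sketch, not part of the library API):
import asyncio
from concurrent.futures import ProcessPoolExecutor

_pool = ProcessPoolExecutor()

async def run_cpu_bound(fn, *args):
    # Offload blocking work so the event loop can schedule other batch items
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_pool, fn, *args)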

Result Streaming

Process results as they arrive:
async def stream_batch_results(processor, inputs):
    """Stream results to client as they complete"""
    async for result in processor.batch_as_completed(inputs):
        if result.success:
            # Send to client immediately
            yield {
                "index": result.index,
                "data": result.output,
                "status": "completed"
            }
        else:
            yield {
                "index": result.index,
                "error": result.error,
                "status": "failed"
            }
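
Consumed from any async context like a normal generator (send_to_client is a hypothetical transport, e.g. a websocket send):
async for event in stream_batch_results(processor, inputs):
    await send_to_client(event)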

Best Practices

Tune max_concurrency to resource constraints and API limits:
  • CPU-bound: cpu_count()
  • I/O-bound: 10-100
  • Memory-intensive: 1-5
  • Rate-limited APIs: follow the provider's limits

Use continue_on_error=True and inspect the results afterwards:
results = await processor.batch(inputs)
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]

Use batch_as_completed to report progress on long-running batches:
total = len(inputs)
completed = 0
async for result in processor.batch_as_completed(inputs):
    completed += 1
    print(f"Progress: {completed}/{total}")

Retry failed items with exponential backoff:
for attempt in range(max_retries):
    if attempt > 0:
        await asyncio.sleep(2 ** attempt)
    # Retry logic

Complete Example

import asyncio

from nadoo_flow import (
    BatchProcessor, MapNode, FilterNode, ReduceNode,
    BaseNode, NodeResult, NodeContext, WorkflowContext
)

class SentimentAnalyzer(BaseNode):
    """Analyze sentiment of text"""

    async def execute(self, node_context, workflow_context):
        text = node_context.input_data["text"]

        # Simulate sentiment analysis
        sentiment = await self.analyze_sentiment(text)
        score = await self.calculate_score(text)

        return NodeResult(
            success=True,
            output={
                "text": text,
                "sentiment": sentiment,
                "score": score
            }
        )

async def batch_sentiment_analysis():
    # Sample texts
    texts = [
        "I love this product!",
        "Terrible experience.",
        "It's okay, nothing special.",
        # ... hundreds more
    ]

    # Create batch processor
    processor = BatchProcessor(
        node=SentimentAnalyzer("analyzer"),
        max_concurrency=10,
        continue_on_error=True
    )

    # Process all texts
    results = await processor.batch([
        {"text": text} for text in texts
    ])

    # Analyze results
    positive = [r for r in results if r.success and r.output["sentiment"] == "positive"]
    negative = [r for r in results if r.success and r.output["sentiment"] == "negative"]
    neutral = [r for r in results if r.success and r.output["sentiment"] == "neutral"]

    print(f"Positive: {len(positive)}")
    print(f"Negative: {len(negative)}")
    print(f"Neutral: {len(neutral)}")

    # Calculate average score
    scores = [r.output["score"] for r in results if r.success]
    avg_score = sum(scores) / len(scores) if scores else 0
    print(f"Average score: {avg_score:.2f}")

# Run
asyncio.run(batch_sentiment_analysis())

See Also