Skip to main content

Overview

Parallel execution in Flow Core allows multiple operations to run concurrently using Python’s asyncio, which significantly improves performance for workflows whose independent operations spend most of their time waiting on I/O (API calls, database queries, network requests). Flow Core provides two main approaches for parallel execution:
  • BatchProcessor: For batch processing of multiple inputs with a single node
  • MapNode: For applying a node to each item in a list concurrently

BatchProcessor

The BatchProcessor class enables concurrent processing of multiple inputs:
from nadoo_flow import BatchProcessor, BaseNode, WorkflowContext

# Create a processor with concurrency control
processor = BatchProcessor(
    node=my_processing_node,
    max_concurrency=10,
    continue_on_error=True
)

# Process multiple inputs in parallel
results = await processor.batch([
    {"text": "Item 1"},
    {"text": "Item 2"},
    {"text": "Item 3"}
])

Basic Parallel Execution

Using BatchProcessor

BatchProcessor processes multiple inputs with the same node in parallel:
import asyncio
from nadoo_flow import BatchProcessor, BaseNode, NodeContext, NodeResult, WorkflowContext

class DataFetchNode(BaseNode):
    """Example node that simulates fetching data from a named source."""

    def __init__(self, node_id: str):
        super().__init__(
            node_id=node_id,
            node_type="data_fetch",
            name="DataFetch",
            config={},
        )

    async def execute(self, node_context, workflow_context):
        """Read ``"source"`` from the input, wait one second, return fake data."""
        origin = node_context.input_data.get("source")
        # Stand-in for a real network round trip
        await asyncio.sleep(1)
        payload = {"source": origin, "data": f"Data from {origin}"}
        return NodeResult(success=True, output=payload)

# Create processor
processor = BatchProcessor(
    node=DataFetchNode("fetch"),
    max_concurrency=10  # Process up to 10 concurrently
)

# Process multiple sources in parallel
inputs = [
    {"source": "database"},
    {"source": "api"},
    {"source": "cache"}
]

results = await processor.batch(inputs)
# Total time: ~1 second instead of 3

# Access results
for result in results:
    if result.success:
        print(f"Source: {result.output['source']}, Data: {result.output['data']}")

BatchResult Structure

Each BatchResult contains:
from nadoo_flow import BatchResult

# BatchResult attributes:
result.index          # Index in the input list
result.input_data     # Original input data
result.success        # Whether processing succeeded
result.output         # Output data (if successful)
result.error          # Error message (if failed)
result.execution_time # Execution time in seconds

MapNode - Parallel List Processing

MapNode applies a child node to each item in a list, processing them concurrently:
from nadoo_flow import MapNode, BaseNode, NodeContext, NodeResult

class ItemProcessor(BaseNode):
    """Example child node that doubles the value passed under ``"item"``."""

    def __init__(self):
        super().__init__(node_id="item_processor", node_type="processor", name="ItemProcessor", config={})

    async def execute(self, node_context, workflow_context):
        """Return the input item multiplied by two under the ``"value"`` key."""
        # Example transformation: double the individual item
        doubled = node_context.input_data.get("item") * 2
        return NodeResult(success=True, output={"value": doubled})

# Create map node
map_node = MapNode(
    node_id="map_processor",
    child_node=ItemProcessor(),
    input_key="items",
    output_key="results",
    max_concurrency=10
)

# Execute
workflow_context = WorkflowContext()
node_context = NodeContext(
    node_id="map_processor",
    node_type="map",
    input_data={"items": [1, 2, 3, 4, 5]}
)

result = await map_node.execute(node_context, workflow_context)

# Result output:
# {
#   "results": [{"value": 2}, {"value": 4}, {"value": 6}, ...],
#   "total": 5,
#   "succeeded": 5,
#   "failed": 0
# }

Streaming Results as Completed

Process results as they complete using batch_as_completed:
processor = BatchProcessor(
    node=ItemProcessor(),
    max_concurrency=5
)

inputs = [{"item": i} for i in range(100)]

# Stream results as they complete
async for result in processor.batch_as_completed(inputs):
    if result.success:
        print(f"Item {result.index} completed: {result.output}")
    else:
        print(f"Item {result.index} failed: {result.error}")

Concurrency Control

Built-in Concurrency Limiting

Both BatchProcessor and MapNode support concurrency limits using Python’s asyncio.Semaphore:
# BatchProcessor with concurrency limit
processor = BatchProcessor(
    node=my_node,
    max_concurrency=5  # Only 5 tasks run simultaneously
)

# MapNode with concurrency limit
map_node = MapNode(
    node_id="limited_map",
    child_node=my_node,
    max_concurrency=10  # Only 10 items processed simultaneously
)

Error Handling

Control error propagation with continue_on_error:
# Stop on first error
processor = BatchProcessor(
    node=my_node,
    max_concurrency=10,
    continue_on_error=False  # Raises exception on error
)

# Continue processing despite errors
processor = BatchProcessor(
    node=my_node,
    max_concurrency=10,
    continue_on_error=True  # Collects errors in results
)

results = await processor.batch(inputs)

# Check for failures
for result in results:
    if not result.success:
        print(f"Item {result.index} failed: {result.error}")

Functional Programming Patterns

FilterNode

Filter items based on a condition:
from nadoo_flow import FilterNode

# Filter items with score > 0.5
filter_node = FilterNode(
    node_id="filter_high_scores",
    filter_fn=lambda item: item.get("score", 0) > 0.5,
    input_key="items",
    output_key="filtered"
)

node_context = NodeContext(
    node_id="filter_high_scores",
    node_type="filter",
    input_data={
        "items": [
            {"name": "A", "score": 0.8},
            {"name": "B", "score": 0.3},
            {"name": "C", "score": 0.9}
        ]
    }
)

result = await filter_node.execute(node_context, WorkflowContext())

# Result output:
# {
#   "filtered": [{"name": "A", "score": 0.8}, {"name": "C", "score": 0.9}],
#   "original_count": 3,
#   "filtered_count": 2
# }

ReduceNode

Aggregate list items into a single value:
from nadoo_flow import ReduceNode

# Sum up values
reduce_node = ReduceNode(
    node_id="sum_values",
    reduce_fn=lambda acc, item: acc + item.get("value", 0),
    initial_value=0,
    input_key="items",
    output_key="total"
)

node_context = NodeContext(
    node_id="sum_values",
    node_type="reduce",
    input_data={
        "items": [
            {"value": 10},
            {"value": 20},
            {"value": 30}
        ]
    }
)

result = await reduce_node.execute(node_context, WorkflowContext())

# Result output:
# {
#   "total": 60,
#   "processed_count": 3
# }

Combining Map and Reduce

Build a complete map-reduce pipeline:
from nadoo_flow import MapNode, ReduceNode

# Step 1: Map phase - process each item
class ProcessorNode(BaseNode):
    """Map-phase node: pulls the ``"value"`` field out of each input item."""

    def __init__(self):
        super().__init__(node_id="processor", node_type="processor", name="Processor", config={})

    async def execute(self, node_context, workflow_context):
        """Return the item's ``"value"`` (0 when the key is absent)."""
        record = node_context.input_data.get("item")
        extracted = {"value": record.get("value", 0)}
        return NodeResult(success=True, output=extracted)

map_node = MapNode(
    node_id="map_phase",
    child_node=ProcessorNode(),
    input_key="items",
    output_key="mapped",
    max_concurrency=10
)

# Step 2: Reduce phase - aggregate results
reduce_node = ReduceNode(
    node_id="reduce_phase",
    reduce_fn=lambda acc, item: acc + item.get("value", 0),
    initial_value=0,
    input_key="mapped",
    output_key="total"
)

# Execute map-reduce
workflow_context = WorkflowContext()

# Map phase
map_context = NodeContext(
    node_id="map_phase",
    node_type="map",
    input_data={"items": [{"value": 1}, {"value": 2}, {"value": 3}]}
)
map_result = await map_node.execute(map_context, workflow_context)

# Reduce phase
reduce_context = NodeContext(
    node_id="reduce_phase",
    node_type="reduce",
    input_data={"mapped": map_result.output["mapped"]}
)
final_result = await reduce_node.execute(reduce_context, workflow_context)

print(f"Total: {final_result.output['total']}")  # Total: 6

Advanced Patterns

Racing Multiple Operations

Use asyncio.wait to get the first completed result:
import asyncio
from nadoo_flow import BaseNode, NodeContext, NodeResult, WorkflowContext

class SearchNode(BaseNode):
    """Example search node that answers after a configurable delay."""

    def __init__(self, node_id: str, engine: str, delay: float):
        super().__init__(node_id=node_id, node_type="search", name=f"Search-{engine}", config={})
        # Kept on the instance so execute() can report which engine answered
        self.engine = engine
        self.delay = delay

    async def execute(self, node_context, workflow_context):
        """Sleep for ``self.delay`` seconds, then return canned search results."""
        await asyncio.sleep(self.delay)
        hits = {"engine": self.engine, "results": ["result1", "result2"]}
        return NodeResult(success=True, output=hits)

async def race_pattern():
    """Run three search nodes concurrently and return the first result to finish.

    The remaining searches are cancelled once one node completes.

    Returns:
        The NodeResult produced by the fastest node.
    """
    workflow_context = WorkflowContext()

    # Create competing nodes
    google = SearchNode("google", "Google", delay=1.0)
    bing = SearchNode("bing", "Bing", delay=0.5)  # Faster
    duck = SearchNode("duck", "DuckDuckGo", delay=1.5)

    # asyncio.wait() needs Task/Future objects: passing bare coroutines was
    # deprecated in Python 3.8 and raises TypeError since Python 3.11.
    # Wrapping them in tasks also gives us objects that can be cancelled.
    tasks = [
        asyncio.create_task(
            google.execute(NodeContext("google", "search", {}), workflow_context)
        ),
        asyncio.create_task(
            bing.execute(NodeContext("bing", "search", {}), workflow_context)
        ),
        asyncio.create_task(
            duck.execute(NodeContext("duck", "search", {}), workflow_context)
        ),
    ]

    # Wait for first completion
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel remaining tasks
    for task in pending:
        task.cancel()
    # Wait for the cancellations to be processed so no task is left dangling
    await asyncio.gather(*pending, return_exceptions=True)

    # Get result from fastest (any member of `done` has already finished)
    fastest_result = next(iter(done)).result()
    print(f"Fastest: {fastest_result.output['engine']}")

    return fastest_result

# Run
result = await race_pattern()

Timeout Handling

Add timeouts to prevent hanging operations:
async def process_with_timeout():
    """Run the batch under a 30-second limit.

    Returns the batch results, or None when the limit is exceeded.
    """
    batch_runner = BatchProcessor(node=SlowNode(), max_concurrency=5)
    time_limit = 30.0  # seconds

    try:
        return await asyncio.wait_for(batch_runner.batch(inputs), timeout=time_limit)
    except asyncio.TimeoutError:
        print("Processing timed out after 30 seconds")
    return None

Error Handling

Analyzing Batch Results

Check success rates and handle partial failures:
results = await processor.batch(inputs)

# Calculate success rate
successful = [r for r in results if r.success]
failed = [r for r in results if not r.success]
success_rate = len(successful) / len(results)

print(f"Success rate: {success_rate:.1%}")
print(f"Successful: {len(successful)}, Failed: {len(failed)}")

# Handle failures
if success_rate < 0.8:
    print("WARNING: High failure rate!")
    for result in failed:
        print(f"  Item {result.index} failed: {result.error}")

Retrying Failed Items

Retry only the items that failed:
# First attempt
results = await processor.batch(inputs, continue_on_error=True)

# Collect failed inputs
failed_inputs = [
    result.input_data
    for result in results
    if not result.success
]

# Retry failed items
if failed_inputs:
    print(f"Retrying {len(failed_inputs)} failed items...")
    retry_results = await processor.batch(failed_inputs)

    # Merge results
    all_results = [r for r in results if r.success] + retry_results

Performance Monitoring

Tracking Execution Times

Monitor performance of parallel operations:
import time

# Track overall time
start_time = time.time()

results = await processor.batch(inputs)

total_time = time.time() - start_time

# Analyze individual execution times
execution_times = [r.execution_time for r in results if r.execution_time]
avg_time = sum(execution_times) / len(execution_times)
max_time = max(execution_times)
min_time = min(execution_times)

print(f"Total time: {total_time:.2f}s")
print(f"Average item time: {avg_time:.2f}s")
print(f"Fastest item: {min_time:.2f}s")
print(f"Slowest item: {max_time:.2f}s")

# Calculate speedup
sequential_time = sum(execution_times)
speedup = sequential_time / total_time
print(f"Speedup: {speedup:.1f}x")

Best Practices

Don’t spawn unlimited concurrent tasks. Set max_concurrency based on your resources:
import os

# For I/O-bound tasks (API calls, DB queries)
processor = BatchProcessor(node, max_concurrency=50)

# For CPU-bound tasks
# os.cpu_count() returns None when the count cannot be determined, so fall back to 1.
# NOTE(review): asyncio tasks share one thread, so CPU-bound nodes only gain
# from concurrency if they offload their work (e.g. to an executor) — confirm.
processor = BatchProcessor(node, max_concurrency=os.cpu_count() or 1)
Use continue_on_error=True to process all items even if some fail:
processor = BatchProcessor(
    node=my_node,
    max_concurrency=10,
    continue_on_error=True  # Don't stop on first error
)
Track execution times to optimize concurrency settings:
results = await processor.batch(inputs)

# Analyze timing
for result in results:
    print(f"Item {result.index}: {result.execution_time:.2f}s")
Always set timeouts to prevent hanging operations:
try:
    results = await asyncio.wait_for(
        processor.batch(inputs),
        timeout=60.0
    )
except asyncio.TimeoutError:
    print("Batch processing timed out")
For better responsiveness, use batch_as_completed:
async for result in processor.batch_as_completed(inputs):
    # Process each result immediately as it completes
    handle_result(result)

Real-World Example

Complete example combining multiple patterns:
import asyncio
from nadoo_flow import (
    BatchProcessor, MapNode, FilterNode, ReduceNode,
    BaseNode, NodeContext, NodeResult, WorkflowContext
)

class DataEnrichmentNode(BaseNode):
    """Enrich data from external API"""

    def __init__(self):
        super().__init__(node_id="enrich", node_type="enrichment", name="DataEnrichment", config={})

    async def execute(self, node_context, workflow_context):
        """Attach a doubled ``"value"`` as the enriched score for one item."""
        record = node_context.input_data.get("item")

        # Stand-in for the latency of a real API call
        await asyncio.sleep(0.5)

        record_id = record["id"]
        score = record.get("value", 0) * 2
        enriched_record = {
            "id": record_id,
            "original": record,
            "enriched": {"score": score},
        }
        return NodeResult(success=True, output=enriched_record)

async def process_data_pipeline():
    """Complete data processing pipeline.

    Enriches 20 generated items in parallel, keeps the ones whose enriched
    score is above 50, sums those scores, and prints a short summary.
    """

    # 1. Prepare input data
    raw_data = [{"id": i, "value": i * 10} for i in range(20)]

    # 2. Enrich data in parallel
    processor = BatchProcessor(
        node=DataEnrichmentNode(),
        max_concurrency=5,
        continue_on_error=True
    )

    print("Enriching data...")
    enriched_results = await processor.batch([{"item": item} for item in raw_data])

    # 3. Extract successful results
    enriched_data = [
        r.output for r in enriched_results if r.success
    ]

    # 4. Filter high-value items
    # output_key is set explicitly so the key read below does not depend on
    # the node's default.
    filter_node = FilterNode(
        node_id="filter_high_value",
        filter_fn=lambda item: item["enriched"]["score"] > 50,
        input_key="items",
        output_key="filtered"
    )

    # The context's node_id matches the node it is executed with
    filter_result = await filter_node.execute(
        NodeContext("filter_high_value", "filter", input_data={"items": enriched_data}),
        WorkflowContext()
    )

    # 5. Calculate total score
    reduce_node = ReduceNode(
        node_id="sum_scores",
        reduce_fn=lambda acc, item: acc + item["enriched"]["score"],
        initial_value=0,
        input_key="filtered",
        output_key="total"
    )

    final_result = await reduce_node.execute(
        NodeContext("sum_scores", "reduce", input_data={"filtered": filter_result.output["filtered"]}),
        WorkflowContext()
    )

    print(f"Total score: {final_result.output['total']}")
    print(f"Processed {len(raw_data)} items")
    print(f"High-value items: {filter_result.output['filtered_count']}")

# Run the pipeline
await process_data_pipeline()

Next Steps