Overview
Parallel execution in Flow Core lets multiple operations run concurrently using Python’s asyncio, which can substantially reduce total run time for workflows whose operations are independent of one another.
Flow Core provides two main approaches for parallel execution:
- BatchProcessor: For batch processing of multiple inputs with a single node
- MapNode: For applying a node to each item in a list concurrently
BatchProcessor
The BatchProcessor class enables concurrent processing of multiple inputs:
Basic Parallel Execution
Using BatchProcessor
BatchProcessor processes multiple inputs with the same node in parallel:
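The idea behind running one operation over many inputs in parallel can be sketched with the standard library alone. This is not Flow Core's BatchProcessor API, which is not shown here; `process_item` and `run_batch` are hypothetical names standing in for a node and a batch runner.

```python
import asyncio


async def process_item(x: int) -> int:
    # Hypothetical stand-in for the work a single node performs.
    await asyncio.sleep(0.01)
    return x * 2


async def run_batch(inputs: list[int]) -> list[int]:
    # asyncio.gather schedules all coroutines concurrently and returns
    # their results in the same order as the inputs.
    return await asyncio.gather(*(process_item(x) for x in inputs))


if __name__ == "__main__":
    print(asyncio.run(run_batch([1, 2, 3])))
```

Because the three calls overlap, the batch takes roughly as long as one call rather than three.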
BatchResult Structure
Each BatchResult contains:
MapNode - Parallel List Processing
MapNode applies a child node to each item in a list, processing them concurrently:
Streaming Results as Completed
Process results as they complete using batch_as_completed:
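The signature of batch_as_completed is not shown in this section, so the sketch below illustrates the same completion-order pattern with the standard library's `asyncio.as_completed`. The `work` and `stream_results` helpers are hypothetical.

```python
import asyncio


async def work(name: str, delay: float) -> str:
    # Hypothetical operation that finishes after `delay` seconds.
    await asyncio.sleep(delay)
    return name


async def stream_results(jobs: dict[str, float]) -> list[str]:
    tasks = [asyncio.create_task(work(name, delay)) for name, delay in jobs.items()]
    finished = []
    # as_completed yields awaitables in the order the tasks finish,
    # not the order in which they were submitted.
    for next_done in asyncio.as_completed(tasks):
        finished.append(await next_done)
    return finished


if __name__ == "__main__":
    print(asyncio.run(stream_results({"slow": 0.2, "fast": 0.01, "medium": 0.1})))
```

Each result can be handled as soon as it arrives instead of waiting for the slowest item.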
Concurrency Control
Built-in Concurrency Limiting
Both BatchProcessor and MapNode support concurrency limits using Python’s asyncio.Semaphore:
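A minimal sketch of semaphore-based limiting, using only `asyncio`. How Flow Core wires the semaphore internally is an assumption here; `run_limited` and its `max_concurrency` argument are illustrative names for this example.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def run_limited(
    inputs: Iterable[T],
    worker: Callable[[T], Awaitable[R]],
    max_concurrency: int,
) -> list[R]:
    # At most `max_concurrency` workers hold the semaphore at any moment;
    # the rest wait at `async with` until a slot is released.
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(item: T) -> R:
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in inputs))
```

All tasks are still created up front; the semaphore only bounds how many are doing work at once.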
Error Handling
Control error propagation with continue_on_error:
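The two behaviours can be illustrated with `asyncio.gather` and its `return_exceptions` flag. This is an analogy, not Flow Core's implementation: the `continue_on_error` parameter of the hypothetical `run_all` below simply maps onto that flag.

```python
import asyncio


async def risky(x: int) -> int:
    # Hypothetical operation that fails on even inputs.
    await asyncio.sleep(0)
    if x % 2 == 0:
        raise ValueError(f"bad input: {x}")
    return x * 10


async def run_all(inputs: list[int], continue_on_error: bool) -> list:
    # With return_exceptions=True, failures are returned in place as
    # exception objects; with False, the first failure is raised.
    return await asyncio.gather(
        *(risky(x) for x in inputs),
        return_exceptions=continue_on_error,
    )


if __name__ == "__main__":
    print(asyncio.run(run_all([1, 2, 3], continue_on_error=True)))
```

With the flag enabled, callers must check each entry for an exception before using it as a value.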
Functional Programming Patterns
FilterNode
Filter items based on a condition:
ReduceNode
Aggregate list items into a single value:
Combining Map and Reduce
Build a complete map-reduce pipeline:
Advanced Patterns
Racing Multiple Operations
Use asyncio.wait to get the first completed result:
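A minimal sketch of racing with `asyncio.wait` and `return_when=asyncio.FIRST_COMPLETED`. The `fetch` and `race` helpers and the source names are hypothetical.

```python
import asyncio


async def fetch(source: str, delay: float) -> str:
    # Hypothetical operation; `delay` simulates differing response times.
    await asyncio.sleep(delay)
    return source


async def race(candidates: dict[str, float]) -> str:
    tasks = [asyncio.create_task(fetch(s, d)) for s, d in candidates.items()]
    # Returns as soon as at least one task has finished.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel the losers so they do not keep running in the background.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


if __name__ == "__main__":
    print(asyncio.run(race({"primary": 0.2, "replica": 0.01})))
```

Note that `asyncio.wait` expects tasks or futures rather than bare coroutines, which is why each call is wrapped with `asyncio.create_task`.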
Timeout Handling
Add timeouts to prevent hanging operations:
Error Handling
Analyzing Batch Results
Check success rates and handle partial failures:
Retrying Failed Items
Retry only the items that failed:
Performance Monitoring
Tracking Execution Times
Monitor performance of parallel operations:
Best Practices
Set Appropriate Concurrency Limits
Don’t spawn unlimited concurrent tasks. Set max_concurrency based on your resources:
Handle Errors Gracefully
Use continue_on_error=True to process all items even if some fail:
Monitor Performance
Track execution times to optimize concurrency settings:
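One way to collect such timings, sketched with `time.perf_counter` and plain `asyncio`. Flow Core may expose its own timing fields; none are assumed here, and `timed`, `sample_operation`, and `measure_batch` are hypothetical helpers.

```python
import asyncio
import time


async def timed(coro):
    # Await any coroutine and return (result, elapsed_seconds).
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start


async def sample_operation(x: int) -> int:
    # Hypothetical operation that takes about 50 ms.
    await asyncio.sleep(0.05)
    return x


async def measure_batch(n: int) -> dict:
    start = time.perf_counter()
    pairs = await asyncio.gather(*(timed(sample_operation(i)) for i in range(n)))
    wall = time.perf_counter() - start
    return {
        "results": [result for result, _ in pairs],
        "wall_seconds": wall,                                 # elapsed time for the batch
        "summed_seconds": sum(elapsed for _, elapsed in pairs),  # total per-item time
    }
```

Comparing the batch's wall time with the summed per-item time gives a rough measure of how much overlap a given concurrency setting achieves.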
Use Timeouts
Always set timeouts to prevent hanging operations:
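A minimal sketch using `asyncio.wait_for`, which cancels the wrapped operation when the deadline passes. Whether Flow Core offers its own timeout parameter is not stated here; `slow_operation` and `run_with_timeout` are hypothetical.

```python
import asyncio
from typing import Optional


async def slow_operation(delay: float) -> str:
    # Hypothetical operation that may take too long.
    await asyncio.sleep(delay)
    return "done"


async def run_with_timeout(delay: float, timeout: float) -> Optional[str]:
    try:
        # wait_for cancels the inner coroutine if it exceeds `timeout`.
        return await asyncio.wait_for(slow_operation(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # Returning None is this example's choice for signalling a timeout.
        return None


if __name__ == "__main__":
    print(asyncio.run(run_with_timeout(delay=1.0, timeout=0.01)))
```

Returning a sentinel keeps the caller simple; re-raising or recording the timeout may be preferable when failures need to be counted.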
Process Results as Completed
For better responsiveness, use batch_as_completed: