## Overview

The WorkflowExecutor is the core engine that orchestrates node execution, manages state, handles errors, and ensures workflows run efficiently and reliably.

## WorkflowExecutor

The main execution engine for workflows:
```python
from typing import Any, Dict, Optional

from nadoo_flow import (
    CallbackManager,
    ExecutionConfig,
    ExecutionResult,
    Workflow,
    WorkflowContext,
)

class WorkflowExecutor:
    """Core workflow execution engine"""

    def __init__(self, config: Optional[ExecutionConfig] = None):
        self.config = config or ExecutionConfig()
        self.callback_manager = CallbackManager()
        self.execution_history = []

    async def execute(
        self,
        workflow: Workflow,
        input_data: Dict[str, Any],
        context: Optional[WorkflowContext] = None
    ) -> ExecutionResult:
        """Execute a workflow and return its result"""
        pass
```
## Execution Configuration

Configure execution behavior:

```python
from nadoo_flow import ExecutionConfig

config = ExecutionConfig(
    max_parallel_nodes=10,
    timeout_seconds=300,
    retry_failed_nodes=True,
    max_retries=3,
    enable_checkpoints=True,
    checkpoint_interval=5,
    trace_execution=True
)

executor = WorkflowExecutor(config)
```
### Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| `max_parallel_nodes` | int | 10 | Maximum concurrent node executions |
| `timeout_seconds` | int | 300 | Global workflow timeout in seconds |
| `retry_failed_nodes` | bool | False | Automatically retry failed nodes |
| `max_retries` | int | 3 | Maximum retry attempts per node |
| `enable_checkpoints` | bool | False | Enable execution checkpoints |
| `trace_execution` | bool | False | Enable detailed execution tracing |
## Execution Strategies

### Sequential Execution

Nodes execute one after another, each receiving the previous node's output:
```python
class SequentialExecutor(WorkflowExecutor):
    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)
        current_data = input_data

        for node in workflow.nodes:
            node_context = NodeContext(node.node_id)
            node_context.input_data = current_data

            # Execute node
            result = await node.execute(node_context, context)
            if not result.success:
                return ExecutionResult(
                    success=False,
                    error=f"Node {node.node_id} failed: {result.error}"
                )

            current_data = result.output
            context.node_results[node.node_id] = result

        return ExecutionResult(
            success=True,
            output=current_data,
            context=context
        )
```
### Parallel Execution

Execute independent nodes concurrently:
```python
import asyncio

class ParallelExecutor(WorkflowExecutor):
    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)

        # Group nodes into layers by their dependencies
        execution_groups = self._group_by_dependencies(workflow.nodes)

        for group in execution_groups:
            # Execute all nodes in the group concurrently
            tasks = []
            for node in group:
                node_context = NodeContext(node.node_id)
                node_context.input_data = self._get_node_input(
                    node, context, input_data
                )
                tasks.append(node.execute(node_context, context))

            # Wait for the whole group before starting the next layer
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Fail fast if any node in the group raised
            for node, result in zip(group, results):
                if isinstance(result, Exception):
                    return ExecutionResult(
                        success=False,
                        error=f"Node {node.node_id} failed: {result}"
                    )
                context.node_results[node.node_id] = result

        return ExecutionResult(
            success=True,
            output=self._get_final_output(context),
            context=context
        )
```
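The grouping helper is left abstract above. A minimal sketch of `_group_by_dependencies`, assuming each node exposes a `dependencies` set of upstream node IDs (an assumption about your node model, not the library's documented contract), could layer the graph with Kahn's algorithm:

```python
def _group_by_dependencies(self, nodes):
    """Layer nodes so each group depends only on earlier groups.

    Assumes each node has `node_id` and a `dependencies` set of node IDs;
    adapt this to however your nodes actually model edges.
    """
    by_id = {n.node_id: n for n in nodes}
    remaining = {n.node_id: set(n.dependencies) for n in nodes}
    groups = []
    while remaining:
        # Nodes with no unsatisfied dependencies form the next layer
        ready = [nid for nid, deps in remaining.items() if not deps]
        if not ready:
            raise ValueError("Cycle detected in workflow dependencies")
        groups.append([by_id[nid] for nid in ready])
        for nid in ready:
            del remaining[nid]
        for deps in remaining.values():
            deps.difference_update(ready)
    return groups
```

Each returned group is then safe to fan out with `asyncio.gather`, as the executor above does.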
### Conditional Execution

Execute nodes based on conditions:

```python
class ConditionalExecutor(WorkflowExecutor):
    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)
        current_node = workflow.start_node

        while current_node:
            node_context = NodeContext(current_node.node_id)
            node_context.input_data = input_data

            result = await current_node.execute(node_context, context)
            context.node_results[current_node.node_id] = result

            if not result.success:
                return ExecutionResult(success=False, error=result.error)

            # Determine next node based on conditions
            if result.conditional_next:
                condition = self._evaluate_condition(
                    result.output, result.conditional_next
                )
                current_node = workflow.get_node(condition)
            else:
                current_node = workflow.get_node(result.next_node_id)

        return ExecutionResult(
            success=True,
            output=context.node_results,
            context=context
        )
```
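`_evaluate_condition` is also left abstract. One possible sketch, assuming `conditional_next` is a dict mapping predicate callables to node IDs with an optional `"default"` key (the actual structure depends on how your nodes populate `NodeResult`):

```python
def _evaluate_condition(self, output, conditional_next):
    """Return the ID of the next node whose predicate matches the output.

    Illustrative only: assumes a {predicate: node_id} mapping, which is
    not necessarily the library's real contract.
    """
    for predicate, node_id in conditional_next.items():
        if callable(predicate) and predicate(output):
            return node_id
    return conditional_next.get("default")
```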
## Execution Flow Control

### Starting Execution

```python
# Simple execution
executor = WorkflowExecutor()
result = await executor.execute(
    workflow=my_workflow,
    input_data={"user_id": 123}
)

# With custom context
context = WorkflowContext("workflow_1")
context.global_variables["api_key"] = "sk-..."

result = await executor.execute(
    workflow=my_workflow,
    input_data={"user_id": 123},
    context=context
)
```
### Pausing and Resuming

```python
class PausableExecutor(WorkflowExecutor):
    def __init__(self):
        super().__init__()
        self.paused = False
        self.pause_point = None

    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)

        for i, node in enumerate(workflow.nodes):
            if self.paused and self.pause_point == i:
                # Save state so resume() can pick up from this node
                await self._save_checkpoint(context, i)
                return ExecutionResult(
                    success=True,
                    paused=True,
                    checkpoint=i
                )

            node_context = NodeContext(node.node_id)
            node_context.input_data = input_data
            result = await node.execute(node_context, context)
            # ... continue execution

    async def resume(self, workflow, checkpoint_id):
        """Resume from checkpoint"""
        context, start_index = await self._load_checkpoint(checkpoint_id)
        # Continue from the saved point
        return await self._execute_from(workflow, context, start_index)
```
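Driving the pause/resume cycle might look like the following; the checkpoint store behind `_save_checkpoint` and `_load_checkpoint` is left to your persistence layer:

```python
executor = PausableExecutor()

# Request a pause before the node at index 3 (illustrative values)
executor.paused = True
executor.pause_point = 3
result = await executor.execute(workflow, {"user_id": 123})

if result.paused:
    # Later, possibly in another process, continue from the checkpoint
    result = await executor.resume(workflow, checkpoint_id=result.checkpoint)
```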
### Cancellation

```python
class CancellableExecutor(WorkflowExecutor):
    def __init__(self):
        super().__init__()
        self.cancelled = False

    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)

        for node in workflow.nodes:
            # The flag is checked between nodes; a running node finishes first
            if self.cancelled:
                return ExecutionResult(
                    success=False,
                    cancelled=True,
                    partial_results=context.node_results
                )

            node_context = NodeContext(node.node_id)
            node_context.input_data = input_data
            result = await node.execute(node_context, context)
            # ... continue

    def cancel(self):
        """Cancel ongoing execution"""
        self.cancelled = True
```
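Because `execute` is a coroutine, cancellation is typically triggered from a separate task. A sketch:

```python
import asyncio

executor = CancellableExecutor()
task = asyncio.create_task(executor.execute(workflow, {"user_id": 123}))

# Cancel from elsewhere, e.g. a shutdown handler or timeout watchdog
executor.cancel()

result = await task
if getattr(result, "cancelled", False):
    print("Stopped early with partial results:", result.partial_results)
```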
## Execution Monitoring

### Execution Callbacks

```python
from nadoo_flow import BaseCallbackHandler

class ExecutionMonitor(BaseCallbackHandler):
    async def on_workflow_start(self, workflow_id, input_data):
        print(f"Workflow {workflow_id} started")

    async def on_node_start(self, node_id, input_data):
        print(f"Node {node_id} started")

    async def on_node_complete(self, node_id, output_data):
        print(f"Node {node_id} completed")

    async def on_workflow_complete(self, workflow_id, output_data):
        print(f"Workflow {workflow_id} completed")

    async def on_error(self, error_info):
        print(f"Error: {error_info}")

# Register callbacks
executor = WorkflowExecutor()
executor.callback_manager.add_handler(ExecutionMonitor())
```
### Execution Metrics

```python
class MetricsCollector:
    """Collect execution metrics"""

    def __init__(self):
        self.metrics = {
            "total_executions": 0,
            "successful_executions": 0,
            "failed_executions": 0,
            "average_duration": 0.0,
            "node_execution_times": {}
        }

    async def record_execution(self, result: ExecutionResult):
        self.metrics["total_executions"] += 1

        if result.success:
            self.metrics["successful_executions"] += 1
        else:
            self.metrics["failed_executions"] += 1

        # Update the running average with this run's execution time
        self._update_average_duration(result.execution_time)

        # Record per-node execution times
        for node_id, node_result in result.context.node_results.items():
            self._record_node_time(node_id, node_result.duration)
```
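The two helpers referenced above are not shown; they might keep simple running aggregates. A sketch, assuming durations are floats in seconds:

```python
def _update_average_duration(self, duration: float):
    """Fold a new duration into the running mean incrementally."""
    n = self.metrics["total_executions"]  # already incremented for this run
    avg = self.metrics["average_duration"]
    self.metrics["average_duration"] = avg + (duration - avg) / n

def _record_node_time(self, node_id: str, duration: float):
    """Accumulate per-node durations for later analysis."""
    self.metrics["node_execution_times"].setdefault(node_id, []).append(duration)
```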
## Error Handling

### Retry Logic

```python
import asyncio

class RetryExecutor(WorkflowExecutor):
    async def execute_with_retry(self, node, node_context, workflow_context):
        """Execute node with retry logic and exponential backoff"""
        max_retries = self.config.max_retries
        retry_delay = 1  # start with 1 second

        for attempt in range(max_retries):
            try:
                result = await node.execute(node_context, workflow_context)
                if result.success:
                    return result

                # Only retry errors that look transient
                if self._is_retryable_error(result.error):
                    await asyncio.sleep(retry_delay)
                    retry_delay *= 2  # exponential backoff
                else:
                    return result
            except Exception as e:
                if attempt == max_retries - 1:
                    return NodeResult(
                        success=False,
                        error=f"Failed after {max_retries} attempts: {e}"
                    )
                await asyncio.sleep(retry_delay)
                retry_delay *= 2

        return NodeResult(
            success=False,
            error="Max retries exceeded"
        )
```
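What counts as retryable is application-specific. A simple heuristic for `_is_retryable_error` that matches on error text (typed exceptions would be more robust if your nodes raise them):

```python
# Markers chosen here are illustrative, not a definitive list
RETRYABLE_MARKERS = ("timeout", "rate limit", "429", "503", "connection reset")

def _is_retryable_error(self, error: str) -> bool:
    """Retry only failures that look transient; string matching is a
    simplification, so prefer exception types where available."""
    message = (error or "").lower()
    return any(marker in message for marker in RETRYABLE_MARKERS)
```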
### Fallback Strategies

```python
import logging

logger = logging.getLogger(__name__)

class FallbackExecutor(WorkflowExecutor):
    async def execute_with_fallback(
        self,
        primary_node,
        fallback_node,
        node_context,
        workflow_context
    ):
        """Execute with a fallback option"""
        # Try the primary node first
        result = await primary_node.execute(node_context, workflow_context)
        if result.success:
            return result

        # Log the primary failure
        logger.warning(f"Primary node failed: {result.error}")

        # Try the fallback
        logger.info("Executing fallback node")
        fallback_result = await fallback_node.execute(
            node_context, workflow_context
        )
        return fallback_result
```
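Calling it might look like this; `gpt4_node` and `gpt35_node` are hypothetical node instances standing in for any two interchangeable nodes:

```python
# Both node variables are hypothetical placeholders
result = await executor.execute_with_fallback(
    primary_node=gpt4_node,
    fallback_node=gpt35_node,
    node_context=node_context,
    workflow_context=context,
)
```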
## Execution Optimization

### Caching

```python
import hashlib
import json
import logging

logger = logging.getLogger(__name__)

class CachingExecutor(WorkflowExecutor):
    def __init__(self):
        super().__init__()
        self.cache = {}

    async def execute(self, workflow, input_data, context=None):
        # Generate a deterministic cache key from the inputs
        cache_key = self._generate_cache_key(workflow.id, input_data)

        # Check the cache
        if cache_key in self.cache:
            logger.info(f"Cache hit for workflow {workflow.id}")
            return self.cache[cache_key]

        # Execute the workflow
        result = await super().execute(workflow, input_data, context)

        # Cache successful results only
        if result.success:
            self.cache[cache_key] = result
        return result

    def _generate_cache_key(self, workflow_id, input_data):
        data_str = json.dumps(input_data, sort_keys=True)
        return f"{workflow_id}:{hashlib.md5(data_str.encode()).hexdigest()}"
```
### Resource Pooling

```python
import asyncio

class PooledExecutor(WorkflowExecutor):
    def __init__(self, pool_size=10):
        super().__init__()
        self.semaphore = asyncio.Semaphore(pool_size)

    async def execute_node(self, node, node_context, workflow_context):
        """Execute a node, waiting while the pool is exhausted"""
        async with self.semaphore:
            return await node.execute(node_context, workflow_context)
```
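A small helper shows how the semaphore interacts with fan-out: all tasks are created at once, but at most `pool_size` nodes actually run concurrently. This is a sketch; `run_batch` is not part of the library, and `NodeContext` construction mirrors the earlier examples:

```python
import asyncio

async def run_batch(executor: PooledExecutor, nodes, workflow_context):
    """Fan out independent nodes; the semaphore caps real concurrency."""
    tasks = [
        executor.execute_node(node, NodeContext(node.node_id), workflow_context)
        for node in nodes
    ]
    return await asyncio.gather(*tasks)
```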
## Execution Tracing

### Detailed Tracing

```python
class TracingExecutor(WorkflowExecutor):
    async def execute(self, workflow, input_data, context=None):
        trace = ExecutionTrace(workflow.id)
        trace.start()
        result = None

        try:
            for node in workflow.nodes:
                # Create a span for each node execution
                with trace.span(f"node_{node.node_id}") as span:
                    span.set_attribute("node_type", node.node_type)
                    span.set_attribute("input_size", len(str(input_data)))

                    node_context = NodeContext(node.node_id)
                    node_context.input_data = input_data
                    result = await node.execute(node_context, context)

                    span.set_attribute("output_size", len(str(result.output)))
                    span.set_attribute("success", result.success)
            return result
        finally:
            # Always close and export the trace, even on failure
            trace.end()
            await trace.export()
```
## Execution Result

The execution result contains comprehensive information about the run:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class ExecutionResult:
    success: bool
    output: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    context: Optional[WorkflowContext] = None
    execution_time: Optional[float] = None
    node_execution_order: List[str] = field(default_factory=list)
    metrics: Dict[str, Any] = field(default_factory=dict)
    trace_id: Optional[str] = None
```
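A typical caller inspects the result like so:

```python
result = await executor.execute(workflow, {"user_id": 123})

if result.success:
    print("Output:", result.output)
    print("Finished in", result.execution_time, "seconds")
    print("Node order:", " -> ".join(result.node_execution_order))
else:
    print("Failed:", result.error)
```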
## Best Practices

### Use the Appropriate Executor

Choose the right executor for your workflow pattern (sequential, parallel, conditional).

### Handle Timeouts

Always set reasonable timeouts to prevent hanging workflows.

```python
config = ExecutionConfig(timeout_seconds=60)
executor = WorkflowExecutor(config)
```

### Monitor Execution

Use callbacks and metrics to monitor workflow health.

### Enable Checkpointing

For long-running workflows, enable checkpointing for resilience.

```python
config = ExecutionConfig(
    enable_checkpoints=True,
    checkpoint_interval=10
)
```

### Clean Up Resources

Always clean up resources in `finally` blocks.

```python
try:
    result = await executor.execute(workflow, input_data)
finally:
    await executor.cleanup()
```