Overview

The WorkflowExecutor is the core engine that orchestrates node execution, manages state, handles errors, and ensures workflows run efficiently and reliably.

WorkflowExecutor

The main execution engine for workflows:
from typing import Any, Dict, Optional

from nadoo_flow import WorkflowExecutor, ExecutionConfig

# Interface of the executor, shown here for reference:
class WorkflowExecutor:
    """Core workflow execution engine"""

    def __init__(self, config: Optional[ExecutionConfig] = None):
        self.config = config or ExecutionConfig()
        self.callback_manager = CallbackManager()
        self.execution_history = []

    async def execute(
        self,
        workflow: Workflow,
        input_data: Dict[str, Any],
        context: Optional[WorkflowContext] = None
    ) -> ExecutionResult:
        """Execute a workflow"""
        ...  # full implementation provided by the framework

Execution Configuration

Configure execution behavior:
from nadoo_flow import ExecutionConfig

config = ExecutionConfig(
    max_parallel_nodes=10,
    timeout_seconds=300,
    retry_failed_nodes=True,
    max_retries=3,
    enable_checkpoints=True,
    checkpoint_interval=5,
    trace_execution=True
)

executor = WorkflowExecutor(config)

Configuration Options

| Option | Type | Default | Description |
|---|---|---|---|
| max_parallel_nodes | int | 10 | Maximum concurrent node executions |
| timeout_seconds | int | 300 | Global workflow timeout (seconds) |
| retry_failed_nodes | bool | False | Auto-retry failed nodes |
| max_retries | int | 3 | Maximum retry attempts |
| enable_checkpoints | bool | False | Enable execution checkpoints |
| trace_execution | bool | False | Enable detailed execution tracing |

Execution Strategies

Sequential Execution

Nodes execute one after another:
class SequentialExecutor(WorkflowExecutor):
    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)
        current_data = input_data

        for node in workflow.nodes:
            node_context = NodeContext(node.node_id)
            node_context.input_data = current_data

            # Execute node
            result = await node.execute(node_context, context)

            if not result.success:
                return ExecutionResult(
                    success=False,
                    error=f"Node {node.node_id} failed: {result.error}"
                )

            current_data = result.output
            context.node_results[node.node_id] = result

        return ExecutionResult(
            success=True,
            output=current_data,
            context=context
        )

Parallel Execution

Execute independent nodes concurrently:
class ParallelExecutor(WorkflowExecutor):
    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)

        # Group nodes by dependencies
        execution_groups = self._group_by_dependencies(workflow.nodes)

        for group in execution_groups:
            # Execute nodes in group concurrently
            tasks = []
            for node in group:
                node_context = NodeContext(node.node_id)
                node_context.input_data = self._get_node_input(
                    node, context, input_data
                )
                tasks.append(node.execute(node_context, context))

            # Wait for all nodes in group
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # Process results
            for node, result in zip(group, results):
                if isinstance(result, Exception):
                    return ExecutionResult(
                        success=False,
                        error=f"Node {node.node_id} failed: {result}"
                    )
                context.node_results[node.node_id] = result

        return ExecutionResult(
            success=True,
            output=self._get_final_output(context),
            context=context
        )
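
The `_group_by_dependencies` helper used above is not shown. The sketch below illustrates one way it could work, assuming each node exposes a `dependencies` collection of upstream node IDs (an assumption for illustration, not a documented attribute). It repeatedly collects the nodes whose dependencies have already completed, producing batches that are safe to run concurrently.

    # A possible implementation of the grouping helper (ParallelExecutor method)
    def _group_by_dependencies(self, nodes):
        """Batch nodes into levels whose dependencies are already satisfied."""
        remaining = list(nodes)
        completed = set()
        groups = []

        while remaining:
            # Nodes whose upstream dependencies (if any) have all run already
            ready = [
                n for n in remaining
                if set(getattr(n, "dependencies", ())) <= completed
            ]
            if not ready:
                raise ValueError("Dependency cycle detected among workflow nodes")
            groups.append(ready)
            completed.update(n.node_id for n in ready)
            remaining = [n for n in remaining if n not in ready]

        return groups

Each returned group can then be dispatched with asyncio.gather, as in the executor above.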

Conditional Execution

Execute nodes based on conditions:
class ConditionalExecutor(WorkflowExecutor):
    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)
        current_node = workflow.start_node
        current_data = input_data

        while current_node:
            node_context = NodeContext(current_node.node_id)
            node_context.input_data = current_data

            result = await current_node.execute(node_context, context)
            context.node_results[current_node.node_id] = result

            if not result.success:
                return ExecutionResult(success=False, error=result.error)

            # Feed this node's output into the next node
            current_data = result.output

            # Determine next node based on conditions
            if result.conditional_next:
                condition = self._evaluate_condition(
                    result.output, result.conditional_next
                )
                current_node = workflow.get_node(condition)
            else:
                current_node = workflow.get_node(result.next_node_id)

        return ExecutionResult(
            success=True,
            output=current_data,
            context=context
        )
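
`_evaluate_condition` is likewise left undefined above. A minimal sketch, assuming `conditional_next` maps candidate node IDs to predicate callables over the node output (an illustrative assumption; the real structure may differ):

    # A possible implementation of the condition helper (ConditionalExecutor method)
    def _evaluate_condition(self, output, conditional_next):
        """Return the ID of the first branch whose predicate matches the output."""
        for node_id, predicate in conditional_next.items():
            if predicate(output):
                return node_id
        return None  # no branch matched; the while-loop in execute() then ends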

Execution Flow Control

Starting Execution

# Simple execution
executor = WorkflowExecutor()
result = await executor.execute(
    workflow=my_workflow,
    input_data={"user_id": 123}
)

# With custom context
context = WorkflowContext("workflow_1")
context.global_variables["api_key"] = "sk-..."

result = await executor.execute(
    workflow=my_workflow,
    input_data={"user_id": 123},
    context=context
)

Pausing and Resuming

class PausableExecutor(WorkflowExecutor):
    def __init__(self):
        super().__init__()
        self.paused = False
        self.pause_point = None

    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)

        for i, node in enumerate(workflow.nodes):
            if self.paused and self.pause_point == i:
                # Save state for resume
                await self._save_checkpoint(context, i)
                return ExecutionResult(
                    success=True,
                    paused=True,
                    checkpoint=i
                )

            node_context = NodeContext(node.node_id)
            node_context.input_data = input_data
            result = await node.execute(node_context, context)
            # ... continue execution as in SequentialExecutor

    async def resume(self, workflow, checkpoint_id):
        """Resume from checkpoint"""
        context, start_index = await self._load_checkpoint(checkpoint_id)
        # Continue from saved point
        return await self._execute_from(workflow, context, start_index)
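
The `_save_checkpoint` and `_load_checkpoint` helpers are referenced but not defined here. A minimal sketch of a store they could delegate to, persisting checkpoints as JSON files on disk (the file layout, checkpoint ID format, and stringified node results are assumptions made for illustration):

import json
from pathlib import Path

class FileCheckpointStore:
    """Minimal JSON-file store the checkpoint helpers could delegate to."""

    def __init__(self, directory="checkpoints"):
        self.directory = Path(directory)
        self.directory.mkdir(parents=True, exist_ok=True)

    def save(self, workflow_id, node_index, node_results):
        """Persist the resume point; returns the checkpoint ID."""
        checkpoint_id = f"{workflow_id}_{node_index}"
        payload = {
            "workflow_id": workflow_id,
            "node_index": node_index,
            # Stringified for brevity; real code needs a proper serializer
            "node_results": {key: str(value) for key, value in node_results.items()},
        }
        (self.directory / f"{checkpoint_id}.json").write_text(json.dumps(payload))
        return checkpoint_id

    def load(self, checkpoint_id):
        """Return (workflow_id, node_index, node_results) for a saved checkpoint."""
        payload = json.loads((self.directory / f"{checkpoint_id}.json").read_text())
        return payload["workflow_id"], payload["node_index"], payload["node_results"]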

Cancellation

class CancellableExecutor(WorkflowExecutor):
    def __init__(self):
        super().__init__()
        self.cancelled = False

    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)

        for node in workflow.nodes:
            if self.cancelled:
                return ExecutionResult(
                    success=False,
                    cancelled=True,
                    partial_results=context.node_results
                )

            node_context = NodeContext(node.node_id)
            node_context.input_data = input_data
            result = await node.execute(node_context, context)
            # ... continue as in SequentialExecutor

    def cancel(self):
        """Cancel ongoing execution"""
        self.cancelled = True
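
Because cancellation is cooperative (the flag is only checked between nodes), the executor is typically run as a task and cancelled from elsewhere. A usage sketch; the helper name and soft-limit behaviour are illustrative, not part of the documented API:

import asyncio

async def run_with_soft_timeout(executor, workflow, input_data, limit_seconds=30):
    """Run a workflow and flip the cancel flag if it exceeds a soft limit."""
    task = asyncio.create_task(executor.execute(workflow, input_data))
    try:
        return await asyncio.wait_for(asyncio.shield(task), timeout=limit_seconds)
    except asyncio.TimeoutError:
        executor.cancel()   # cooperative: honored before the next node starts
        return await task   # eventually returns ExecutionResult(cancelled=True, ...)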

Execution Monitoring

Execution Callbacks

from nadoo_flow import BaseCallbackHandler

class ExecutionMonitor(BaseCallbackHandler):
    async def on_workflow_start(self, workflow_id, input_data):
        print(f"Workflow {workflow_id} started")

    async def on_node_start(self, node_id, input_data):
        print(f"Node {node_id} started")

    async def on_node_complete(self, node_id, output_data):
        print(f"Node {node_id} completed")

    async def on_workflow_complete(self, workflow_id, output_data):
        print(f"Workflow {workflow_id} completed")

    async def on_error(self, error_info):
        print(f"Error: {error_info}")

# Register callbacks
executor = WorkflowExecutor()
executor.callback_manager.add_handler(ExecutionMonitor())

Execution Metrics

class MetricsCollector:
    """Collect execution metrics"""

    def __init__(self):
        self.metrics = {
            "total_executions": 0,
            "successful_executions": 0,
            "failed_executions": 0,
            "average_duration": 0,
            "node_execution_times": {}
        }

    async def record_execution(self, result: ExecutionResult):
        self.metrics["total_executions"] += 1

        if result.success:
            self.metrics["successful_executions"] += 1
        else:
            self.metrics["failed_executions"] += 1

        # Update average duration
        duration = result.end_time - result.start_time
        self._update_average_duration(duration)

        # Record node times
        for node_id, node_result in result.context.node_results.items():
            self._record_node_time(node_id, node_result.duration)
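
The two private helpers used above are not shown. One straightforward way to implement them, as a sketch, using a running average and per-node totals:

    # Possible implementations of the helpers above (MetricsCollector methods)
    def _update_average_duration(self, duration):
        """Fold a new duration into the running average."""
        n = self.metrics["total_executions"]          # already incremented
        previous = self.metrics["average_duration"]
        self.metrics["average_duration"] = previous + (duration - previous) / n

    def _record_node_time(self, node_id, duration):
        """Accumulate per-node totals and keep a per-node average."""
        entry = self.metrics["node_execution_times"].setdefault(
            node_id, {"total": 0.0, "count": 0}
        )
        entry["total"] += duration
        entry["count"] += 1
        entry["average"] = entry["total"] / entry["count"]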

Error Handling

Retry Logic

class RetryExecutor(WorkflowExecutor):
    async def execute_with_retry(self, node, node_context, workflow_context):
        """Execute node with retry logic"""
        max_retries = self.config.max_retries
        retry_delay = 1  # Start with 1 second

        for attempt in range(max_retries):
            try:
                result = await node.execute(node_context, workflow_context)

                if result.success:
                    return result

                # Check if error is retryable
                if self._is_retryable_error(result.error):
                    await asyncio.sleep(retry_delay)
                    retry_delay *= 2  # Exponential backoff
                else:
                    return result

            except Exception as e:
                if attempt == max_retries - 1:
                    return NodeResult(
                        success=False,
                        error=f"Failed after {max_retries} attempts: {e}"
                    )
                await asyncio.sleep(retry_delay)
                retry_delay *= 2

        return NodeResult(
            success=False,
            error=f"Max retries ({max_retries}) exceeded"
        )
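
`_is_retryable_error` is left to the implementer. A simple sketch that treats transient-looking failures as retryable (the keyword list is an illustrative assumption):

    # One possible heuristic for the retry check (RetryExecutor method)
    def _is_retryable_error(self, error):
        """Treat transient-looking failures as retryable."""
        transient_markers = ("timeout", "rate limit", "connection", "unavailable")
        message = str(error).lower()
        return any(marker in message for marker in transient_markers)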

Fallback Strategies

import logging

logger = logging.getLogger(__name__)

class FallbackExecutor(WorkflowExecutor):
    async def execute_with_fallback(
        self,
        primary_node,
        fallback_node,
        node_context,
        workflow_context
    ):
        """Execute with fallback option"""
        # Try primary node
        result = await primary_node.execute(node_context, workflow_context)

        if result.success:
            return result

        # Log primary failure
        logger.warning(f"Primary node failed: {result.error}")

        # Try fallback
        logger.info("Executing fallback node")
        fallback_result = await fallback_node.execute(
            node_context, workflow_context
        )

        return fallback_result

Execution Optimization

Caching

import logging

logger = logging.getLogger(__name__)

class CachingExecutor(WorkflowExecutor):
    def __init__(self):
        super().__init__()
        self.cache = {}

    async def execute(self, workflow, input_data, context=None):
        # Generate cache key
        cache_key = self._generate_cache_key(workflow.id, input_data)

        # Check cache
        if cache_key in self.cache:
            logger.info(f"Cache hit for workflow {workflow.id}")
            return self.cache[cache_key]

        # Execute workflow
        result = await super().execute(workflow, input_data, context)

        # Cache result
        if result.success:
            self.cache[cache_key] = result

        return result

    def _generate_cache_key(self, workflow_id, input_data):
        import hashlib
        import json

        data_str = json.dumps(input_data, sort_keys=True)
        return f"{workflow_id}:{hashlib.md5(data_str.encode()).hexdigest()}"

Resource Pooling

class PooledExecutor(WorkflowExecutor):
    def __init__(self, pool_size=10):
        super().__init__()
        self.semaphore = asyncio.Semaphore(pool_size)

    async def execute_node(self, node, node_context, workflow_context):
        """Execute node with resource pooling"""
        async with self.semaphore:
            return await node.execute(node_context, workflow_context)
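
The semaphore caps how many nodes hold a slot at once, which matters when nodes wrap scarce resources such as database connections or model endpoints. A quick sketch of combining it with concurrent dispatch (the helper name is illustrative):

import asyncio

async def run_batch_pooled(executor, nodes, node_contexts, workflow_context):
    """Dispatch a batch of nodes; at most `pool_size` run at any one time."""
    tasks = [
        executor.execute_node(node, node_context, workflow_context)
        for node, node_context in zip(nodes, node_contexts)
    ]
    return await asyncio.gather(*tasks)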

Execution Tracing

Detailed Tracing

class TracingExecutor(WorkflowExecutor):
    async def execute(self, workflow, input_data, context=None):
        context = context or WorkflowContext(workflow.id)
        trace = ExecutionTrace(workflow.id)
        result = None
        current_data = input_data

        # Start trace
        trace.start()

        try:
            for node in workflow.nodes:
                # Create span for node
                with trace.span(f"node_{node.node_id}") as span:
                    span.set_attribute("node_type", node.node_type)
                    span.set_attribute("input_size", len(str(current_data)))

                    node_context = NodeContext(node.node_id)
                    node_context.input_data = current_data
                    result = await node.execute(node_context, context)
                    current_data = result.output

                    span.set_attribute("output_size", len(str(result.output)))
                    span.set_attribute("success", result.success)

            return result

        finally:
            # End trace
            trace.end()
            await trace.export()
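
`ExecutionTrace` is not defined on this page. A minimal in-memory stand-in that records named spans with durations and attributes (purely illustrative; a real implementation would likely export to a tracing backend such as OpenTelemetry):

import time
from contextlib import contextmanager

class ExecutionTrace:
    """Tiny in-memory trace: named spans with durations and attributes."""

    def __init__(self, workflow_id):
        self.workflow_id = workflow_id
        self.spans = []
        self.started_at = None
        self.ended_at = None

    def start(self):
        self.started_at = time.monotonic()

    def end(self):
        self.ended_at = time.monotonic()

    @contextmanager
    def span(self, name):
        record = {"name": name, "attributes": {}, "start": time.monotonic()}

        class _Span:
            def set_attribute(self, key, value):
                record["attributes"][key] = value

        try:
            yield _Span()
        finally:
            record["duration"] = time.monotonic() - record["start"]
            self.spans.append(record)

    async def export(self):
        # Placeholder: print spans instead of shipping them to a tracing backend
        for span in self.spans:
            print(f"[{self.workflow_id}] {span['name']}: {span['duration']:.3f}s")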

Execution Result

The execution result contains comprehensive information:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class ExecutionResult:
    success: bool
    output: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    context: Optional[WorkflowContext] = None
    execution_time: Optional[float] = None
    node_execution_order: List[str] = field(default_factory=list)
    metrics: Dict[str, Any] = field(default_factory=dict)
    trace_id: Optional[str] = None
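
A typical pattern for consuming the result (field names as declared above):

result = await executor.execute(my_workflow, {"user_id": 123})

if result.success:
    print(f"Output: {result.output}")
    print(f"Execution time: {result.execution_time}s")
    print(f"Node order: {result.node_execution_order}")
else:
    print(f"Workflow failed: {result.error}")
    if result.context:
        # Partial per-node results remain available for debugging
        print(f"Completed nodes: {list(result.context.node_results)}")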

Best Practices

Choose the right executor for your workflow pattern (sequential, parallel, conditional).

Always set reasonable timeouts to prevent hanging workflows:

config = ExecutionConfig(timeout_seconds=60)
executor = WorkflowExecutor(config)

Use callbacks and metrics to monitor workflow health.

For long-running workflows, enable checkpointing for resilience:

config = ExecutionConfig(
    enable_checkpoints=True,
    checkpoint_interval=10
)

Always clean up resources in finally blocks:

try:
    result = await executor.execute(workflow, input_data)
finally:
    await executor.cleanup()

Next Steps