Skip to main content

Overview

The base module contains the fundamental classes and types for building workflows in Flow Core.

Classes

WorkflowExecutor

Main execution engine for workflows.
from nadoo_flow import WorkflowExecutor

executor = WorkflowExecutor(config=None)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| config | ExecutionConfig | No | Execution configuration |

Methods

execute
Execute a workflow asynchronously.
async def execute(
    workflow: Workflow,
    input_data: Dict[str, Any],
    context: Optional[WorkflowContext] = None
) -> ExecutionResult
Parameters:
  • workflow: Workflow to execute
  • input_data: Initial input data
  • context: Optional workflow context
Returns:
  • ExecutionResult: Execution result with output and metadata
Example:
result = await executor.execute(
    workflow=my_workflow,
    input_data={"user_id": 123}
)

NodeContext

Execution context for individual nodes.
from nadoo_flow import NodeContext

context = NodeContext(node_id="node_1")

Attributes

| Name | Type | Description |
| --- | --- | --- |
| node_id | str | Unique node identifier |
| status | NodeStatus | Current execution status |
| input_data | Dict[str, Any] | Node input data |
| output_data | Dict[str, Any] | Node output data |
| error | Optional[str] | Error message if failed |
| start_time | Optional[datetime] | Execution start time |
| end_time | Optional[datetime] | Execution end time |
| metadata | Dict[str, Any] | Additional metadata |
| variables | Dict[str, Any] | Node-specific variables |

Methods

get_input
Get input data with optional key.
def get_input(key: Optional[str] = None) -> Any
set_output
Set output data.
def set_output(data: Dict[str, Any]) -> None
set_variable
Store a variable in node context.
def set_variable(key: str, value: Any) -> None

WorkflowContext

Shared context for entire workflow execution.
from nadoo_flow import WorkflowContext

context = WorkflowContext(workflow_id="workflow_1")

Attributes

| Name | Type | Description |
| --- | --- | --- |
| workflow_id | str | Unique workflow identifier |
| global_variables | Dict[str, Any] | Global workflow variables |
| shared_state | Dict[str, Any] | Shared mutable state |
| execution_metadata | Dict[str, Any] | Execution metadata |
| node_results | Dict[str, NodeResult] | Results from executed nodes |
| start_time | datetime | Workflow start time |
| config | Dict[str, Any] | Workflow configuration |

BaseNode

Abstract base class for all nodes.
from nadoo_flow import BaseNode

class MyNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Implementation
        pass

Abstract Methods

execute
Execute node logic.
@abstractmethod
async def execute(
    node_context: NodeContext,
    workflow_context: WorkflowContext
) -> NodeResult
validate
Validate node configuration.
@abstractmethod
async def validate() -> bool

Hook Methods

pre_execute
Called before execution.
async def pre_execute(
    node_context: NodeContext,
    workflow_context: WorkflowContext
) -> None
post_execute
Called after execution.
async def post_execute(
    node_context: NodeContext,
    workflow_context: WorkflowContext
) -> None

NodeResult

Result returned by node execution.
from nadoo_flow import NodeResult

result = NodeResult(
    success=True,
    output={"data": "value"},
    error=None
)

Attributes

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| success | bool | Yes | Execution success status |
| output | Dict[str, Any] | No | Output data |
| error | Optional[str] | No | Error message if failed |
| next_node_id | Optional[str] | No | Next node to execute |
| conditional_next | Optional[Dict[str, str]] | No | Conditional routing |
| metadata | Dict[str, Any] | No | Additional metadata |

IStepNode

Protocol interface for all workflow nodes.
from nadoo_flow import IStepNode

class CustomNode(IStepNode):
    async def execute(self, node_context, workflow_context):
        pass

    async def validate(self):
        pass

Enums

NodeStatus

Node execution status.
from nadoo_flow import NodeStatus

class NodeStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    CANCELLED = "cancelled"

CommonNodeTypes

Predefined node type constants.
from nadoo_flow import CommonNodeTypes

class CommonNodeTypes:
    START = "start"
    END = "end"
    CONDITION = "condition"
    PARALLEL = "parallel"
    LOOP = "loop"
    AI_AGENT = "ai_agent"
    LLM = "llm"
    TOOL = "tool"
    DATABASE = "database"
    PYTHON = "python"
    CUSTOM = "custom"

Configuration Classes

ExecutionConfig

Configuration for workflow execution.
from nadoo_flow import ExecutionConfig

config = ExecutionConfig(
    max_parallel_nodes=10,
    timeout_seconds=300,
    retry_failed_nodes=True,
    max_retries=3,
    enable_checkpoints=True,
    checkpoint_interval=5,
    trace_execution=True
)

Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| max_parallel_nodes | int | 10 | Max concurrent nodes |
| timeout_seconds | int | 300 | Global timeout |
| retry_failed_nodes | bool | False | Auto-retry failures |
| max_retries | int | 3 | Max retry attempts |
| enable_checkpoints | bool | False | Enable checkpointing |
| checkpoint_interval | int | 5 | Checkpoint frequency |
| trace_execution | bool | False | Enable tracing |

ExecutionResult

Result of workflow execution.
from nadoo_flow import ExecutionResult

result = ExecutionResult(
    success=True,
    output={"result": "data"},
    error=None,
    context=workflow_context,
    execution_time=1.23
)

Attributes

| Name | Type | Description |
| --- | --- | --- |
| success | bool | Execution success |
| output | Optional[Dict[str, Any]] | Final output |
| error | Optional[str] | Error message |
| context | Optional[WorkflowContext] | Final context |
| execution_time | Optional[float] | Total time in seconds |
| node_execution_order | List[str] | Node execution sequence |
| metrics | Dict[str, Any] | Execution metrics |
| trace_id | Optional[str] | Trace identifier |

Exceptions

WorkflowExecutionError

Raised when workflow execution fails.
from nadoo_flow import WorkflowExecutionError

raise WorkflowExecutionError("Workflow failed", node_id="node_1")

NodeExecutionError

Raised when node execution fails.
from nadoo_flow import NodeExecutionError

raise NodeExecutionError("Node failed", node_id="node_1", error_details={})

ValidationError

Raised when validation fails.
from nadoo_flow import ValidationError

raise ValidationError("Invalid configuration", field="config.timeout")

Utility Functions

create_workflow

Create a workflow from configuration.
from nadoo_flow import create_workflow

workflow = create_workflow({
    "id": "workflow_1",
    "nodes": [...],
    "edges": [...]
})

validate_workflow

Validate workflow structure.
from nadoo_flow import validate_workflow

is_valid = validate_workflow(workflow)

Usage Examples

Basic Node Implementation

from nadoo_flow import BaseNode, CommonNodeTypes, NodeResult

class ProcessingNode(BaseNode):
    def __init__(self):
        super().__init__(
            node_id="processor",
            node_type=CommonNodeTypes.CUSTOM,
            name="Data Processor"
        )

    async def execute(self, node_context, workflow_context):
        # Access input
        data = node_context.input_data

        # Process data
        result = await self.process(data)

        # Return result
        return NodeResult(
            success=True,
            output={"processed": result}
        )

    async def validate(self):
        return True

    async def process(self, data):
        # Custom processing logic
        return data

Workflow Execution

from nadoo_flow import WorkflowExecutor, ExecutionConfig

# Configure executor
config = ExecutionConfig(
    timeout_seconds=60,
    retry_failed_nodes=True
)

executor = WorkflowExecutor(config)

# Execute workflow
result = await executor.execute(
    workflow=my_workflow,
    input_data={"user_id": 123}
)

if result.success:
    print(f"Output: {result.output}")
else:
    print(f"Error: {result.error}")

Context Management

from nadoo_flow import NodeContext, WorkflowContext

# Create contexts
workflow_context = WorkflowContext("workflow_1")
workflow_context.global_variables["api_key"] = "sk-..."

node_context = NodeContext("node_1")
node_context.input_data = {"message": "Hello"}

# Execute node with contexts
result = await node.execute(node_context, workflow_context)

# Access results
print(f"Node output: {node_context.output_data}")
print(f"Workflow state: {workflow_context.shared_state}")

See Also

  • Nodes - Understanding node concepts
  • Context - Working with contexts
  • Execution - Workflow execution details