Overview

Contexts in Flow Core manage state at two levels:
  • NodeContext: Individual node execution state
  • WorkflowContext: Shared state across the entire workflow

NodeContext

Each node has its own isolated context for execution:
from datetime import datetime
from typing import Any, Dict, Optional

from nadoo_flow import NodeContext, NodeStatus

# For reference, the NodeContext class looks roughly like this:
class NodeContext:
    """Execution context for individual nodes"""

    def __init__(self, node_id: str):
        self.node_id = node_id
        self.status = NodeStatus.PENDING
        self.input_data: Dict[str, Any] = {}
        self.output_data: Dict[str, Any] = {}
        self.error: Optional[str] = None
        self.start_time: Optional[datetime] = None
        self.end_time: Optional[datetime] = None
        self.metadata: Dict[str, Any] = {}
        self.variables: Dict[str, Any] = {}

NodeContext Properties

  • node_id (str): Unique identifier for the node
  • status (NodeStatus): Current execution status
  • input_data (Dict[str, Any]): Input passed to the node
  • output_data (Dict[str, Any]): Output produced by the node
  • error (Optional[str]): Error message if the node failed
  • start_time (Optional[datetime]): Execution start timestamp
  • end_time (Optional[datetime]): Execution end timestamp
  • metadata (Dict[str, Any]): Additional node metadata
  • variables (Dict[str, Any]): Node-specific variables

NodeStatus Enum

from enum import Enum

class NodeStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    CANCELLED = "cancelled"
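
The workflow runner normally drives these transitions; a minimal illustration using the NodeContext shown above:
node_context = NodeContext("fetch_user")
assert node_context.status == NodeStatus.PENDING

node_context.status = NodeStatus.RUNNING
# ... node executes ...
node_context.status = NodeStatus.SUCCESS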

WorkflowContext

Shared context accessible by all nodes in the workflow:
from datetime import datetime
from typing import Any, Dict

from nadoo_flow import NodeResult, WorkflowContext

# For reference, the WorkflowContext class looks roughly like this:
class WorkflowContext:
    """Shared context for entire workflow"""

    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.global_variables: Dict[str, Any] = {}
        self.shared_state: Dict[str, Any] = {}
        self.execution_metadata: Dict[str, Any] = {}
        self.node_results: Dict[str, NodeResult] = {}
        self.start_time: datetime = datetime.now()
        self.config: Dict[str, Any] = {}

WorkflowContext Properties

  • workflow_id (str): Unique workflow identifier
  • global_variables (Dict[str, Any]): Variables accessible by all nodes
  • shared_state (Dict[str, Any]): Mutable shared state
  • execution_metadata (Dict[str, Any]): Workflow execution metadata
  • node_results (Dict[str, NodeResult]): Results from all executed nodes
  • start_time (datetime): Workflow start timestamp
  • config (Dict[str, Any]): Workflow configuration
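
How the three dictionaries are split is largely convention: config for static settings, global_variables for values nodes read at runtime, and execution_metadata for bookkeeping. A minimal sketch (names and values are illustrative):
workflow_context = WorkflowContext("nightly_sync")
workflow_context.config = {"environment": "staging"}
workflow_context.global_variables = {"batch_size": 500}
workflow_context.execution_metadata["triggered_by"] = "scheduler"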

Using Contexts in Nodes

Accessing Input Data

from nadoo_flow import BaseNode, NodeResult

class ProcessingNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Access input data
        user_id = node_context.input_data.get("user_id")
        data = node_context.input_data.get("data", {})

        # Process data
        result = await self.process_user_data(user_id, data)

        return NodeResult(
            success=True,
            output={"processed": result}
        )

Setting Output Data

class DataNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Process something
        result = {"status": "completed", "count": 42}

        # Set output explicitly
        node_context.output_data = result

        # Or return via NodeResult
        return NodeResult(
            success=True,
            output=result
        )

Using Node Variables

class StatefulNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Initialize counter if not exists
        if "counter" not in node_context.variables:
            node_context.variables["counter"] = 0

        # Increment counter
        node_context.variables["counter"] += 1

        # Use counter value
        count = node_context.variables["counter"]

        return NodeResult(
            success=True,
            output={"iteration": count}
        )

Workflow Context Patterns

Sharing Data Between Nodes

class ProducerNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Generate data
        data = await self.fetch_data()

        # Share with other nodes
        workflow_context.shared_state["fetched_data"] = data

        return NodeResult(success=True, output=data)

class ConsumerNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Access shared data
        data = workflow_context.shared_state.get("fetched_data")

        if not data:
            return NodeResult(
                success=False,
                error="No data available from producer"
            )

        # Process shared data
        result = await self.process(data)

        return NodeResult(success=True, output=result)
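
Because shared_state keys are plain strings, two producers can silently overwrite each other. Prefixing keys with the producing node's id is one way to avoid collisions (a naming convention, not a framework requirement):
# Inside ProducerNode.execute, using the producing node's id as a namespace
key = f"{node_context.node_id}:fetched_data"
workflow_context.shared_state[key] = data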

Global Variables

class ConfigurableNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Access global configuration
        api_key = workflow_context.global_variables.get("api_key")
        timeout = workflow_context.global_variables.get("timeout", 30)

        # Use configuration
        response = await self.call_api(api_key, timeout)

        return NodeResult(success=True, output=response)

# Setting global variables
workflow_context = WorkflowContext("workflow_1")
workflow_context.global_variables = {
    "api_key": "sk-...",
    "timeout": 60,
    "max_retries": 3
}

Accessing Previous Node Results

class AggregatorNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Access results from specific nodes
        node1_result = workflow_context.node_results.get("node_1")
        node2_result = workflow_context.node_results.get("node_2")

        if not node1_result or not node2_result:
            return NodeResult(
                success=False,
                error="Required node results not available"
            )

        # Aggregate results
        combined = {
            "data1": node1_result.output,
            "data2": node2_result.output,
            "combined_at": datetime.now()
        }

        return NodeResult(success=True, output=combined)

Context Lifecycle

1. Initialization

   Contexts are created when the workflow starts:

   workflow_context = WorkflowContext(workflow_id)
   node_context = NodeContext(node_id)

2. Pre-execution

   The node context is prepared with its input data:

   node_context.input_data = previous_node.output
   node_context.status = NodeStatus.RUNNING
   node_context.start_time = datetime.now()

3. Execution

   The node runs with access to both contexts:

   result = await node.execute(node_context, workflow_context)

4. Post-execution

   The contexts are updated with the results:

   node_context.output_data = result.output
   node_context.status = NodeStatus.SUCCESS
   node_context.end_time = datetime.now()
   workflow_context.node_results[node_id] = result

5. Cleanup

   Contexts can be persisted or cleaned up:

   await save_contexts(workflow_context, node_contexts)
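
Tying these steps together, a simplified runner loop might look like the sketch below (illustrative only; the real engine also handles retries, skips, and cancellation):
async def run_node(node, node_context, workflow_context, input_data):
    # Pre-execution: prepare the node context
    node_context.input_data = input_data
    node_context.status = NodeStatus.RUNNING
    node_context.start_time = datetime.now()

    try:
        # Execution: the node sees both contexts
        result = await node.execute(node_context, workflow_context)
        node_context.output_data = result.output
        if result.success:
            node_context.status = NodeStatus.SUCCESS
        else:
            node_context.status = NodeStatus.FAILED
            node_context.error = result.error
    except Exception as exc:
        node_context.status = NodeStatus.FAILED
        node_context.error = str(exc)
        raise
    finally:
        # Post-execution: record timing
        node_context.end_time = datetime.now()

    # Publish the result so later nodes can read it
    workflow_context.node_results[node_context.node_id] = result
    return result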

Advanced Context Features

Context Persistence

class PersistentWorkflowContext(WorkflowContext):
    """Context that persists to storage.

    `db` stands in for your storage client; `_serialize_results` and
    `_deserialize_results` are left to the implementation.
    """

    async def save(self):
        """Save context to database"""
        await db.save_context(
            workflow_id=self.workflow_id,
            data={
                "global_variables": self.global_variables,
                "shared_state": self.shared_state,
                "node_results": self._serialize_results()
            }
        )

    async def load(self):
        """Load context from database"""
        data = await db.load_context(self.workflow_id)
        if data:
            self.global_variables = data["global_variables"]
            self.shared_state = data["shared_state"]
            self.node_results = self._deserialize_results(data["node_results"])

    @classmethod
    async def restore(cls, workflow_id: str):
        """Restore a saved context"""
        context = cls(workflow_id)
        await context.load()
        return context
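
Usage is then a matter of saving after each node and restoring on resume (assuming the db helpers above are wired up):
# Resume a previously saved workflow run
context = await PersistentWorkflowContext.restore("workflow_1")
print(list(context.shared_state.keys()))

# ... execute remaining nodes, then persist the updated state
await context.save()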

Context Isolation

class IsolatedNode(BaseNode):
    """Node with isolated context"""

    async def execute(self, node_context, workflow_context):
        # Create isolated copy
        isolated_context = self._create_isolated_context(workflow_context)

        # Modifications don't affect original
        isolated_context.shared_state["temp"] = "isolated"

        # Process with isolation
        result = await self.process_isolated(isolated_context)

        return NodeResult(success=True, output=result)

    def _create_isolated_context(self, original):
        """Create deep copy of context"""
        import copy
        return copy.deepcopy(original)

Context Validation

class ValidatedContext(NodeContext):
    """Context with validation"""

    def set_input(self, data: Dict[str, Any]):
        """Validate input before setting"""
        if not isinstance(data, dict):
            raise ValueError("Input must be a dictionary")

        # Validate required fields
        required_fields = ["user_id", "action"]
        for field in required_fields:
            if field not in data:
                raise ValueError(f"Missing required field: {field}")

        self.input_data = data

    def get_output(self) -> Dict[str, Any]:
        """Validate output before returning"""
        if not self.output_data:
            raise ValueError("No output data available")

        return self.output_data
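
Used on its own, the validation fails fast on malformed input:
ctx = ValidatedContext("validate_demo")
ctx.set_input({"user_id": "u-123", "action": "update"})   # passes validation
ctx.set_input({"user_id": "u-123"})                        # raises ValueError: Missing required field: action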

Context Helpers

Context Utilities

from nadoo_flow import context_utils

# Merge contexts
merged = context_utils.merge_contexts(context1, context2)

# Clone context
cloned = context_utils.clone_context(original_context)

# Clear context
context_utils.clear_context(node_context)

# Serialize/deserialize
serialized = context_utils.serialize_context(workflow_context)
restored = context_utils.deserialize_context(serialized)

Context Decorators

from functools import wraps

def with_context(default_values=None):
    """Decorator to inject default context values"""
    def decorator(func):
        @wraps(func)
        async def wrapper(self, node_context, workflow_context):
            # Set defaults
            if default_values:
                for key, value in default_values.items():
                    if key not in workflow_context.global_variables:
                        workflow_context.global_variables[key] = value

            # Call original function
            return await func(self, node_context, workflow_context)
        return wrapper
    return decorator

# Usage
class MyNode(BaseNode):
    @with_context(default_values={"timeout": 30, "retries": 3})
    async def execute(self, node_context, workflow_context):
        timeout = workflow_context.global_variables["timeout"]
        # ... use timeout
        return NodeResult(success=True, output={"timeout": timeout})

Memory Management

Context Size Limits

class LimitedContext(WorkflowContext):
    """Context with size limits"""

    MAX_STATE_SIZE = 1024 * 1024  # 1MB

    def set_state(self, key: str, value: Any):
        """Set state with size check"""
        import sys

        # Check size
        size = sys.getsizeof(value)
        if size > self.MAX_STATE_SIZE:
            raise ValueError(f"Value too large: {size} bytes")

        self.shared_state[key] = value
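
Note that sys.getsizeof only measures the top-level object, so deeply nested payloads can slip past the check. If that matters, measuring the serialized size is more representative (a sketch, not part of the library):
import json
from typing import Any

from nadoo_flow import WorkflowContext

class SerializedSizeLimitedContext(WorkflowContext):
    """Limits shared state by serialized size rather than sys.getsizeof"""

    MAX_STATE_SIZE = 1024 * 1024  # 1 MB

    def set_state(self, key: str, value: Any):
        # Measure the JSON-serialized payload; default=str covers datetimes etc.
        size = len(json.dumps(value, default=str).encode("utf-8"))
        if size > self.MAX_STATE_SIZE:
            raise ValueError(f"Value too large: {size} bytes")
        self.shared_state[key] = value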

Context Cleanup

class CleanableNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        try:
            # Do work
            result = await self.process()

            # Clean up large data
            if "large_data" in workflow_context.shared_state:
                del workflow_context.shared_state["large_data"]

            return NodeResult(success=True, output=result)
        finally:
            # Always clean up node variables
            node_context.variables.clear()

Debugging with Contexts

Context Logging

import json
import logging

class LoggingNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Log context state
        logging.debug(f"Node {self.node_id} starting")
        logging.debug(f"Input: {json.dumps(node_context.input_data)}")
        logging.debug(f"Variables: {json.dumps(node_context.variables)}")

        try:
            result = await self.process()
            logging.debug(f"Output: {json.dumps(result)}")
            return NodeResult(success=True, output=result)
        except Exception as e:
            logging.error(f"Node {self.node_id} failed: {e}")
            logging.error(f"Context state: {node_context.__dict__}")
            raise

Context Inspection

def inspect_workflow_context(workflow_context):
    """Debug helper to inspect context"""
    print(f"Workflow ID: {workflow_context.workflow_id}")
    print(f"Global Variables: {workflow_context.global_variables}")
    print(f"Shared State Keys: {list(workflow_context.shared_state.keys())}")
    print(f"Executed Nodes: {list(workflow_context.node_results.keys())}")
    print(f"Execution Time: {datetime.now() - workflow_context.start_time}")

Best Practices

  • Only share data that multiple nodes need; keep node-specific data in NodeContext.

  • When possible, use immutable data structures to avoid unexpected mutations:

    workflow_context.shared_state["tags"] = frozenset(["tag1", "tag2"])

  • Always clear sensitive data from contexts after use:

    # Clear after use
    del workflow_context.global_variables["api_key"]
    del node_context.variables["password"]

  • Clearly document what context data each node expects and produces:

    class DataNode(BaseNode):
        """
        Expects in workflow context:
        - global_variables["api_key"]: API key for service
        - shared_state["user_id"]: User identifier

        Produces in workflow context:
        - shared_state["user_data"]: Fetched user data
        """

Next Steps