Skip to main content

Overview

Flow Core’s callback system provides comprehensive event hooks for monitoring, debugging, and controlling workflow execution. Callbacks enable real-time tracking of workflow progress, error handling, and custom business logic injection.

Callback Event System

CallbackEvent

Standardized event data structure passed to all callback handlers:
from nadoo_flow import CallbackEvent

@dataclass
class CallbackEvent:
    """Standardized event payload passed to every callback handler hook.

    One instance is emitted per lifecycle event (workflow/node/LLM/tool);
    handlers should treat it as read-only.
    """
    event_type: str              # Event type (node_start, workflow_end, etc.)
    workflow_id: str             # Workflow execution ID
    node_id: str | None         # Node ID (None for workflow-level events)
    node_type: str | None       # Node type/class name (None for workflow-level events)
    timestamp: float            # Unix timestamp when the event was emitted
    data: dict[str, Any]        # Event-specific data (e.g. inputs, error, token)
    run_id: str                 # Unique execution ID
    tags: list[str]            # Event tags for filtering
    metadata: dict[str, Any]    # Additional metadata

Event Types

Complete list of available callback events:
| Category | Event | Description |
|----------|-------|-------------|
| Workflow | on_workflow_start | Workflow execution begins |
| Workflow | on_workflow_end | Workflow completes successfully |
| Workflow | on_workflow_error | Workflow encounters error |
| Node | on_node_start | Node begins execution |
| Node | on_node_end | Node completes successfully |
| Node | on_node_error | Node encounters error |
| LLM | on_llm_start | LLM call begins |
| LLM | on_llm_end | LLM call completes |
| LLM | on_llm_token | Individual token streamed |
| LLM | on_llm_error | LLM call fails |
| Tool | on_tool_start | Tool execution begins |
| Tool | on_tool_end | Tool execution completes |
| Tool | on_tool_error | Tool execution fails |
| Custom | on_custom_event | User-defined events |

BaseCallbackHandler

Create custom callback handlers by extending the base class:
from nadoo_flow import BaseCallbackHandler, CallbackEvent

class CustomCallbackHandler(BaseCallbackHandler):
    """Example handler showing one hook per lifecycle stage."""

    def on_workflow_start(self, event: CallbackEvent):
        """Announce the workflow run and echo its inputs."""
        inputs = event.data.get('inputs')
        print(f"🚀 Workflow {event.workflow_id} started")
        print(f"Input: {inputs}")

    def on_node_start(self, event: CallbackEvent):
        """Report that a node is about to run."""
        print(f"▶ Node {event.node_id} ({event.node_type}) starting")

    def on_node_end(self, event: CallbackEvent):
        """Report node completion with its reported runtime."""
        elapsed = event.data.get('execution_time', 0)
        print(f"✓ Node {event.node_id} completed in {elapsed:.2f}s")

    def on_llm_token(self, event: CallbackEvent):
        """Stream each generated token straight to stdout."""
        tok = event.data.get('token', '')
        print(tok, end='', flush=True)

    def on_workflow_error(self, event: CallbackEvent):
        """Report a fatal workflow failure."""
        reason = event.data.get('error', 'Unknown error')
        print(f"❌ Workflow failed: {reason}")

    def on_custom_event(self, event: CallbackEvent):
        """React to user-defined events; only progress updates handled here."""
        if event.event_type != "progress_update":
            return
        pct = event.data.get('progress', 0)
        print(f"Progress: {pct}%")

CallbackManager

Manage multiple callback handlers with the CallbackManager:
from nadoo_flow import CallbackManager

# Create manager with handlers
# Create manager with handlers; all three receive every event in order
manager = CallbackManager(
    handlers=[
        ConsoleCallbackHandler(),
        LoggingCallbackHandler(),
        CustomCallbackHandler()
    ],
    inheritable_handlers=[],  # Handlers that propagate to sub-workflows
    tags=["production"],      # Tags for all events
    metadata={"env": "prod"}  # Metadata for all events
)

# Add handlers dynamically
manager.add_handler(MetricsHandler())
manager.add_handler(AlertHandler(), inheritable=True)  # also propagates to sub-workflows

# Remove handlers (pass the same instance that was added)
manager.remove_handler(handler_instance)

# Emit events manually (normally the framework does this)
manager.on_workflow_start(workflow_context, inputs={"query": "test"})
manager.on_node_end(node_context, workflow_context, result)

Handler Inheritance

Inheritable handlers propagate to sub-workflows:
# Parent workflow handlers
parent_manager = CallbackManager(
    handlers=[LocalHandler()],                      # stays on the parent only
    inheritable_handlers=[GlobalMetricsHandler()]   # flows down to sub-workflows
)

# Child workflow inherits GlobalMetricsHandler (but not LocalHandler)
child_manager = parent_manager.copy(inherit_only=True)

Built-in Handlers

ConsoleCallbackHandler

Pretty-print events to console with colors and emojis:
from nadoo_flow import ConsoleCallbackHandler

handler = ConsoleCallbackHandler(
    verbose=True,  # Show detailed information
    colors=True    # Use ANSI colors (disable when piping output to a file)
)

# Output examples (one line per lifecycle event):
# 🚀 Workflow started: workflow_123
# ▶ Starting node: llm_node
# 💭 LLM: Generating response...
# ✅ Node completed: llm_node (2.3s)
# 🎉 Workflow completed successfully

LoggingCallbackHandler

Integration with Python’s logging system:
from nadoo_flow import LoggingCallbackHandler
import logging

# Setup logger (the handler logs through the standard logging machinery)
logging.basicConfig(level=logging.INFO)

handler = LoggingCallbackHandler(
    logger_name="nadoo_flow",
    level=logging.INFO
)

# Logs events like the following; note high-volume events (per-token)
# are emitted at DEBUG so INFO output stays readable:
# INFO:nadoo_flow:Workflow started: workflow_123
# INFO:nadoo_flow:Node llm_node started
# DEBUG:nadoo_flow:LLM token: "Hello"
# INFO:nadoo_flow:Node llm_node completed in 2.3s

MetricsCallbackHandler

Track execution metrics (example implementation):
class MetricsCallbackHandler(BaseCallbackHandler):
    """Aggregate simple execution counters across workflow runs."""

    def __init__(self):
        # Keys here are exactly what get_metrics() exposes to callers.
        self.metrics = dict(
            workflow_count=0,
            node_count=0,
            error_count=0,
            total_duration=0,
        )

    def on_workflow_start(self, event: CallbackEvent):
        """Count each workflow that begins executing."""
        self.metrics['workflow_count'] += 1

    def on_node_end(self, event: CallbackEvent):
        """Count completed nodes and accumulate their reported runtime."""
        self.metrics['node_count'] += 1
        self.metrics['total_duration'] += event.data.get('execution_time', 0)

    def on_workflow_error(self, event: CallbackEvent):
        """Count failed workflows."""
        self.metrics['error_count'] += 1

    def get_metrics(self) -> dict:
        """Return a shallow snapshot so callers cannot mutate live counters."""
        return dict(self.metrics)

Integration Patterns

Pattern 1: Workflow Monitoring

Complete monitoring solution:
class MonitoringHandler(BaseCallbackHandler):
    """Forward workflow lifecycle events to an external monitoring service.

    Per-workflow start timestamps are tracked so end/error events can report
    a duration. Entries are popped when a workflow finishes (success OR
    failure) so the map cannot grow without bound in a long-lived process —
    the original kept every entry forever and, on a missed start event,
    reported an epoch-sized duration via the ``get(..., 0)`` default.
    """

    def __init__(self, monitoring_service):
        self.service = monitoring_service
        self.start_times = {}  # workflow_id -> start timestamp

    def on_workflow_start(self, event: CallbackEvent):
        """Record the start time and notify the service."""
        self.start_times[event.workflow_id] = event.timestamp
        self.service.record_start(event.workflow_id)

    def on_workflow_end(self, event: CallbackEvent):
        """Report success with duration and output size.

        pop() releases the start-time entry; if the start event was never
        seen, fall back to the end timestamp so the duration reports as 0
        rather than a bogus wall-clock value.
        """
        started = self.start_times.pop(event.workflow_id, event.timestamp)
        self.service.record_success(
            workflow_id=event.workflow_id,
            duration=event.timestamp - started,
            output_size=len(str(event.data.get('output', '')))
        )

    def on_workflow_error(self, event: CallbackEvent):
        """Report failure details; also release the start-time entry."""
        self.start_times.pop(event.workflow_id, None)
        self.service.record_failure(
            workflow_id=event.workflow_id,
            error=event.data.get('error'),
            stack_trace=event.data.get('stack_trace')
        )

# Usage: attach the handler to the context's callback manager so the
# framework invokes it during execution
workflow_context.callback_manager = CallbackManager(
    handlers=[MonitoringHandler(monitoring_service)]
)

Pattern 2: Progress Tracking

Track and report workflow progress:
class ProgressHandler(BaseCallbackHandler):
    """Translate node completions into percentage progress reports."""

    def __init__(self, progress_callback):
        # progress_callback receives a float in [0, 100].
        self.progress_callback = progress_callback
        self.total_nodes = 0
        self.completed_nodes = 0

    def on_workflow_start(self, event: CallbackEvent):
        """Reset counters from the node count of the starting workflow."""
        # NOTE(review): assumes the emitter puts the workflow object (with a
        # sized `.nodes` collection) into event.data — confirm upstream.
        wf = event.data.get('workflow')
        self.total_nodes = len(wf.nodes) if wf else 0
        self.completed_nodes = 0
        self._update_progress()

    def on_node_end(self, event: CallbackEvent):
        """Advance the completed count and re-emit progress."""
        self.completed_nodes += 1
        self._update_progress()

    def _update_progress(self):
        """Invoke the callback with percent complete; no-op before start."""
        if self.total_nodes <= 0:
            return
        self.progress_callback((self.completed_nodes / self.total_nodes) * 100)

# Usage with WebSocket: push each progress update to the connected client.
# NOTE(review): the handler calls progress_callback synchronously; an async
# function like this returns a coroutine that must be awaited/scheduled —
# verify how the callback is invoked before wiring this up.
async def send_progress(progress):
    await websocket.send_json({"progress": progress})

handler = ProgressHandler(send_progress)

Pattern 3: Debugging & Tracing

Detailed execution tracing:
class DebugHandler(BaseCallbackHandler):
    """Write an indented execution trace to a debug log file.

    Indentation mirrors workflow/node nesting. Error events now unwind the
    indent too, so a failure mid-run no longer leaves every subsequent line
    permanently over-indented (a defect in the original sketch), and the
    level is clamped so it can never go negative.
    """

    def __init__(self, debug_file="debug.log"):
        # Append mode keeps traces from earlier runs; call close() when done.
        self.file = open(debug_file, "a")
        self.indent_level = 0

    def close(self):
        """Idempotently release the file handle (no context manager wraps
        the open() above, so callers must close explicitly)."""
        if not self.file.closed:
            self.file.close()

    def _log(self, message: str):
        # Flush per line so the trace survives a hard crash.
        indent = "  " * self.indent_level
        timestamp = datetime.now().isoformat()
        self.file.write(f"[{timestamp}] {indent}{message}\n")
        self.file.flush()

    def on_workflow_start(self, event: CallbackEvent):
        self._log(f"WORKFLOW START: {event.workflow_id}")
        self._log(f"Inputs: {json.dumps(event.data.get('inputs'))}")
        self.indent_level += 1

    def on_node_start(self, event: CallbackEvent):
        self._log(f"NODE START: {event.node_id} ({event.node_type})")
        self._log(f"Input: {json.dumps(event.data.get('input'))}")
        self.indent_level += 1

    def on_node_end(self, event: CallbackEvent):
        self.indent_level = max(0, self.indent_level - 1)
        self._log(f"NODE END: {event.node_id}")
        self._log(f"Output: {json.dumps(event.data.get('output'))}")

    def on_node_error(self, event: CallbackEvent):
        # Unwind the failed node's indent so later lines stay aligned.
        self.indent_level = max(0, self.indent_level - 1)
        self._log(f"NODE ERROR: {event.node_id}: {event.data.get('error')}")

    def on_workflow_end(self, event: CallbackEvent):
        self.indent_level = max(0, self.indent_level - 1)
        self._log(f"WORKFLOW END: Success")

    def on_workflow_error(self, event: CallbackEvent):
        # Nesting depth is unknown after a failure; reset to a clean slate.
        self.indent_level = 0
        self._log(f"WORKFLOW ERROR: {event.data.get('error')}")

Pattern 4: Cost Tracking

Track LLM token usage and costs:
class CostTracker(BaseCallbackHandler):
    """Accumulate per-model LLM token counts and estimate dollar cost.

    Models missing from COSTS_PER_1K still have their tokens counted but
    contribute nothing to total_cost.
    """

    # USD per 1,000 tokens. NOTE(review): provider prices drift — verify
    # these figures before relying on them for billing.
    COSTS_PER_1K = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-3.5-turbo": {"input": 0.001, "output": 0.002},
        "claude-3": {"input": 0.015, "output": 0.075}
    }

    def __init__(self):
        # Plain dict + setdefault instead of the original's un-imported
        # defaultdict: keeps the class self-contained and avoids surprise
        # key auto-creation on mere reads.
        self.token_usage = {}  # model -> {"input": n, "output": n}
        self.total_cost = 0

    def on_llm_end(self, event: CallbackEvent):
        """Record usage from a finished LLM call and add its cost."""
        model = event.data.get("model", "gpt-3.5-turbo")
        usage = event.data.get("usage", {})

        input_tokens = usage.get("prompt_tokens", 0)
        output_tokens = usage.get("completion_tokens", 0)

        counts = self.token_usage.setdefault(model, {"input": 0, "output": 0})
        counts["input"] += input_tokens
        counts["output"] += output_tokens

        # Calculate cost only for models with known pricing
        rates = self.COSTS_PER_1K.get(model)
        if rates:
            self.total_cost += (
                (input_tokens / 1000) * rates["input"] +
                (output_tokens / 1000) * rates["output"]
            )

    def get_report(self):
        """Return a report whose nested dicts are copies, so callers cannot
        mutate the live tallies (the original leaked the inner dicts)."""
        return {
            "token_usage": {m: dict(c) for m, c in self.token_usage.items()},
            "total_cost": round(self.total_cost, 4)
        }

Advanced Features

Async Callbacks

Support for async callback handlers:
class AsyncCallbackHandler(BaseCallbackHandler):
    """Handler whose hooks perform awaited (async) work."""

    async def on_node_end_async(self, event: CallbackEvent):
        """Async callback method invoked after a node completes."""
        # Perform async operations
        await self.send_to_analytics(event)
        await self.update_database(event)  # NOTE(review): not defined in this snippet

    async def send_to_analytics(self, event):
        """POST the serialized event to the analytics endpoint."""
        # NOTE(review): httpx is a third-party dependency, not shown imported here.
        async with httpx.AsyncClient() as client:
            await client.post(
                "https://analytics.api/events",
                json=event.to_dict()
            )

Conditional Callbacks

Execute callbacks based on conditions:
class ConditionalHandler(BaseCallbackHandler):
    """Run handling logic only when a user-supplied predicate matches."""

    def __init__(self, condition_fn):
        # condition_fn: CallbackEvent -> bool
        self.condition_fn = condition_fn

    def on_node_end(self, event: CallbackEvent):
        if self.condition_fn(event):
            # Only execute if condition is met
            self._handle_event(event)  # NOTE(review): _handle_event not shown in this snippet

# Usage: only react to LLM nodes
handler = ConditionalHandler(
    condition_fn=lambda e: e.node_type == "LLMNode"
)

Event Filtering

Filter events by tags or metadata:
class FilteredHandler(BaseCallbackHandler):
    """Process only events carrying the required tags and metadata pairs."""

    def __init__(self, required_tags=None, required_metadata=None):
        self.required_tags = required_tags or []
        self.required_metadata = required_metadata or {}

    def _should_handle(self, event: CallbackEvent) -> bool:
        """True when every required tag and metadata key/value is present.

        Empty requirements match everything (all() over nothing is True).
        """
        if not all(tag in event.tags for tag in self.required_tags):
            return False
        return all(
            event.metadata.get(key) == value
            for key, value in self.required_metadata.items()
        )

    def on_node_end(self, event: CallbackEvent):
        """Handle the event only when it passes the filter."""
        if self._should_handle(event):
            # Process event
            pass

Best Practices

Callbacks run synchronously - avoid heavy operations:
# Bad - blocks execution: handlers run inline on the workflow's path,
# so every node completion stalls for the full sleep
def on_node_end(self, event):
    time.sleep(5)  # Don't do this!

# Good - use async or queue: hand the event off and return immediately
def on_node_end(self, event):
    self.queue.put(event)  # Process later (e.g. from a worker thread)
Don’t let callback errors crash workflows:
# Swallow (but log) handler failures so observability code can never
# take down the workflow it is observing
def on_node_end(self, event):
    try:
        self.process_event(event)
    except Exception as e:
        logger.error(f"Callback error: {e}")
        # Don't re-raise
Choose the right handler for your needs:
  • Console: Development/debugging
  • Logging: Production monitoring
  • Custom: Business logic
Clean up resources properly:
class FileHandler(BaseCallbackHandler):
    """Handler holding an open log file; demonstrates resource cleanup.

    Relying on __del__ alone is fragile — finalizers are not guaranteed to
    run, and they execute in a degraded state at interpreter shutdown. An
    explicit, idempotent close() plus context-manager support is provided;
    __del__ remains only as a last-resort safety net.
    """

    def __init__(self):
        self.file = open("log.txt", "a")

    def close(self):
        """Release the file handle; safe to call more than once."""
        if not self.file.closed:
            self.file.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def __del__(self):
        # Safety net only — prefer close() or a `with` block.
        self.close()

Complete Example

from nadoo_flow import (
    BaseCallbackHandler, CallbackManager, CallbackEvent,
    WorkflowContext, BaseNode, NodeResult
)
import time
import json

class ComprehensiveHandler(BaseCallbackHandler):
    """Production-ready callback handler.

    Tracks a stack of in-flight nodes, aggregates simple execution metrics,
    streams LLM tokens to stdout, and optionally forwards node-end events
    to a webhook.
    """

    def __init__(self, webhook_url=None):
        self.webhook_url = webhook_url
        self.execution_stack = []  # info dicts for nodes still running
        # Set here so on_workflow_end() cannot raise AttributeError if it
        # ever fires without a preceding on_workflow_start() (the original
        # only assigned start_time inside on_workflow_start).
        self.start_time = time.time()
        self.metrics = {
            'nodes_executed': 0,
            'errors': 0,
            'total_tokens': 0,
            'execution_time': 0
        }

    def on_workflow_start(self, event: CallbackEvent):
        """Reset per-run state and note the wall-clock start."""
        print(f"🚀 Starting workflow: {event.workflow_id}")
        self.execution_stack = []
        self.start_time = time.time()

    def on_node_start(self, event: CallbackEvent):
        """Push the node onto the in-flight stack."""
        node_info = {
            'node_id': event.node_id,
            'node_type': event.node_type,
            'start_time': event.timestamp
        }
        self.execution_stack.append(node_info)
        print(f"  ▶ {event.node_id} starting...")

    def on_llm_token(self, event: CallbackEvent):
        """Stream each generated token to stdout and count it."""
        token = event.data.get('token', '')
        print(token, end='', flush=True)
        self.metrics['total_tokens'] += 1

    def on_node_end(self, event: CallbackEvent):
        """Pop the node, report its duration, and fan out to the webhook."""
        if self.execution_stack:
            node_info = self.execution_stack.pop()
            duration = event.timestamp - node_info['start_time']
            print(f"\n{event.node_id} completed ({duration:.2f}s)")

        self.metrics['nodes_executed'] += 1

        # Send webhook if configured
        if self.webhook_url:
            self._send_webhook(event)

    def on_node_error(self, event: CallbackEvent):
        """Report the failure and bump the error counter.

        Also pops the failed node so the stack stays in sync — the original
        left the entry behind, mis-timing every later node.
        """
        if self.execution_stack:
            self.execution_stack.pop()
        error = event.data.get('error', 'Unknown error')
        print(f"  ❌ {event.node_id} failed: {error}")
        self.metrics['errors'] += 1

    def on_workflow_end(self, event: CallbackEvent):
        """Finalize timing metrics and print a summary."""
        self.metrics['execution_time'] = time.time() - self.start_time
        print(f"🎉 Workflow completed!")
        print(f"📊 Metrics: {json.dumps(self.metrics, indent=2)}")

    def on_workflow_error(self, event: CallbackEvent):
        """Report a fatal workflow failure."""
        error = event.data.get('error')
        print(f"💥 Workflow failed: {error}")

    def _send_webhook(self, event: CallbackEvent):
        """Send event to webhook (async in production)"""
        try:
            # In production, queue or await this — a blocking HTTP call here
            # would stall the workflow, since callbacks run synchronously.
            pass
        except Exception as e:
            # Never let delivery failures propagate into the workflow.
            print(f"Webhook error: {e}")

# Usage example
# Usage example
async def run_workflow_with_callbacks():
    """Wire a CallbackManager into a workflow context and run a workflow."""
    # Create callback manager; tags/metadata are attached to every event
    callback_manager = CallbackManager(
        handlers=[
            ComprehensiveHandler(webhook_url="https://api.example.com/events"),
            LoggingCallbackHandler(logger_name="workflow.execution")
        ],
        tags=["production", "v1.0"],
        metadata={"user_id": "user_123", "session_id": "session_456"}
    )

    # Create workflow context with callbacks
    workflow_context = WorkflowContext(
        workflow_id="example_workflow",
        callback_manager=callback_manager
    )

    # Execute workflow
    workflow = create_workflow()  # NOTE(review): defined elsewhere, not shown here
    result = await workflow.run(
        input_data={"query": "Hello, world!"},
        context=workflow_context
    )

    return result

# Run
asyncio.run(run_workflow_with_callbacks())  # NOTE(review): requires `import asyncio`, missing above

See Also