
Overview

Flow Core provides comprehensive streaming support for real-time event processing, enabling responsive AI applications with token-by-token streaming, progress tracking, and event filtering.

Core Components

StreamingContext

Manages event streaming through an async queue:
import asyncio

from nadoo_flow import StreamingContext

# Create streaming context
async with StreamingContext(buffer_size=100) as streaming_context:
    # Attach to workflow
    workflow_context.streaming_context = streaming_context

    # Process stream in parallel
    async def handle_stream():
        async for event in streaming_context.stream():
            print(f"Event: {event.event_type} - {event.name}")

    # Run workflow with streaming (workflow and input_data come from your application)
    stream_task = asyncio.create_task(handle_stream())
    result = await workflow.run(input_data)
    await stream_task

StreamEvent

Standardized event format for all streaming data:
from dataclasses import dataclass

from nadoo_flow import StreamEvent, StreamEventType

# StreamEvent has (approximately) the following shape:
@dataclass
class StreamEvent:
    event_type: StreamEventType  # Event type enum
    name: str                     # Event/node name
    timestamp: float              # When event occurred
    data: dict                    # Event-specific data
    run_id: str | None           # Execution ID
    tags: list[str]              # Event tags
    metadata: dict               # Additional metadata
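
For example, a single token event might be constructed like this (a sketch based on the fields above; the keyword arguments are assumed to match the dataclass):
import time

token_event = StreamEvent(
    event_type=StreamEventType.LLM_TOKEN,
    name="chat_node",
    timestamp=time.time(),
    data={"token": "Hello"},
    run_id=None,
    tags=["user_facing"],
    metadata={},
)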

StreamEventType

Available event types:
from enum import Enum

class StreamEventType(Enum):
    # Workflow events
    WORKFLOW_START = "workflow_start"
    WORKFLOW_END = "workflow_end"
    WORKFLOW_ERROR = "workflow_error"

    # Node events
    NODE_START = "node_start"
    NODE_END = "node_end"
    NODE_ERROR = "node_error"

    # LLM events
    LLM_START = "llm_start"
    LLM_END = "llm_end"
    LLM_TOKEN = "llm_token"  # Individual token
    LLM_ERROR = "llm_error"

    # Tool events
    TOOL_START = "tool_start"
    TOOL_END = "tool_end"
    TOOL_ERROR = "tool_error"

    # Parser events
    PARSER_START = "parser_start"
    PARSER_END = "parser_end"
    PARSER_ERROR = "parser_error"

    # Custom events
    CUSTOM = "custom"
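
Consumers typically branch on the event type. A minimal dispatch loop might look like this (a sketch, reusing the streaming_context from earlier and Python 3.10+ match syntax):
async for event in streaming_context.stream():
    match event.event_type:
        case StreamEventType.LLM_TOKEN:
            print(event.data["token"], end="", flush=True)
        case StreamEventType.NODE_ERROR | StreamEventType.WORKFLOW_ERROR:
            print(f"\nError in {event.name}: {event.data}")
        case _:
            pass  # Ignore everything else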

StreamingNode Mixin

Add streaming capabilities to any node:
from nadoo_flow import BaseNode, NodeResult, StreamingNode

class MyStreamingNode(BaseNode, StreamingNode):
    def __init__(self):
        BaseNode.__init__(self, "my_node")
        StreamingNode.__init__(self)

    def generate_tokens(self):
        # Placeholder token source; replace with your model's output stream
        yield from ["Hello", ", ", "world", "!"]

    async def execute(self, node_context, workflow_context):
        # Get streaming context (None if the workflow has streaming disabled)
        stream_ctx = self.get_streaming_context(workflow_context)

        if stream_ctx:
            # Emit start event
            await self.emit_start(stream_ctx, node_context)

        # Stream tokens while accumulating the full text
        full_text = ""
        for token in self.generate_tokens():
            if stream_ctx:
                await self.emit_token(stream_ctx, token, self.node_id)
            full_text += token

        result = NodeResult(success=True, output={"text": full_text})

        if stream_ctx:
            # Emit end event
            await self.emit_end(stream_ctx, node_context, result)

        return result

Key Methods

  • get_streaming_context(workflow_context): extract the streaming context, or None if streaming is disabled
  • emit_event(stream_ctx, event_type, name, data): emit a custom event
  • emit_start(stream_ctx, node_context): emit a node start event
  • emit_end(stream_ctx, node_context, result): emit a node end event
  • emit_token(stream_ctx, token, node_id): emit an individual LLM token
  • emit_error(stream_ctx, node_context, error): emit an error event

Event Filtering

Filter events by type, node ID, or tags:
from nadoo_flow import StreamEventFilter

# Filter by event type
token_filter = StreamEventFilter(
    event_types=[StreamEventType.LLM_TOKEN]
)

# Filter by node ID
node_filter = StreamEventFilter(
    node_ids=["llm_node", "chat_node"]
)

# Filter by tags
important_filter = StreamEventFilter(
    tags=["important", "user_facing"]
)

# Apply a filter (stream is any event source, e.g. streaming_context.stream())
async for event in token_filter.filter(stream):
    print(event.data["token"], end="", flush=True)
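
Because filter() consumes one event stream and yields another, filters can be chained to combine criteria (a sketch; whether StreamEventFilter offers a built-in way to combine filters is not shown here):
# Only LLM tokens from the listed nodes
async for event in node_filter.filter(token_filter.filter(stream)):
    print(event.data["token"], end="", flush=True)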

Token Collection

Aggregate streaming tokens:
from nadoo_flow import TokenCollector

collector = TokenCollector()

async for event in stream:
    if event.event_type == StreamEventType.LLM_TOKEN:
        collector.add_token(event.data["token"])

# Get aggregated text
full_text = collector.get_text()
tokens_count = collector.get_count()
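
Filtering and collection combine naturally. A sketch that displays tokens as they arrive while also accumulating them, assuming a stream_ctx as in the earlier examples:
token_filter = StreamEventFilter(event_types=[StreamEventType.LLM_TOKEN])
collector = TokenCollector()

async for event in token_filter.filter(stream_ctx.stream()):
    token = event.data["token"]
    collector.add_token(token)        # Accumulate for later use
    print(token, end="", flush=True)  # Display immediately

full_text = collector.get_text()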

Real-world Example

Streaming Chat Implementation

import asyncio

from nadoo_flow import (
    BaseNode, NodeContext, NodeResult, StreamingNode,
    StreamingContext, StreamEventFilter, StreamEventType,
    WorkflowContext,
)

class StreamingChatNode(BaseNode, StreamingNode):
    """Chat node with token streaming"""

    def __init__(self, model="gpt-3.5-turbo"):
        BaseNode.__init__(self, "streaming_chat")
        StreamingNode.__init__(self)
        self.model = model

    async def execute(self, node_context, workflow_context):
        stream_ctx = self.get_streaming_context(workflow_context)
        user_input = node_context.input_data["message"]

        if stream_ctx:
            await self.emit_start(stream_ctx, node_context)

        # Simulate streaming response
        response = "Hello! How can I help you today?"
        full_response = ""

        for token in response.split():
            if stream_ctx:
                await self.emit_token(stream_ctx, token + " ", self.node_id)
            full_response += token + " "
            await asyncio.sleep(0.1)  # Simulate delay

        if stream_ctx:
            await self.emit_end(stream_ctx, node_context, full_response)

        return NodeResult(
            success=True,
            output={"response": full_response.strip()}
        )

# Usage
async def main():
    chat_node = StreamingChatNode()

    async with StreamingContext() as stream_ctx:
        workflow_context = WorkflowContext(
            workflow_id="chat_workflow",
            streaming_context=stream_ctx
        )

        # Display tokens as they arrive
        async def display_tokens():
            token_filter = StreamEventFilter(
                event_types=[StreamEventType.LLM_TOKEN]
            )
            async for event in token_filter.filter(stream_ctx.stream()):
                print(event.data["token"], end="", flush=True)

        # Run with streaming
        display_task = asyncio.create_task(display_tokens())
        result = await chat_node.execute(
            NodeContext(node_id="chat", input_data={"message": "Hi"}),
            workflow_context
        )
        await display_task

asyncio.run(main())

Event Handling Patterns

Pattern 1: Multiple Event Handlers

async def handle_tokens(stream):
    """Display streaming tokens"""
    token_filter = StreamEventFilter(event_types=[StreamEventType.LLM_TOKEN])
    async for event in token_filter.filter(stream):
        print(event.data["token"], end="", flush=True)

async def handle_progress(stream):
    """Show progress updates"""
    progress_filter = StreamEventFilter(event_types=[
        StreamEventType.NODE_START,
        StreamEventType.NODE_END
    ])
    async for event in progress_filter.filter(stream):
        if event.event_type == StreamEventType.NODE_START:
            print(f"\n▶ Starting {event.name}...")
        else:
            print(f"✓ Completed {event.name}")

# Run multiple handlers in parallel
# (this assumes each call to ctx.stream() gives an independent view of the
# event stream; if consumers share one queue, each event reaches only one)
async with StreamingContext() as ctx:
    tasks = [
        asyncio.create_task(handle_tokens(ctx.stream())),
        asyncio.create_task(handle_progress(ctx.stream()))
    ]
    result = await workflow.run(input_data)
    await asyncio.gather(*tasks)

Pattern 2: Custom Event Emission

class DataProcessorNode(BaseNode, StreamingNode):
    async def execute(self, node_context, workflow_context):
        stream_ctx = self.get_streaming_context(workflow_context)
        data_items = node_context.input_data["items"]

        results = []
        for i, item in enumerate(data_items):
            # Emit a progress event before each item
            if stream_ctx:
                await self.emit_event(
                    stream_ctx,
                    event_type=StreamEventType.CUSTOM,
                    name="progress",
                    data={
                        "current": i + 1,
                        "total": len(data_items),
                        "percentage": (i + 1) / len(data_items) * 100
                    }
                )

            # Process the item (process_item is your own logic)
            results.append(await self.process_item(item))

        return NodeResult(success=True, output={"results": results})
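
On the consumer side, these custom events can drive a progress display. A minimal sketch using the filter API shown earlier (the "progress" name and data keys match the emitter above):
progress_filter = StreamEventFilter(event_types=[StreamEventType.CUSTOM])

async for event in progress_filter.filter(stream_ctx.stream()):
    if event.name == "progress":
        print(f"\rProcessing: {event.data['percentage']:.0f}%", end="", flush=True)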

Pattern 3: Error Recovery with Streaming

class ResilientStreamingNode(BaseNode, StreamingNode):
    async def execute(self, node_context, workflow_context):
        stream_ctx = self.get_streaming_context(workflow_context)

        try:
            # Normal execution with streaming
            result = await self.process_with_streaming(stream_ctx)
            return NodeResult(success=True, output=result)

        except Exception as e:
            # Emit error event
            if stream_ctx:
                await self.emit_error(stream_ctx, node_context, str(e))

            # Fallback logic
            return NodeResult(
                success=False,
                error=str(e),
                output={"fallback": "Using cached response"}
            )
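
Downstream, error events can be surfaced to the user as they happen (a sketch; the exact shape of the error payload inside event.data is an assumption, so inspect it in your version):
error_filter = StreamEventFilter(event_types=[
    StreamEventType.NODE_ERROR,
    StreamEventType.WORKFLOW_ERROR
])

async for event in error_filter.filter(stream_ctx.stream()):
    # The "error" key is assumed, not confirmed by the API shown above
    print(f"\n⚠ {event.name} failed: {event.data.get('error', event.data)}")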

Performance Considerations

Buffer Management

# Small buffer for low latency
StreamingContext(buffer_size=10)  # Quick response, may drop events

# Large buffer for reliability
StreamingContext(buffer_size=1000)  # Higher latency, no event loss

Event Batching

class BatchedEventEmitter(StreamingNode):
    def __init__(self, batch_size=10):
        super().__init__()
        self.batch_size = batch_size
        self.event_buffer = []

    async def emit_batched_event(self, stream_ctx, event_data):
        self.event_buffer.append(event_data)

        if len(self.event_buffer) >= self.batch_size:
            await self.emit_event(
                stream_ctx,
                StreamEventType.CUSTOM,
                "batch",
                {"events": self.event_buffer}
            )
            self.event_buffer = []
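
One detail the sketch above leaves open is the partial batch still in the buffer when the node finishes. Flushing it explicitly avoids losing trailing events (a sketch; call it at the end of execute):
    # Add to BatchedEventEmitter: emit any partial batch that remains
    async def flush(self, stream_ctx):
        if self.event_buffer:
            await self.emit_event(
                stream_ctx,
                StreamEventType.CUSTOM,
                "batch",
                {"events": self.event_buffer}
            )
            self.event_buffer = []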

Best Practices

Not all workflows enable streaming. Always check if context exists:
stream_ctx = self.get_streaming_context(workflow_context)
if stream_ctx:
    await self.emit_event(...)
Choose a buffer size to match your workload:
  • Small buffers (10-50): Real-time UI updates
  • Medium buffers (100-500): Balanced performance
  • Large buffers (1000+): Batch processing, logging
Apply filters at the stream source to reduce processing:
# Good - filter at source
async for event in filter.filter(stream):
    process(event)

# Bad - filter after receiving
async for event in stream:
    if event.event_type == target_type:
        process(event)
Monitor and handle slow consumers:
if stream_ctx.queue.qsize() > threshold:
    # Skip non-critical events or batch
    pass
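
A fuller sketch of that idea, assuming stream_ctx exposes its underlying asyncio queue as .queue (as in the snippet above); handle_event is a hypothetical coroutine of your own:
CRITICAL = {StreamEventType.NODE_ERROR, StreamEventType.WORKFLOW_ERROR}

async def consume(stream_ctx, threshold=500):
    async for event in stream_ctx.stream():
        # Under backpressure, shed everything except critical events
        if stream_ctx.queue.qsize() > threshold and event.event_type not in CRITICAL:
            continue
        await handle_event(event)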
