
Overview

Flow Core’s streaming capabilities enable real-time data processing and token-by-token output, perfect for chatbots, live updates, and progressive rendering.

Streaming Basics

Stream Method

Every workflow supports streaming:
from nadoo_flow import Workflow

workflow = create_workflow()  # create_workflow() stands in for your own workflow factory

# Stream execution
async for event in workflow.stream(input_data):
    print(f"Event: {event.type}")
    print(f"Data: {event.data}")

Event Types

from enum import Enum

from nadoo_flow.streaming import StreamEvent, StreamEventType

class StreamEventType(Enum):
    START = "start"
    NODE_START = "node_start"
    NODE_OUTPUT = "node_output"
    NODE_COMPLETE = "node_complete"
    TOKEN = "token"
    LLM_START = "llm_start"    # referenced by the LLM examples below (values assumed)
    LLM_END = "llm_end"
    CUSTOM = "custom"          # generic hook for your own event types
    ERROR = "error"
    COMPLETE = "complete"

Creating Streaming Nodes

Basic Streaming Node

Use the StreamingNode mixin to add streaming capabilities:
from nadoo_flow import BaseNode, StreamingNode, StreamEventType, NodeResult
import asyncio

class ChatStreamingNode(BaseNode, StreamingNode):
    """Node that streams tokens"""

    def __init__(self):
        BaseNode.__init__(
            self,
            node_id="chat_streaming",
            node_type="chat",
            name="ChatStreaming",
            config={}
        )
        StreamingNode.__init__(self)

    async def execute(self, node_context, workflow_context):
        # Get streaming context
        stream_ctx = self.get_streaming_context(workflow_context)

        # Emit start event
        await self.emit_start(stream_ctx, node_context)

        # Stream tokens
        response = "Hello, how can I help you today?"
        for token in response.split():
            await self.emit_token(stream_ctx, token + " ", self.node_id)
            await asyncio.sleep(0.1)  # Simulate typing

        # Emit end event
        result = NodeResult(success=True, output={"response": response})
        await self.emit_end(stream_ctx, node_context, result)

        return result

LLM Streaming

class LLMStreamingNode(BaseNode, StreamingNode):
    """Stream LLM responses token by token"""

    def __init__(self, model_name: str = "gpt-4"):
        BaseNode.__init__(
            self,
            node_id="llm_streaming",
            node_type="llm",
            name="LLMStreaming",
            config={}
        )
        StreamingNode.__init__(self)
        self.model_name = model_name

    async def execute(self, node_context, workflow_context):
        prompt = node_context.input_data.get("prompt")
        stream_ctx = self.get_streaming_context(workflow_context)

        # Emit LLM start event
        await self.emit_event(
            stream_ctx,
            StreamEventType.LLM_START,
            self.node_id,
            data={"model": self.model_name, "prompt": prompt}
        )

        full_response = ""
        # Simulate LLM streaming (replace with actual LLM API)
        tokens = ["Hello", " ", "world", "!", " ", "How", " ", "can", " ", "I", " ", "help", "?"]
        for token in tokens:
            full_response += token

            # Emit token event
            await self.emit_token(stream_ctx, token, self.node_id)
            await asyncio.sleep(0.05)

        # Emit LLM end event
        result = NodeResult(success=True, output={"response": full_response})
        await self.emit_event(
            stream_ctx,
            StreamEventType.LLM_END,
            self.node_id,
            data={"response": full_response}
        )

        return result

StreamingNode Helper Methods

The StreamingNode mixin provides useful helpers:
# Available helper methods:
await self.emit_start(stream_ctx, node_context)          # Node start event
await self.emit_end(stream_ctx, node_context, result)    # Node end event
await self.emit_error(stream_ctx, node_context, error)   # Node error event
await self.emit_token(stream_ctx, token, node_id)        # LLM token event

# Generic event emission:
await self.emit_event(
    stream_ctx,
    StreamEventType.CUSTOM,
    "my_event",
    data={"custom_field": "value"}
)

Using Streaming Context

Setting Up Streaming

from nadoo_flow import StreamingContext, WorkflowContext
import asyncio

async def run_with_streaming():
    # Create workflow context
    workflow_context = WorkflowContext()

    # Create and attach streaming context
    streaming_context = StreamingContext(buffer_size=100)
    workflow_context.streaming_context = streaming_context

    # Use as async context manager
    async with streaming_context:
        # Start streaming consumer
        async def consume_events():
            async for event in streaming_context.stream():
                print(f"[{event.type.value}] {event.name}: {event.data}")

        # Run consumer in background
        consumer_task = asyncio.create_task(consume_events())

        # Execute node (node_context is assumed to be prepared elsewhere)
        node = ChatStreamingNode()
        result = await node.execute(node_context, workflow_context)

    # Leaving the context closes the stream, which lets the consumer finish
    await consumer_task

# Run
asyncio.run(run_with_streaming())


Stream Processing

Stream Handler

class StreamHandler:
    """Process stream events"""

    async def handle_stream(self, workflow, input_data):
        buffer = []

        async for event in workflow.stream(input_data):
            if event.type == StreamEventType.TOKEN:
                # Handle token
                buffer.append(event.data["token"])
                await self.send_to_client(event.data["token"])

            elif event.type == StreamEventType.NODE_COMPLETE:
                # Handle completion
                full_text = "".join(buffer)
                await self.save_response(full_text)
                buffer.clear()

            elif event.type == StreamEventType.ERROR:
                # Handle error
                await self.handle_error(event.data["error"])

    async def send_to_client(self, token):
        """Send the token to the client via WebSocket/SSE"""
        pass

    async def save_response(self, text):
        """Persist the completed response (stub)"""
        pass

    async def handle_error(self, error):
        """Log or report the stream error (stub)"""
        pass

Progressive Output

class ProgressiveNode(StreamingNode):
    """Stream progress updates"""

    async def stream_execute(self, node_context, workflow_context):
        items = node_context.input_data.get("items", [])
        total = len(items)

        for i, item in enumerate(items):
            # Process item
            result = await self.process_item(item)

            # Stream progress
            yield StreamEvent(
                type=StreamEventType.NODE_OUTPUT,
                data={
                    "progress": (i + 1) / total * 100,
                    "current": i + 1,
                    "total": total,
                    "item_result": result
                }
            )

        yield StreamEvent(
            type=StreamEventType.NODE_COMPLETE,
            data={"processed": total}
        )
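
On the consuming side, these NODE_OUTPUT events can drive a simple progress display. A minimal sketch, assuming workflow wraps a ProgressiveNode and items is the input list:
async for event in workflow.stream({"items": items}):
    if event.type == StreamEventType.NODE_OUTPUT and "progress" in event.data:
        d = event.data
        # Overwrite the same line as progress advances
        print(f"\rProcessed {d['current']}/{d['total']} ({d['progress']:.0f}%)", end="", flush=True)
    elif event.type == StreamEventType.NODE_COMPLETE:
        print()  # terminate the progress line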

Stream Transformers

Token Accumulator

class TokenAccumulator:
    """Accumulate tokens into chunks"""

    def __init__(self, chunk_size=10):
        self.chunk_size = chunk_size
        self.buffer = []

    async def transform_stream(self, stream):
        async for event in stream:
            if event.type == StreamEventType.TOKEN:
                self.buffer.append(event.data["token"])

                if len(self.buffer) >= self.chunk_size:
                    yield StreamEvent(
                        type=StreamEventType.TOKEN,
                        data={
                            "chunk": "".join(self.buffer),
                            "tokens": self.buffer.copy()
                        }
                    )
                    self.buffer.clear()
            else:
                # Flush buffer
                if self.buffer:
                    yield StreamEvent(
                        type=StreamEventType.TOKEN,
                        data={"chunk": "".join(self.buffer)}
                    )
                    self.buffer.clear()

                yield event
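
Wrapping a stream is then a one-liner. A usage sketch, reusing workflow and input_data from the earlier examples:
# Forward tokens in chunks of ten instead of one by one
accumulator = TokenAccumulator(chunk_size=10)
async for event in accumulator.transform_stream(workflow.stream(input_data)):
    if event.type == StreamEventType.TOKEN:
        print(event.data["chunk"], end="", flush=True)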

Stream Filter

class StreamFilter:
    """Filter stream events"""

    def __init__(self, event_types=None):
        self.event_types = event_types or []

    async def filter_stream(self, stream):
        async for event in stream:
            if not self.event_types or event.type in self.event_types:
                yield event

# Usage
stream_filter = StreamFilter([StreamEventType.TOKEN, StreamEventType.ERROR])
async for event in stream_filter.filter_stream(workflow.stream(data)):
    print(event)

Buffered Streaming

Buffering Strategy

import time

class BufferedStreamingNode(StreamingNode):
    """Stream with buffering for performance"""

    def __init__(self, buffer_size=100, flush_interval=1.0):
        super().__init__()
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval

    async def stream_execute(self, node_context, workflow_context):
        buffer = []
        last_flush = time.time()

        # generate_items() is your own async producer
        async for item in self.generate_items():
            buffer.append(item)

            # Check if should flush
            should_flush = (
                len(buffer) >= self.buffer_size or
                (time.time() - last_flush) >= self.flush_interval
            )

            if should_flush:
                yield StreamEvent(
                    type=StreamEventType.NODE_OUTPUT,
                    data={"batch": buffer.copy()}
                )
                buffer.clear()
                last_flush = time.time()

        # Flush remaining
        if buffer:
            yield StreamEvent(
                type=StreamEventType.NODE_OUTPUT,
                data={"batch": buffer}
            )

WebSocket Integration

WebSocket Stream Handler

import logging
import time

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)

class WebSocketStreamHandler:
    """Handle streaming over WebSocket"""

    def __init__(self, websocket):
        self.websocket = websocket

    async def stream_workflow(self, workflow, input_data):
        try:
            async for event in workflow.stream(input_data):
                await self.websocket.send_json({
                    "type": event.type.value,
                    "data": event.data,
                    "timestamp": time.time()
                })

        except WebSocketDisconnect:
            logger.info("WebSocket disconnected")
        except Exception as e:
            await self.websocket.send_json({
                "type": "error",
                "error": str(e)
            })

# FastAPI example
app = FastAPI()

@app.websocket("/stream")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    handler = WebSocketStreamHandler(websocket)

    workflow = create_workflow()
    input_data = await websocket.receive_json()

    await handler.stream_workflow(workflow, input_data)
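
On the client side, any WebSocket library can consume this endpoint. A minimal sketch using the third-party websockets package, assuming the server above runs on localhost:8000:
import asyncio
import json

import websockets

async def consume():
    async with websockets.connect("ws://localhost:8000/stream") as ws:
        # The endpoint expects the workflow input as the first message
        await ws.send(json.dumps({"prompt": "Hello"}))
        async for message in ws:
            event = json.loads(message)
            if event["type"] == "token":
                print(event["data"]["token"], end="", flush=True)
            elif event["type"] in ("complete", "error"):
                break

asyncio.run(consume())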

Server-Sent Events (SSE)

SSE Stream Handler

import json

from fastapi.responses import StreamingResponse

class SSEStreamHandler:
    """Handle streaming via Server-Sent Events"""

    @staticmethod
    async def stream_to_sse(workflow, input_data):
        async for event in workflow.stream(input_data):
            # Format as SSE
            data = json.dumps({
                "type": event.type.value,
                "data": event.data
            })
            yield f"data: {data}\n\n"

        # Send completion
        yield "data: [DONE]\n\n"

# FastAPI endpoint
@app.post("/stream-sse")
async def stream_sse(request: dict):
    workflow = create_workflow()

    return StreamingResponse(
        SSEStreamHandler.stream_to_sse(workflow, request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
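
Reading the stream from a client needs nothing beyond plain HTTP. A sketch using httpx, again assuming the endpoint above is served on localhost:8000:
import json

import httpx

with httpx.stream("POST", "http://localhost:8000/stream-sse",
                  json={"prompt": "Hi"}, timeout=None) as response:
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue  # skip the blank separator lines between events
        payload = line[len("data: "):]
        if payload == "[DONE]":
            break
        event = json.loads(payload)
        print(event["type"], event["data"])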

Stream Callbacks

Custom Stream Callbacks

from nadoo_flow import StreamCallbackHandler

class CustomStreamCallback(StreamCallbackHandler):
    """Custom callback for stream events"""

    async def on_stream_start(self, workflow_id):
        print(f"Stream started: {workflow_id}")

    async def on_token(self, token):
        print(f"Token: {token}", end="", flush=True)

    async def on_node_start(self, node_id):
        print(f"\n[Node {node_id} started]")

    async def on_node_complete(self, node_id, output):
        print(f"\n[Node {node_id} completed]")

    async def on_stream_complete(self, final_output):
        print(f"\nStream completed: {final_output}")

    async def on_error(self, error):
        print(f"\nError: {error}")

# Register callback
workflow.add_stream_callback(CustomStreamCallback())

Stream Aggregation

Aggregating Multiple Streams

class StreamAggregator:
    """Aggregate multiple streams"""

    async def aggregate_streams(self, *streams):
        """Merge multiple streams into one, yielding events as they arrive"""
        tasks = [
            asyncio.create_task(self._consume_stream(i, stream))
            for i, stream in enumerate(streams)
        ]

        # Merge all events
        while tasks:
            done, pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )

            new_tasks = []
            for task in done:
                event = await task
                if event:
                    yield event
                    # Keep consuming from the stream that produced this event
                    stream_id = event.metadata["stream_id"]
                    new_tasks.append(asyncio.create_task(
                        self._consume_stream(stream_id, streams[stream_id])
                    ))

            # Carry over pending consumers plus the ones just restarted
            tasks = list(pending) + new_tasks

    async def _consume_stream(self, stream_id, stream):
        try:
            event = await anext(stream)  # anext() requires Python 3.10+
            event.metadata["stream_id"] = stream_id
            return event
        except StopAsyncIteration:
            return None
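
A usage sketch, assuming workflow_a and workflow_b were built elsewhere; events arrive in completion order, tagged with the stream they came from:
aggregator = StreamAggregator()

async for event in aggregator.aggregate_streams(
    workflow_a.stream(input_a),
    workflow_b.stream(input_b),
):
    print(f"[stream {event.metadata['stream_id']}] {event.type}")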

Performance Optimization

Async Streaming

import aiohttp

class OptimizedStreamingNode(StreamingNode):
    """Optimized streaming with async generators"""

    async def stream_execute(self, node_context, workflow_context):
        url = node_context.input_data["url"]

        # Use an async generator so chunks are yielded as they arrive
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                async for chunk in response.content.iter_chunked(1024):
                    # process_chunk is your own chunk-level transform
                    processed = await self.process_chunk(chunk)

                    yield StreamEvent(
                        type=StreamEventType.NODE_OUTPUT,
                        data={"chunk": processed}
                    )

Stream Caching

class CachedStreamingNode(StreamingNode):
    """Cache streamed data for replay"""

    def __init__(self):
        super().__init__()
        self.cache = {}  # cache_key -> list of cached events

    async def stream_execute(self, node_context, workflow_context):
        cache_key = self._get_cache_key(node_context.input_data)

        # Check cache
        if cache_key in self.cache:
            for event in self.cache[cache_key]:
                yield event
            return

        # Stream and cache
        events = []
        async for event in self._generate_stream(node_context):
            events.append(event)
            yield event

        # Store in cache
        self.cache[cache_key] = events
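
_get_cache_key is left to you. One straightforward option, assuming the input data is JSON-serializable, is to hash a canonical dump of it; this method can be dropped into CachedStreamingNode as-is:
import hashlib
import json

def _get_cache_key(self, input_data: dict) -> str:
    # Canonical JSON so equal inputs always hash to the same key
    canonical = json.dumps(input_data, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()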

Error Handling in Streams

Stream Error Recovery

class ResilientStreamingNode(StreamingNode):
    """Streaming with error recovery"""

    async def stream_execute(self, node_context, workflow_context):
        retry_count = 0
        max_retries = 3

        while retry_count < max_retries:
            try:
                async for event in self._stream_internal(node_context):
                    yield event
                break  # Success

            except StreamError as e:  # StreamError: your recoverable stream exception type
                retry_count += 1

                # Send error event
                yield StreamEvent(
                    type=StreamEventType.ERROR,
                    data={
                        "error": str(e),
                        "retry": retry_count,
                        "recoverable": True
                    }
                )

                if retry_count >= max_retries:
                    yield StreamEvent(
                        type=StreamEventType.ERROR,
                        data={
                            "error": "Max retries exceeded",
                            "recoverable": False
                        }
                    )
                    break

                await asyncio.sleep(2 ** retry_count)  # Exponential backoff

Best Practices

Buffer small events to reduce the overhead of frequent updates.
Implement reconnection logic to survive network interruptions.
Send periodic heartbeat events to detect stale connections, for example:
async def heartbeat():
    while streaming:  # `streaming` is a flag you clear when the stream ends
        yield StreamEvent(type="heartbeat")  # illustrative; add a heartbeat type to your enum
        await asyncio.sleep(30)
Apply backpressure when the consumer is slower than the producer (see the sketch after this list).
Always clean up streams to avoid leaking tasks and memory.
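
A minimal backpressure sketch using a bounded asyncio.Queue: the queue's maxsize makes the producer suspend whenever the consumer falls behind. The handle callable is hypothetical and stands in for your own event processing:
import asyncio

queue: asyncio.Queue = asyncio.Queue(maxsize=100)

async def produce(stream):
    async for event in stream:
        await queue.put(event)   # suspends here while the queue is full
    await queue.put(None)        # sentinel: the stream is exhausted

async def consume(handle):
    while (event := await queue.get()) is not None:
        await handle(event)      # a slow handler paces the producer via the queue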

Next Steps