Overview
Flow Core’s streaming capabilities enable real-time data processing and token-by-token output, perfect for chatbots, live updates, and progressive rendering.

Streaming Basics
Stream Method
Every workflow supports streaming:
from nadoo_flow import create_workflow

workflow = create_workflow()

# Stream execution (inside an async function)
async for event in workflow.stream(input_data):
    print(f"Event: {event.type}")
    print(f"Data: {event.data}")
Event Types
from enum import Enum

from nadoo_flow.streaming import StreamEvent, StreamEventType

# StreamEventType is defined as:
class StreamEventType(Enum):
    START = "start"
    NODE_START = "node_start"
    NODE_OUTPUT = "node_output"
    NODE_COMPLETE = "node_complete"
    TOKEN = "token"
    LLM_START = "llm_start"  # emitted by the LLM streaming examples below
    LLM_END = "llm_end"
    CUSTOM = "custom"        # generic type for emit_event()
    ERROR = "error"
    COMPLETE = "complete"
Creating Streaming Nodes
Basic Streaming Node
Use the StreamingNode mixin to add streaming capabilities:
from nadoo_flow import BaseNode, StreamingNode, StreamEventType, NodeResult
import asyncio

class ChatStreamingNode(BaseNode, StreamingNode):
    """Node that streams tokens"""

    def __init__(self):
        BaseNode.__init__(
            self,
            node_id="chat_streaming",
            node_type="chat",
            name="ChatStreaming",
            config={}
        )
        StreamingNode.__init__(self)

    async def execute(self, node_context, workflow_context):
        # Get streaming context
        stream_ctx = self.get_streaming_context(workflow_context)

        # Emit start event
        await self.emit_start(stream_ctx, node_context)

        # Stream tokens
        response = "Hello, how can I help you today?"
        for token in response.split():
            await self.emit_token(stream_ctx, token + " ", self.node_id)
            await asyncio.sleep(0.1)  # Simulate typing

        # Emit end event
        result = NodeResult(success=True, output={"response": response})
        await self.emit_end(stream_ctx, node_context, result)
        return result
LLM Streaming
class LLMStreamingNode(BaseNode, StreamingNode):
    """Stream LLM responses token by token"""

    def __init__(self, model_name: str = "gpt-4"):
        BaseNode.__init__(
            self,
            node_id="llm_streaming",
            node_type="llm",
            name="LLMStreaming",
            config={}
        )
        StreamingNode.__init__(self)
        self.model_name = model_name

    async def execute(self, node_context, workflow_context):
        prompt = node_context.input_data.get("prompt")
        stream_ctx = self.get_streaming_context(workflow_context)

        # Emit LLM start event
        await self.emit_event(
            stream_ctx,
            StreamEventType.LLM_START,
            self.node_id,
            data={"model": self.model_name, "prompt": prompt}
        )

        full_response = ""
        # Simulate LLM streaming (replace with an actual LLM API)
        tokens = ["Hello", " ", "world", "!", " ", "How", " ", "can", " ", "I", " ", "help", "?"]
        for token in tokens:
            full_response += token
            # Emit token event
            await self.emit_token(stream_ctx, token, self.node_id)
            await asyncio.sleep(0.05)

        # Emit LLM end event
        result = NodeResult(success=True, output={"response": full_response})
        await self.emit_event(
            stream_ctx,
            StreamEventType.LLM_END,
            self.node_id,
            data={"response": full_response}
        )
        return result
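To stream from a real model, the simulated token loop inside execute() could be replaced along these lines. A hedged sketch using the openai async client (a third-party dependency, not part of nadoo_flow):

from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

# Inside execute(), replacing the simulated token loop:
full_response = ""
stream = await client.chat.completions.create(
    model=self.model_name,
    messages=[{"role": "user", "content": prompt}],
    stream=True,
)
async for chunk in stream:
    token = chunk.choices[0].delta.content or ""
    if token:
        full_response += token
        await self.emit_token(stream_ctx, token, self.node_id)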
StreamingNode Helper Methods
The StreamingNode mixin provides useful helpers:
# Available helper methods:
await self.emit_start(stream_ctx, node_context)          # Node start event
await self.emit_end(stream_ctx, node_context, result)    # Node end event
await self.emit_error(stream_ctx, node_context, error)   # Node error event
await self.emit_token(stream_ctx, token, node_id)        # LLM token event

# Generic event emission:
await self.emit_event(
    stream_ctx,
    StreamEventType.CUSTOM,
    "my_event",
    data={"custom_field": "value"}
)
Using Streaming Context
Setting Up Streaming
from nadoo_flow import StreamingContext, WorkflowContext
import asyncio

async def run_with_streaming():
    # Create workflow context
    workflow_context = WorkflowContext()

    # Create and attach streaming context
    streaming_context = StreamingContext(buffer_size=100)
    workflow_context.streaming_context = streaming_context

    # Use as async context manager
    async with streaming_context:
        # Start streaming consumer
        async def consume_events():
            async for event in streaming_context.stream():
                print(f"[{event.event_type.value}] {event.name}: {event.data}")

        # Run consumer in background
        consumer_task = asyncio.create_task(consume_events())

        # Execute node (node_context comes from your workflow runtime)
        node = ChatStreamingNode()
        result = await node.execute(node_context, workflow_context)

        # Wait for all events to be consumed
        await consumer_task

# Run
asyncio.run(run_with_streaming())
Stream Processing
Stream Handler
class StreamHandler:
    """Process stream events"""

    async def handle_stream(self, workflow, input_data):
        buffer = []
        async for event in workflow.stream(input_data):
            if event.type == StreamEventType.TOKEN:
                # Handle token
                buffer.append(event.data["token"])
                await self.send_to_client(event.data["token"])
            elif event.type == StreamEventType.NODE_COMPLETE:
                # Handle completion
                full_text = "".join(buffer)
                await self.save_response(full_text)
                buffer.clear()
            elif event.type == StreamEventType.ERROR:
                # Handle error
                await self.handle_error(event.data["error"])

    async def send_to_client(self, token):
        """Send token to client via WebSocket/SSE"""
        pass

    async def save_response(self, text):
        """Persist the completed response"""
        pass

    async def handle_error(self, error):
        """Report or log the error"""
        pass
Progressive Output
class ProgressiveNode(StreamingNode):
    """Stream progress updates"""

    async def stream_execute(self, node_context, workflow_context):
        items = node_context.input_data.get("items", [])
        total = len(items)
        for i, item in enumerate(items):
            # Process item (process_item is implemented by the concrete node)
            result = await self.process_item(item)
            # Stream progress
            yield StreamEvent(
                type=StreamEventType.NODE_OUTPUT,
                data={
                    "progress": (i + 1) / total * 100,
                    "current": i + 1,
                    "total": total,
                    "item_result": result
                }
            )
        yield StreamEvent(
            type=StreamEventType.NODE_COMPLETE,
            data={"processed": total}
        )
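A consumer can render these NODE_OUTPUT events as a progress indicator. A minimal sketch, assuming the payload shape above:

async def show_progress(workflow, input_data):
    async for event in workflow.stream(input_data):
        if event.type == StreamEventType.NODE_OUTPUT:
            data = event.data
            # Render a 10-segment progress bar from the percentage
            bar = "#" * int(data["progress"] / 10)
            print(f"\r[{bar:<10}] {data['current']}/{data['total']}", end="", flush=True)
        elif event.type == StreamEventType.NODE_COMPLETE:
            print(f"\nDone: {event.data['processed']} items")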
Stream Transformers
Token Accumulator
class TokenAccumulator:
    """Accumulate tokens into chunks"""

    def __init__(self, chunk_size=10):
        self.chunk_size = chunk_size
        self.buffer = []

    async def transform_stream(self, stream):
        async for event in stream:
            if event.type == StreamEventType.TOKEN:
                self.buffer.append(event.data["token"])
                if len(self.buffer) >= self.chunk_size:
                    yield StreamEvent(
                        type=StreamEventType.TOKEN,
                        data={
                            "chunk": "".join(self.buffer),
                            "tokens": self.buffer.copy()
                        }
                    )
                    self.buffer.clear()
            else:
                # Flush buffer before passing through non-token events
                if self.buffer:
                    yield StreamEvent(
                        type=StreamEventType.TOKEN,
                        data={"chunk": "".join(self.buffer)}
                    )
                    self.buffer.clear()
                yield event
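To use it, wrap the workflow's stream in the transformer. A minimal sketch:

accumulator = TokenAccumulator(chunk_size=5)
async for event in accumulator.transform_stream(workflow.stream(input_data)):
    if event.type == StreamEventType.TOKEN:
        # Chunked events carry a "chunk" field instead of a single token
        print(event.data["chunk"], end="", flush=True)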
Stream Filter
class StreamFilter:
    """Filter stream events"""

    def __init__(self, event_types=None):
        self.event_types = event_types or []

    async def filter_stream(self, stream):
        async for event in stream:
            if not self.event_types or event.type in self.event_types:
                yield event

# Usage (named to avoid shadowing the built-in filter)
event_filter = StreamFilter([StreamEventType.TOKEN, StreamEventType.ERROR])
async for event in event_filter.filter_stream(workflow.stream(data)):
    print(event)
Buffered Streaming
Buffering Strategy
import time

class BufferedStreamingNode(StreamingNode):
    """Stream with buffering for performance"""

    def __init__(self, buffer_size=100, flush_interval=1.0):
        super().__init__()
        self.buffer_size = buffer_size
        self.flush_interval = flush_interval

    async def stream_execute(self, node_context, workflow_context):
        buffer = []
        last_flush = time.time()
        # generate_items is implemented by the concrete node
        async for item in self.generate_items():
            buffer.append(item)
            # Flush when the buffer is full or the interval has elapsed
            should_flush = (
                len(buffer) >= self.buffer_size or
                (time.time() - last_flush) >= self.flush_interval
            )
            if should_flush:
                yield StreamEvent(
                    type=StreamEventType.NODE_OUTPUT,
                    data={"batch": buffer.copy()}
                )
                buffer.clear()
                last_flush = time.time()
        # Flush remaining items
        if buffer:
            yield StreamEvent(
                type=StreamEventType.NODE_OUTPUT,
                data={"batch": buffer}
            )
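Flushing on either a size or a time threshold bounds both memory use and latency: bursts are batched once the buffer fills, while a slow trickle of items is still delivered within flush_interval seconds.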
WebSocket Integration
WebSocket Stream Handler
import time
import logging

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)
app = FastAPI()

class WebSocketStreamHandler:
    """Handle streaming over WebSocket"""

    def __init__(self, websocket):
        self.websocket = websocket

    async def stream_workflow(self, workflow, input_data):
        try:
            async for event in workflow.stream(input_data):
                await self.websocket.send_json({
                    "type": event.type.value,
                    "data": event.data,
                    "timestamp": time.time()
                })
        except WebSocketDisconnect:
            logger.info("WebSocket disconnected")
        except Exception as e:
            await self.websocket.send_json({
                "type": "error",
                "error": str(e)
            })

# FastAPI example
@app.websocket("/stream")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    handler = WebSocketStreamHandler(websocket)
    workflow = create_workflow()
    input_data = await websocket.receive_json()
    await handler.stream_workflow(workflow, input_data)
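On the client side, a minimal sketch with the third-party websockets package (the URL and input payload are assumptions matching the endpoint above):

import asyncio
import json

import websockets  # third-party: pip install websockets

async def consume():
    async with websockets.connect("ws://localhost:8000/stream") as ws:
        # Send the input payload the endpoint expects
        await ws.send(json.dumps({"prompt": "Hello"}))
        # Iterate incoming messages until the server closes the socket
        async for message in ws:
            event = json.loads(message)
            if event["type"] == "token":
                print(event["data"]["token"], end="", flush=True)

asyncio.run(consume())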
Server-Sent Events (SSE)
SSE Stream Handler
import json

from fastapi.responses import StreamingResponse

class SSEStreamHandler:
    """Handle streaming via Server-Sent Events"""

    @staticmethod
    async def stream_to_sse(workflow, input_data):
        async for event in workflow.stream(input_data):
            # Format as SSE
            data = json.dumps({
                "type": event.type.value,
                "data": event.data
            })
            yield f"data: {data}\n\n"
        # Send completion
        yield "data: [DONE]\n\n"

# FastAPI endpoint
@app.post("/stream-sse")
async def stream_sse(request: dict):
    workflow = create_workflow()
    return StreamingResponse(
        SSEStreamHandler.stream_to_sse(workflow, request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        }
    )
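A matching client can read the SSE lines with httpx (third-party; the URL and payload are assumptions matching the endpoint above):

import asyncio
import json

import httpx  # third-party: pip install httpx

async def consume_sse():
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream(
            "POST", "http://localhost:8000/stream-sse", json={"prompt": "Hi"}
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload == "[DONE]":
                    break
                event = json.loads(payload)
                print(event["type"], event["data"])

asyncio.run(consume_sse())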
Stream Callbacks
Custom Stream Callbacks
from nadoo_flow import StreamCallbackHandler

class CustomStreamCallback(StreamCallbackHandler):
    """Custom callback for stream events"""

    async def on_stream_start(self, workflow_id):
        print(f"Stream started: {workflow_id}")

    async def on_token(self, token):
        print(f"Token: {token}", end="", flush=True)

    async def on_node_start(self, node_id):
        print(f"\n[Node {node_id} started]")

    async def on_node_complete(self, node_id, output):
        print(f"\n[Node {node_id} completed]")

    async def on_stream_complete(self, final_output):
        print(f"\nStream completed: {final_output}")

    async def on_error(self, error):
        print(f"\nError: {error}")

# Register callback
workflow.add_stream_callback(CustomStreamCallback())
Stream Aggregation
Aggregating Multiple Streams
import asyncio

class StreamAggregator:
    """Aggregate multiple streams"""

    async def aggregate_streams(self, *streams):
        """Merge multiple streams into one"""
        tasks = [
            asyncio.create_task(self._consume_stream(i, stream))
            for i, stream in enumerate(streams)
        ]
        # Merge all events
        while tasks:
            done, pending = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            # Keep pending tasks before adding new ones, so none are lost
            tasks = list(pending)
            for task in done:
                event = await task
                if event:
                    yield event
                    # Continue consuming from this stream
                    stream_id = event.metadata["stream_id"]
                    tasks.append(asyncio.create_task(
                        self._consume_stream(stream_id, streams[stream_id])
                    ))

    async def _consume_stream(self, stream_id, stream):
        try:
            event = await anext(stream)
            event.metadata["stream_id"] = stream_id
            return event
        except StopAsyncIteration:
            return None
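For example, to interleave events from two workflows as they arrive (a minimal sketch; workflow_a and workflow_b are hypothetical, and stream IDs follow argument order):

aggregator = StreamAggregator()
merged = aggregator.aggregate_streams(
    workflow_a.stream(input_a),  # stream_id 0
    workflow_b.stream(input_b),  # stream_id 1
)
async for event in merged:
    print(f"stream {event.metadata['stream_id']}: {event.type}")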
Performance Optimization
Async Streaming
import aiohttp

class OptimizedStreamingNode(StreamingNode):
    """Optimized streaming with async generators"""

    async def stream_execute(self, node_context, workflow_context):
        url = node_context.input_data.get("url")
        # Use an async generator for efficiency
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                async for chunk in response.content.iter_chunked(1024):
                    # Process chunk (process_chunk is implemented by the concrete node)
                    processed = await self.process_chunk(chunk)
                    yield StreamEvent(
                        type=StreamEventType.NODE_OUTPUT,
                        data={"chunk": processed}
                    )
Stream Caching
class CachedStreamingNode(StreamingNode):
    """Cache streamed data for replay"""

    def __init__(self):
        super().__init__()
        self.cache = {}  # cache_key -> list of events

    async def stream_execute(self, node_context, workflow_context):
        cache_key = self._get_cache_key(node_context.input_data)

        # Replay from cache
        if cache_key in self.cache:
            for event in self.cache[cache_key]:
                yield event
            return

        # Stream and cache
        events = []
        async for event in self._generate_stream(node_context):
            events.append(event)
            yield event

        # Store in cache
        self.cache[cache_key] = events
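The node above leaves _get_cache_key undefined; a minimal sketch of this helper as a method of the node, assuming JSON-serializable input data:

import hashlib
import json

# Method of CachedStreamingNode:
def _get_cache_key(self, input_data) -> str:
    # Hash the canonical JSON form of the input
    canonical = json.dumps(input_data, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()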
Error Handling in Streams
Stream Error Recovery
import asyncio

class ResilientStreamingNode(StreamingNode):
    """Streaming with error recovery"""

    async def stream_execute(self, node_context, workflow_context):
        retry_count = 0
        max_retries = 3
        while retry_count < max_retries:
            try:
                async for event in self._stream_internal(node_context):
                    yield event
                break  # Success
            except StreamError as e:
                retry_count += 1
                # Send error event
                yield StreamEvent(
                    type=StreamEventType.ERROR,
                    data={
                        "error": str(e),
                        "retry": retry_count,
                        "recoverable": True
                    }
                )
                if retry_count >= max_retries:
                    yield StreamEvent(
                        type=StreamEventType.ERROR,
                        data={
                            "error": "Max retries exceeded",
                            "recoverable": False
                        }
                    )
                    break
                await asyncio.sleep(2 ** retry_count)  # Exponential backoff
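Note that a retry restarts _stream_internal from the beginning, so events emitted before the failure are yielded again; consumers should deduplicate, or the node should resume from a checkpoint.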
Best Practices
Use Buffering
Buffer small events to reduce overhead of frequent updates.
Handle Disconnections
Implement reconnection logic for network interruptions.
Add Heartbeats
Send periodic heartbeat events to detect stale connections.
async def heartbeat():
    while streaming:
        yield StreamEvent(type="heartbeat")
        await asyncio.sleep(30)
Implement Backpressure
Control flow when the consumer is slower than the producer; one approach is sketched below.
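A bounded asyncio.Queue is one simple way to apply backpressure (a minimal sketch; handle() is a hypothetical consumer coroutine):

import asyncio

queue: asyncio.Queue = asyncio.Queue(maxsize=100)

async def producer(stream):
    async for event in stream:
        await queue.put(event)  # suspends while the queue is full
    await queue.put(None)       # sentinel marks end of stream

async def consumer():
    while (event := await queue.get()) is not None:
        await handle(event)     # a slow consumer naturally throttles the producer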
Clean Up Resources
Always clean up streams properly to avoid memory leaks.