## Overview
Flow Core provides comprehensive streaming support for real-time event processing, enabling responsive AI applications with token-by-token streaming, progress tracking, and event filtering.

## Core Components
### StreamingContext
Manages event streaming through an async queue.
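A minimal sketch of such a context, assuming an `asyncio.Queue` backing; the `buffer_size`, `emit`, `events`, and `close` names are illustrative rather than the confirmed Flow Core API:

```python
import asyncio
from typing import AsyncIterator

class StreamingContext:
    """Sketch only: fan events from producer nodes to a consumer via an async queue."""

    def __init__(self, buffer_size: int = 100) -> None:
        # Bounded queue: producers pause when the consumer falls behind.
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=buffer_size)

    async def emit(self, event: "StreamEvent") -> None:
        await self._queue.put(event)

    async def events(self) -> AsyncIterator["StreamEvent"]:
        # Iterate until a None sentinel marks the end of the stream.
        while (event := await self._queue.get()) is not None:
            yield event

    async def close(self) -> None:
        await self._queue.put(None)
```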
### StreamEvent

Standardized event format for all streaming data.
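A plausible shape for the event record, sketched as a dataclass; the field names mirror the arguments of the `emit_*` helpers listed under Key Methods below, and anything beyond those (such as `timestamp`) is an assumption:

```python
import time
from dataclasses import dataclass, field
from typing import Any, Optional

@dataclass
class StreamEvent:
    """Sketch only: a single item flowing through the StreamingContext queue."""
    event_type: str                 # e.g. "node_start", "token", "node_end", "error"
    name: str                       # logical name of the emitting node or custom event
    data: Any = None                # payload: token text, result dict, error info, ...
    node_id: Optional[str] = None   # which node produced the event
    timestamp: float = field(default_factory=time.time)
```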
### StreamEventType

Available event types.
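An enum along these lines, inferred from the `emit_start` / `emit_end` / `emit_token` / `emit_error` helpers, is a reasonable guess; the actual members in Flow Core may differ:

```python
from enum import Enum

class StreamEventType(str, Enum):
    """Sketch only: likely event types, inferred from the emit_* helpers."""
    NODE_START = "node_start"
    NODE_END = "node_end"
    TOKEN = "token"
    ERROR = "error"
    CUSTOM = "custom"   # assumption: emit_event allows arbitrary custom types
```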
### StreamingNode Mixin

Add streaming capabilities to any node.
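A sketch of a node that mixes in `StreamingNode` and emits events around a token-producing call; the method names come from the table below, while the `Node` base class, the `self.llm` client, the `run` signature, and the assumption that the emit helpers are coroutines are placeholders:

```python
class StreamingLLMNode(StreamingNode, Node):        # base classes / imports assumed
    async def run(self, context: dict) -> dict:
        stream_ctx = self.get_streaming_context(context)
        await self.emit_start(stream_ctx, context)
        chunks: list[str] = []
        try:
            # `self.llm.stream(...)` stands in for whatever client yields tokens.
            async for token in self.llm.stream(context["prompt"]):
                chunks.append(token)
                await self.emit_token(stream_ctx, token, node_id=self.id)
            result = {"response": "".join(chunks)}
            await self.emit_end(stream_ctx, context, result)
            return result
        except Exception as exc:
            await self.emit_error(stream_ctx, context, exc)
            raise
```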
#### Key Methods

| Method | Description |
|---|---|
| `get_streaming_context(workflow_context)` | Extract streaming context |
| `emit_event(stream_ctx, event_type, name, data)` | Emit custom event |
| `emit_start(stream_ctx, node_context)` | Emit node start event |
| `emit_end(stream_ctx, node_context, result)` | Emit node end event |
| `emit_token(stream_ctx, token, node_id)` | Emit LLM token |
| `emit_error(stream_ctx, node_context, error)` | Emit error event |
## Event Filtering
Filter events by type, node ID, or tags.
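A sketch of consumer-side filtering, assuming the iterator API from the `StreamingContext` sketch above and that events carry `event_type`, `node_id`, and `tags` attributes (the `tags` field is an assumption):

```python
from typing import AsyncIterator, Optional

async def filtered_events(
    stream_ctx,
    event_types: Optional[set] = None,
    node_ids: Optional[set] = None,
    tags: Optional[set] = None,
) -> AsyncIterator:
    """Yield only the events that match the given type / node ID / tag filters."""
    async for event in stream_ctx.events():        # assumed iterator API
        if event_types and event.event_type not in event_types:
            continue
        if node_ids and event.node_id not in node_ids:
            continue
        if tags and not tags & set(getattr(event, "tags", ()) or ()):
            continue
        yield event
```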
## Token Collection

Aggregate streaming tokens.
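A sketch of collecting token events into the full response text, assuming token text is carried in `event.data`:

```python
async def collect_tokens(stream_ctx) -> str:
    """Concatenate the data of every token event into one string."""
    parts: list[str] = []
    async for event in stream_ctx.events():        # assumed iterator API
        if event.event_type == "token":
            parts.append(event.data)
    return "".join(parts)
```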
## Real-world Example

### Streaming Chat Implementation
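A hedged sketch of a chat endpoint that forwards tokens to a client as they arrive; the `workflow.run(...)` call, its `streaming` keyword, the `buffer_size` argument, and the websocket interface are assumptions layered on the earlier sketches, not the confirmed Flow Core API:

```python
import asyncio

async def chat(websocket, workflow, prompt: str) -> None:
    stream_ctx = StreamingContext(buffer_size=50)   # small buffer for snappy UI updates

    # Run the workflow concurrently so tokens can be forwarded while it executes.
    # Assumes the workflow closes the stream when it finishes.
    run_task = asyncio.create_task(
        workflow.run({"prompt": prompt}, streaming=stream_ctx)
    )

    async for event in stream_ctx.events():         # assumed iterator API
        if event.event_type == "token":
            await websocket.send_text(event.data)   # e.g. a FastAPI WebSocket
        elif event.event_type == "error":
            await websocket.send_text(f"[error] {event.data}")

    await run_task
```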
## Event Handling Patterns
### Pattern 1: Multiple Event Handlers
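One way to fan a single stream out to several handlers, for example one that updates a UI and one that records metrics; the dispatcher below is illustrative and not part of Flow Core:

```python
from collections import defaultdict
from typing import Any, Awaitable, Callable

Handler = Callable[[Any], Awaitable[None]]

class EventDispatcher:
    """Route each event to every handler registered for its type."""

    def __init__(self) -> None:
        self._handlers: dict[str, list[Handler]] = defaultdict(list)

    def on(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    async def run(self, stream_ctx) -> None:
        async for event in stream_ctx.events():     # assumed iterator API
            for handler in self._handlers.get(event.event_type, []):
                await handler(event)
```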
### Pattern 2: Custom Event Emission
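Nodes aren't limited to the built-in lifecycle events; `emit_event` (from the Key Methods table) can carry application-specific data such as progress updates. In the sketch below, the event type string, payload, and node class are examples only:

```python
class IndexingNode(StreamingNode):                  # mixin import assumed / omitted
    async def index_documents(self, stream_ctx, documents: list) -> None:
        for i, doc in enumerate(documents, start=1):
            await self.process(doc)                 # placeholder for the real work
            # Custom "progress" events let a UI draw a progress bar alongside tokens.
            await self.emit_event(
                stream_ctx,
                event_type="progress",
                name="document_indexing",
                data={"done": i, "total": len(documents)},
            )
```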
### Pattern 3: Error Recovery with Streaming
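A sketch of reporting failures as events so the consumer can react (show a message, fall back) while the node retries instead of killing the stream; the retry/backoff policy and `call_model` are assumptions, only `emit_error` comes from the table above:

```python
import asyncio

class ResilientLLMNode(StreamingNode):              # mixin import assumed / omitted
    async def run(self, stream_ctx, context: dict, attempts: int = 3):
        for attempt in range(1, attempts + 1):
            try:
                return await self.call_model(context)    # placeholder for the real work
            except Exception as exc:
                # Surface the failure on the stream, then retry or give up.
                await self.emit_error(stream_ctx, context, exc)
                if attempt == attempts:
                    raise
                await asyncio.sleep(2 ** attempt)        # simple exponential backoff
```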
## Performance Considerations
### Buffer Management
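The queue behind the context bounds how far producers can run ahead of the consumer: once it is full, emitters wait, which is usually the behaviour you want. A brief sketch, reusing the assumed `buffer_size` argument from the `StreamingContext` sketch above:

```python
# Small buffer: emitters pause quickly if the UI consumer falls behind.
ui_ctx = StreamingContext(buffer_size=25)

# Large buffer: absorbs bursts for logging/analytics consumers that read in batches.
log_ctx = StreamingContext(buffer_size=2000)
```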
### Event Batching
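Handing events to a slow sink (a database write, a websocket frame) one at a time adds per-event overhead; grouping them first amortizes it. A sketch of size- and time-aware batching over the assumed iterator API:

```python
import time

async def batched_events(stream_ctx, max_size: int = 50, max_wait: float = 0.1):
    """Yield lists of events, flushing when the batch is full or max_wait has passed."""
    batch: list = []
    deadline = time.monotonic() + max_wait
    async for event in stream_ctx.events():         # assumed iterator API
        batch.append(event)
        # The time check runs when an event arrives, so idle streams simply stay quiet.
        if len(batch) >= max_size or time.monotonic() >= deadline:
            yield batch
            batch = []
            deadline = time.monotonic() + max_wait
    if batch:
        yield batch                                  # flush the tail when the stream ends
```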
## Best Practices
### Always Check for Streaming Context
Not all workflows enable streaming. Always check if context exists:
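A sketch of the guard inside a node, assuming `get_streaming_context` returns `None` when streaming wasn't enabled; the node class and `compute` method are placeholders:

```python
class GuardedNode(StreamingNode):                    # mixin import assumed / omitted
    async def run(self, context: dict) -> dict:
        stream_ctx = self.get_streaming_context(context)
        result = await self.compute(context)         # placeholder for the node's work
        # Only emit when streaming is enabled; the node works either way.
        if stream_ctx is not None:
            await self.emit_token(stream_ctx, result["text"], node_id=self.id)
        return result
```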
### Use Appropriate Buffer Sizes
- Small buffers (10–50 events): real-time UI updates
- Medium buffers (100–500 events): balanced latency and throughput
- Large buffers (1000+ events): batch processing, logging
### Filter Early
Apply filters at the stream source to reduce processing:
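For example, build the filtered stream once, next to the source, and hand consumers the already-narrowed iterator (reusing the `filtered_events` helper sketched under Event Filtering) rather than re-checking event types in every downstream handler; `render_token` is a placeholder consumer:

```python
async def stream_ui_tokens(stream_ctx) -> None:
    # One filter at the source instead of per-consumer checks downstream.
    async for event in filtered_events(stream_ctx, event_types={"token"}):
        await render_token(event.data)               # placeholder UI consumer
```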
### Handle Backpressure
Monitor and handle slow consumers:
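A sketch of one way to watch for a backed-up buffer, assuming access to the bounded `asyncio.Queue` behind the context (as in the sketch above); the 80% threshold and polling interval are arbitrary choices:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def monitor_backpressure(
    queue: asyncio.Queue, threshold: float = 0.8, interval: float = 1.0
) -> None:
    """Warn whenever the stream buffer is mostly full, i.e. the consumer is too slow."""
    while True:
        if queue.maxsize and queue.qsize() >= threshold * queue.maxsize:
            logger.warning(
                "stream buffer %d/%d full; consumer is falling behind",
                queue.qsize(), queue.maxsize,
            )
        await asyncio.sleep(interval)
```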