Overview
Learn how to build streaming chat applications that provide real-time responses using Flow Core’s streaming capabilities. Instead of waiting for a complete reply, users see output as soon as the first tokens arrive.

Basic Streaming Chat
from nadoo_flow import StreamingNode, LLMNode, ChainableNode
import asyncio
from typing import AsyncGenerator

class StreamingLLMNode(StreamingNode):
    """LLM node with streaming support"""

    def __init__(self, model="gpt-3.5-turbo", temperature=0.7):
        super().__init__()
        self.model = model
        self.temperature = temperature

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream tokens as they're generated"""
        message = data.get("message", "")

        # Simulate streaming response
        response_parts = [
            "Hello! ", "I'm ", "happy ", "to ", "help ", "you ", "today. ",
            "What ", "can ", "I ", "assist ", "you ", "with?"
        ]

        for part in response_parts:
            await asyncio.sleep(0.1)  # Simulate token generation delay
            yield {
                "token": part,
                "finished": part == response_parts[-1]
            }

# Create streaming chat workflow
streaming_chat = StreamingLLMNode()

# Stream responses
async def chat_stream(message: str):
    async for token_data in streaming_chat.stream_execute({"message": message}):
        print(token_data["token"], end="", flush=True)
        if token_data["finished"]:
            print()  # New line at end

# Run streaming chat
await chat_stream("Hello, how are you?")
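The top-level await above assumes an environment that already runs an event loop, such as a notebook or the asyncio REPL. In a plain Python script, wrap the call with asyncio.run instead:

asyncio.run(chat_stream("Hello, how are you?"))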
Advanced Streaming Chat with Memory
from nadoo_flow import StreamingNode, MemoryNode, ChainableNode
from typing import List, Dict, AsyncGenerator
import asyncio
import json

class ChatMemory(MemoryNode):
    """Maintain chat history"""

    def __init__(self, max_history=10):
        super().__init__()
        self.history: List[Dict] = []
        self.max_history = max_history

    async def add_message(self, role: str, content: str):
        """Add message to history"""
        self.history.append({
            "role": role,
            "content": content,
            "timestamp": "2024-01-01T12:00:00Z"
        })
        # Keep only recent messages
        if len(self.history) > self.max_history * 2:
            self.history = self.history[-self.max_history * 2:]

    async def get_context(self) -> List[Dict]:
        """Get chat context for LLM"""
        return self.history

    async def execute(self, data):
        """Process with memory context"""
        # Add user message to history
        await self.add_message("user", data["message"])

        # Return context for next node
        return {
            "message": data["message"],
            "history": await self.get_context()
        }

class StreamingChatNode(StreamingNode):
    """Streaming chat with context awareness"""

    def __init__(self, model="gpt-4", system_prompt=None):
        super().__init__()
        self.model = model
        self.system_prompt = system_prompt or "You are a helpful assistant."

    async def stream_execute(self, data) -> AsyncGenerator:
        """Generate streaming response with context"""
        history = data.get("history", [])
        current_message = data.get("message", "")

        # Build context from history
        context = self._build_context(history, current_message)

        # Simulate streaming with context awareness
        if "previous" in current_message.lower():
            response = "Based on our previous discussion, "
        else:
            response = "I understand you're asking about "
        response += f'"{current_message}". Let me help you with that.'

        # Stream response token by token
        tokens = response.split()
        for i, token in enumerate(tokens):
            await asyncio.sleep(0.05)
            yield {
                "token": token + " ",
                "index": i,
                "total": len(tokens),
                "finished": i == len(tokens) - 1
            }

    def _build_context(self, history: List[Dict], current: str) -> str:
        """Build context string from history"""
        context_parts = []
        for msg in history[-5:]:  # Last 5 messages
            context_parts.append(f"{msg['role']}: {msg['content']}")
        context_parts.append(f"user: {current}")
        return "\n".join(context_parts)

# Create chat with memory
memory = ChatMemory(max_history=20)
streaming_chat = StreamingChatNode(
    model="gpt-4",
    system_prompt="You are a knowledgeable AI assistant."
)

# Chat workflow with memory
chat_workflow = memory | streaming_chat

# Interactive chat session
async def chat_session():
    while True:
        user_input = input("\nYou: ")
        if user_input.lower() in ['quit', 'exit']:
            break

        print("Assistant: ", end="", flush=True)

        # Process with memory and stream response
        data = {"message": user_input}
        processed = await memory.execute(data)

        # Stream response
        full_response = ""
        async for token_data in streaming_chat.stream_execute(processed):
            print(token_data["token"], end="", flush=True)
            full_response += token_data["token"]

        # Save assistant response to memory
        await memory.add_message("assistant", full_response.strip())
        print()  # New line after response
Multi-Modal Streaming Chat
class MultiModalStreamingNode(StreamingNode):
    """Handle text, images, and audio in streaming chat"""

    def __init__(self):
        super().__init__()
        self.processors = {
            "text": self._process_text,
            "image": self._process_image,
            "audio": self._process_audio
        }

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream multi-modal responses"""
        input_type = data.get("type", "text")
        content = data.get("content")

        processor = self.processors.get(input_type, self._process_text)
        async for chunk in processor(content):
            yield chunk
    async def _process_text(self, text: str) -> AsyncGenerator:
        """Process text input"""
        response = f"Processing text: {text[:50]}..."
        for i, char in enumerate(response):
            await asyncio.sleep(0.02)
            yield {
                "type": "text",
                "content": char,
                "finished": i == len(response) - 1
            }
    async def _process_image(self, image_url: str) -> AsyncGenerator:
        """Process image input"""
        # Simulate image analysis
        stages = [
            "Downloading image...",
            "Analyzing content...",
            "Detecting objects...",
            "Generating description..."
        ]

        for stage in stages:
            await asyncio.sleep(0.5)
            yield {
                "type": "status",
                "content": stage,
                "progress": (stages.index(stage) + 1) / len(stages)
            }

        # Stream final description
        description = "The image shows a beautiful landscape with mountains."
        for word in description.split():
            await asyncio.sleep(0.1)
            yield {
                "type": "text",
                "content": word + " ",
                "finished": word == description.split()[-1]
            }

    async def _process_audio(self, audio_data: bytes) -> AsyncGenerator:
        """Process audio input"""
        # Simulate transcription
        yield {
            "type": "status",
            "content": "Transcribing audio...",
            "progress": 0.5
        }
        await asyncio.sleep(1.0)

        transcript = "This is the transcribed audio content."
        for word in transcript.split():
            await asyncio.sleep(0.1)
            yield {
                "type": "transcript",
                "content": word + " ",
                "finished": word == transcript.split()[-1]
            }

# Multi-modal chat example
multi_modal_chat = MultiModalStreamingNode()

async def process_multi_modal(input_type: str, content):
    print(f"\nProcessing {input_type}:")
    async for chunk in multi_modal_chat.stream_execute({
        "type": input_type,
        "content": content
    }):
        if chunk["type"] in ("text", "transcript"):
            print(chunk["content"], end="", flush=True)
        elif chunk["type"] == "status":
            print(f"\n[{chunk['progress']*100:.0f}%] {chunk['content']}")
        if chunk.get("finished"):
            print()

# Test different modalities
await process_multi_modal("text", "Hello, how are you?")
await process_multi_modal("image", "https://example.com/image.jpg")
await process_multi_modal("audio", b"audio_data_here")
Streaming with Real-time Processing
class RealtimeProcessor(StreamingNode):
    """Process streaming data in real-time"""

    def __init__(self, buffer_size=10):
        super().__init__()
        self.buffer = []
        self.buffer_size = buffer_size

    async def stream_execute(self, data) -> AsyncGenerator:
        """Process stream with buffering"""
        stream = data.get("stream", [])

        for item in stream:
            self.buffer.append(item)

            # Process when buffer is full
            if len(self.buffer) >= self.buffer_size:
                result = await self._process_buffer()
                yield result
                self.buffer = []

        # Process remaining items
        if self.buffer:
            result = await self._process_buffer()
            yield result

    async def _process_buffer(self):
        """Process buffered items"""
        await asyncio.sleep(0.1)  # Simulate processing
        return {
            "processed": len(self.buffer),
            "items": self.buffer.copy(),
            "timestamp": "2024-01-01T12:00:00Z"
        }

class StreamAggregator(ChainableNode):
    """Aggregate streaming results"""

    def __init__(self):
        super().__init__()
        self.results = []

    async def execute(self, data):
        """Aggregate streaming data"""
        if isinstance(data, dict):
            self.results.append(data)

        return {
            "total_processed": sum(r.get("processed", 0) for r in self.results),
            "chunks": len(self.results),
            "all_items": [
                item for r in self.results
                for item in r.get("items", [])
            ]
        }

# Real-time streaming pipeline
realtime_pipeline = (
    RealtimeProcessor(buffer_size=5)
    | StreamAggregator()
)

# Generate streaming data
async def generate_stream(count=100):
    """Generate streaming data"""
    for i in range(count):
        yield {"id": i, "value": f"item_{i}"}
        await asyncio.sleep(0.01)

# Process stream
async def process_stream():
    stream_data = []
    async for item in generate_stream(50):
        stream_data.append(item)

    # Process in batches
    processor = RealtimeProcessor(buffer_size=10)
    aggregator = StreamAggregator()

    async for batch_result in processor.stream_execute({"stream": stream_data}):
        print(f"Processed batch: {batch_result['processed']} items")
        final_result = await aggregator.execute(batch_result)

    print(f"Total processed: {final_result['total_processed']} items")
WebSocket Streaming Chat
class WebSocketChatNode(StreamingNode):
    """WebSocket-based streaming chat"""

    def __init__(self, websocket_url=None):
        super().__init__()
        self.websocket_url = websocket_url
        self.connection = None

    async def connect(self):
        """Establish WebSocket connection"""
        # In production, use actual WebSocket library
        self.connection = {"status": "connected"}
        return True

    async def disconnect(self):
        """Close WebSocket connection"""
        self.connection = None

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream over WebSocket"""
        if not self.connection:
            await self.connect()

        message = data.get("message", "")

        # Send message (simulated)
        await self._send_message(message)

        # Receive streaming response
        async for chunk in self._receive_stream():
            yield chunk

    async def _send_message(self, message: str):
        """Send message over WebSocket"""
        await asyncio.sleep(0.1)
        print(f"Sent: {message}")

    async def _receive_stream(self) -> AsyncGenerator:
        """Receive streaming response"""
        # Simulate receiving streaming data
        response_chunks = [
            {"type": "start", "session_id": "abc123"},
            {"type": "token", "content": "I "},
            {"type": "token", "content": "understand "},
            {"type": "token", "content": "your "},
            {"type": "token", "content": "question. "},
            {"type": "end", "total_tokens": 4}
        ]

        for chunk in response_chunks:
            await asyncio.sleep(0.05)
            yield chunk

# WebSocket chat workflow
class WebSocketChatWorkflow:
    def __init__(self):
        self.chat_node = WebSocketChatNode("wss://chat.example.com")
        self.session_active = False

    async def start_session(self):
        """Start chat session"""
        await self.chat_node.connect()
        self.session_active = True
        print("Chat session started")

    async def send_message(self, message: str):
        """Send message and stream response"""
        if not self.session_active:
            await self.start_session()

        print(f"\nYou: {message}")
        print("Assistant: ", end="", flush=True)

        async for chunk in self.chat_node.stream_execute({"message": message}):
            if chunk["type"] == "token":
                print(chunk["content"], end="", flush=True)
            elif chunk["type"] == "end":
                print(f"\n(Tokens used: {chunk['total_tokens']})")

    async def end_session(self):
        """End chat session"""
        await self.chat_node.disconnect()
        self.session_active = False
        print("Chat session ended")

# Use WebSocket chat
ws_chat = WebSocketChatWorkflow()
await ws_chat.send_message("What is Flow Core?")
await ws_chat.send_message("Tell me more about streaming")
await ws_chat.end_session()
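The node above only simulates the transport. As a rough sketch of what a real connection could look like with the third-party websockets package, subclassing the simulated node (the JSON chunk format and the RealWebSocketChatNode name are assumptions, not part of Flow Core):

import json
import websockets  # pip install websockets

class RealWebSocketChatNode(WebSocketChatNode):
    """Hypothetical variant that talks to a real WebSocket server."""

    async def connect(self):
        # Open a real connection instead of the simulated one
        self.connection = await websockets.connect(self.websocket_url)
        return True

    async def disconnect(self):
        if self.connection:
            await self.connection.close()
            self.connection = None

    async def _send_message(self, message: str):
        # Assumes the server accepts JSON text frames
        await self.connection.send(json.dumps({"message": message}))

    async def _receive_stream(self) -> AsyncGenerator:
        # Assumes the server streams JSON chunks and finishes with {"type": "end", ...}
        async for raw in self.connection:
            chunk = json.loads(raw)
            yield chunk
            if chunk.get("type") == "end":
                break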
Performance Optimization
Token Batching
class BatchedStreamingNode(StreamingNode):
    """Batch tokens for efficient streaming"""

    def __init__(self, batch_size=5):
        super().__init__()
        self.batch_size = batch_size

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream with token batching"""
        response = "This is a long response that will be streamed in batches for better performance."
        tokens = response.split()

        batch = []
        for i, token in enumerate(tokens):
            batch.append(token)

            if len(batch) >= self.batch_size or i == len(tokens) - 1:
                await asyncio.sleep(0.1)
                yield {
                    "tokens": batch,
                    "batch_index": i // self.batch_size,
                    "finished": i == len(tokens) - 1
                }
                batch = []

# Batched streaming
batched_stream = BatchedStreamingNode(batch_size=3)
async for batch_data in batched_stream.stream_execute({"message": "test"}):
    print(" ".join(batch_data["tokens"]), end=" ", flush=True)
Stream Caching
class CachedStreamingNode(StreamingNode):
    """Cache streaming responses"""

    def __init__(self):
        super().__init__()
        self.cache = {}

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream with caching"""
        cache_key = self._get_cache_key(data)

        # Check cache
        if cache_key in self.cache:
            # Stream from cache
            for chunk in self.cache[cache_key]:
                yield chunk
                await asyncio.sleep(0.01)  # Simulate streaming delay
        else:
            # Generate and cache
            chunks = []
            async for chunk in self._generate_response(data):
                chunks.append(chunk)
                yield chunk
            self.cache[cache_key] = chunks

    def _get_cache_key(self, data):
        """Generate cache key"""
        return hash(json.dumps(data, sort_keys=True))

    async def _generate_response(self, data) -> AsyncGenerator:
        """Generate new response"""
        response = f"Response to: {data.get('message', '')}"
        for char in response:
            await asyncio.sleep(0.02)
            yield {"content": char}
Error Handling in Streaming
class RobustStreamingNode(StreamingNode):
    """Streaming with error handling"""

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream with error recovery"""
        try:
            async for chunk in self._unsafe_stream(data):
                yield chunk
        except Exception as e:
            # Yield error chunk
            yield {
                "type": "error",
                "error": str(e),
                "recoverable": True
            }
            # Try fallback response
            async for chunk in self._fallback_stream(data):
                yield chunk

    async def _unsafe_stream(self, data) -> AsyncGenerator:
        """Potentially failing stream"""
        for i in range(10):
            if i == 5:
                raise Exception("Stream interrupted")
            await asyncio.sleep(0.1)
            yield {"content": f"chunk_{i}"}

    async def _fallback_stream(self, data) -> AsyncGenerator:
        """Fallback stream"""
        yield {"content": "\n[Recovered] Using fallback response"}
Best Practices
Streaming Performance
- Use appropriate buffer sizes
- Implement token batching for efficiency
- Cache frequently requested responses
- Monitor memory usage for long streams (see the buffer sketch below)
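One way to keep memory bounded on long streams is a fixed-size rolling buffer; a minimal sketch (BoundedStreamBuffer is illustrative, not a Flow Core class):

from collections import deque

class BoundedStreamBuffer:
    """Keep only the most recent chunks of a long-running stream in memory."""

    def __init__(self, max_chunks: int = 1000):
        # Older chunks are dropped automatically once the limit is reached
        self.chunks = deque(maxlen=max_chunks)

    def add(self, chunk: dict):
        self.chunks.append(chunk)

    def tail_text(self) -> str:
        return "".join(c.get("content", "") for c in self.chunks)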
User Experience
- Provide immediate feedback
- Show progress indicators
- Handle connection interruptions gracefully
- Implement retry logic for failures, as shown in the sketch below
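A sketch of retry logic with exponential backoff around any streaming node; it restarts the stream from the beginning on failure, which is usually acceptable for short responses (stream_with_retry and its status chunk format are illustrative):

async def stream_with_retry(node, data, max_attempts: int = 3):
    """Retry a failed stream from the start, waiting longer after each attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            async for chunk in node.stream_execute(data):
                yield chunk
            return  # stream completed successfully
        except Exception as exc:
            if attempt == max_attempts:
                raise
            delay = 2 ** (attempt - 1)
            yield {"type": "status", "content": f"Retrying after error: {exc}", "progress": 0.0}
            await asyncio.sleep(delay)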
Resource Management
- Close connections properly
- Implement timeouts for streaming
- Limit concurrent streams
- Clean up resources on errors (see the sketch after this list)
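A sketch that combines a concurrency cap, a per-stream timeout, and cleanup in a finally block (the limits and the run_stream helper are illustrative, not part of Flow Core):

MAX_CONCURRENT_STREAMS = 10
stream_slots = asyncio.Semaphore(MAX_CONCURRENT_STREAMS)

async def run_stream(node, data, timeout: float = 30.0):
    """Collect a stream under a concurrency limit and a timeout, always cleaning up."""
    async def consume():
        chunks = []
        async for chunk in node.stream_execute(data):
            chunks.append(chunk)
        return chunks

    async with stream_slots:  # limit concurrent streams
        try:
            return await asyncio.wait_for(consume(), timeout=timeout)
        finally:
            # Release node-held resources even when the stream times out or errors
            if hasattr(node, "disconnect"):
                await node.disconnect()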