
Overview

Learn how to build streaming chat applications that deliver responses token by token using Flow Core’s streaming capabilities. Streaming gives users immediate feedback instead of a long wait for the complete response.

Basic Streaming Chat

from nadoo_flow import StreamingNode, ChainableNode
import asyncio
from typing import AsyncGenerator

class StreamingLLMNode(StreamingNode):
    """LLM node with streaming support"""

    def __init__(self, model="gpt-3.5-turbo", temperature=0.7):
        super().__init__()
        self.model = model
        self.temperature = temperature

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream tokens as they're generated"""
        message = data.get("message", "")

        # Simulate streaming response
        response_parts = [
            "Hello! ",
            "I'm ",
            "happy ",
            "to ",
            "help ",
            "you ",
            "today. ",
            "What ",
            "can ",
            "I ",
            "assist ",
            "you ",
            "with?"
        ]

        for part in response_parts:
            await asyncio.sleep(0.1)  # Simulate token generation delay
            yield {
                "token": part,
                "finished": part == response_parts[-1]
            }

# Create streaming chat workflow
streaming_chat = StreamingLLMNode()

# Stream responses
async def chat_stream(message: str):
    async for token_data in streaming_chat.stream_execute({"message": message}):
        print(token_data["token"], end="", flush=True)
        if token_data["finished"]:
            print()  # New line at end

# Run streaming chat
await chat_stream("Hello, how are you?")
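
The examples on this page use top-level await, which works in notebooks and async REPLs; in a plain Python script, wrap the entry point with asyncio.run:

# In a script (without top-level await):
asyncio.run(chat_stream("Hello, how are you?"))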

Advanced Streaming Chat with Memory

from nadoo_flow import StreamingNode, MemoryNode, ChainableNode
from typing import List, Dict, AsyncGenerator
from datetime import datetime, timezone
import asyncio
import json

class ChatMemory(MemoryNode):
    """Maintain chat history"""

    def __init__(self, max_history=10):
        super().__init__()
        self.history: List[Dict] = []
        self.max_history = max_history

    async def add_message(self, role: str, content: str):
        """Add message to history"""
        self.history.append({
            "role": role,
            "content": content,
            "timestamp": "2024-01-01T12:00:00Z"
        })

        # Keep only recent messages (max_history user/assistant pairs, hence * 2)
        if len(self.history) > self.max_history * 2:
            self.history = self.history[-self.max_history * 2:]

    async def get_context(self) -> List[Dict]:
        """Get chat context for LLM"""
        return self.history

    async def execute(self, data):
        """Process with memory context"""
        # Add user message to history
        await self.add_message("user", data["message"])

        # Return context for next node
        return {
            "message": data["message"],
            "history": await self.get_context()
        }

class StreamingChatNode(StreamingNode):
    """Streaming chat with context awareness"""

    def __init__(self, model="gpt-4", system_prompt=None):
        super().__init__()
        self.model = model
        self.system_prompt = system_prompt or "You are a helpful assistant."

    async def stream_execute(self, data) -> AsyncGenerator:
        """Generate streaming response with context"""
        history = data.get("history", [])
        current_message = data.get("message", "")

        # Build context from history (a real implementation would send this to the model as the prompt)
        context = self._build_context(history, current_message)

        # Simulate streaming with context awareness
        if "previous" in current_message.lower():
            response = "Based on our previous discussion, "
        else:
            response = "I understand you're asking about "

        response += f'"{current_message}". Let me help you with that.'

        # Stream response token by token
        tokens = response.split()
        for i, token in enumerate(tokens):
            await asyncio.sleep(0.05)
            yield {
                "token": token + " ",
                "index": i,
                "total": len(tokens),
                "finished": i == len(tokens) - 1
            }

    def _build_context(self, history: List[Dict], current: str) -> str:
        """Build context string from history"""
        context_parts = []
        for msg in history[-5:]:  # Last 5 messages
            context_parts.append(f"{msg['role']}: {msg['content']}")
        context_parts.append(f"user: {current}")
        return "\n".join(context_parts)

# Create chat with memory
memory = ChatMemory(max_history=20)
streaming_chat = StreamingChatNode(
    model="gpt-4",
    system_prompt="You are a knowledgeable AI assistant."
)

# Chat workflow with memory (pipeline composition; the session below drives the nodes directly)
chat_workflow = memory | streaming_chat

# Interactive chat session
async def chat_session():
    while True:
        user_input = input("\nYou: ")  # blocking input; fine for a demo, use an async prompt in production
        if user_input.lower() in ['quit', 'exit']:
            break

        print("Assistant: ", end="", flush=True)

        # Process with memory and stream response
        data = {"message": user_input}
        processed = await memory.execute(data)

        # Stream response
        full_response = ""
        async for token_data in streaming_chat.stream_execute(processed):
            print(token_data["token"], end="", flush=True)
            full_response += token_data["token"]

        # Save assistant response to memory
        await memory.add_message("assistant", full_response.strip())
        print()  # New line after response
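
# Start the interactive session
await chat_session()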

Multi-Modal Streaming Chat

class MultiModalStreamingNode(StreamingNode):
    """Handle text, images, and audio in streaming chat"""

    def __init__(self):
        super().__init__()
        self.processors = {
            "text": self._process_text,
            "image": self._process_image,
            "audio": self._process_audio
        }

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream multi-modal responses"""
        input_type = data.get("type", "text")
        content = data.get("content")

        processor = self.processors.get(input_type, self._process_text)
        async for chunk in processor(content):
            yield chunk

    async def _process_text(self, text: str) -> AsyncGenerator:
        """Process text input"""
        response = f"Processing text: {text[:50]}..."
        for i, char in enumerate(response):
            await asyncio.sleep(0.02)
            yield {
                "type": "text",
                "content": char,
                "finished": i == len(response) - 1
            }

    async def _process_image(self, image_url: str) -> AsyncGenerator:
        """Process image input"""
        # Simulate image analysis
        stages = [
            "Downloading image...",
            "Analyzing content...",
            "Detecting objects...",
            "Generating description..."
        ]

        for i, stage in enumerate(stages):
            await asyncio.sleep(0.5)
            yield {
                "type": "status",
                "content": stage,
                "progress": (i + 1) / len(stages)
            }

        # Stream final description
        description = "The image shows a beautiful landscape with mountains."
        words = description.split()
        for i, word in enumerate(words):
            await asyncio.sleep(0.1)
            yield {
                "type": "text",
                "content": word + " ",
                "finished": i == len(words) - 1
            }

    async def _process_audio(self, audio_data: bytes) -> AsyncGenerator:
        """Process audio input"""
        # Simulate transcription
        yield {
            "type": "status",
            "content": "Transcribing audio...",
            "progress": 0.5
        }

        await asyncio.sleep(1.0)

        transcript = "This is the transcribed audio content."
        words = transcript.split()
        for i, word in enumerate(words):
            await asyncio.sleep(0.1)
            yield {
                "type": "transcript",
                "content": word + " ",
                "finished": i == len(words) - 1
            }

# Multi-modal chat example
multi_modal_chat = MultiModalStreamingNode()

async def process_multi_modal(input_type: str, content):
    print(f"\nProcessing {input_type}:")
    async for chunk in multi_modal_chat.stream_execute({
        "type": input_type,
        "content": content
    }):
        if chunk["type"] == "text" or chunk["type"] == "transcript":
            print(chunk["content"], end="", flush=True)
        elif chunk["type"] == "status":
            print(f"\n[{chunk['progress']*100:.0f}%] {chunk['content']}")

        if chunk.get("finished"):
            print()

# Test different modalities
await process_multi_modal("text", "Hello, how are you?")
await process_multi_modal("image", "https://example.com/image.jpg")
await process_multi_modal("audio", b"audio_data_here")

Streaming with Real-time Processing

class RealtimeProcessor(StreamingNode):
    """Process streaming data in real-time"""

    def __init__(self, buffer_size=10):
        super().__init__()
        self.buffer = []
        self.buffer_size = buffer_size

    async def stream_execute(self, data) -> AsyncGenerator:
        """Process stream with buffering"""
        stream = data.get("stream", [])

        for item in stream:
            self.buffer.append(item)

            # Process when buffer is full
            if len(self.buffer) >= self.buffer_size:
                result = await self._process_buffer()
                yield result
                self.buffer = []

        # Process remaining items
        if self.buffer:
            result = await self._process_buffer()
            yield result

    async def _process_buffer(self):
        """Process buffered items"""
        await asyncio.sleep(0.1)  # Simulate processing
        return {
            "processed": len(self.buffer),
            "items": self.buffer.copy(),
            "timestamp": "2024-01-01T12:00:00Z"
        }

class StreamAggregator(ChainableNode):
    """Aggregate streaming results"""

    def __init__(self):
        super().__init__()
        self.results = []

    async def execute(self, data):
        """Aggregate streaming data"""
        if isinstance(data, dict):
            self.results.append(data)

        return {
            "total_processed": sum(r.get("processed", 0) for r in self.results),
            "chunks": len(self.results),
            "all_items": [
                item for r in self.results
                for item in r.get("items", [])
            ]
        }

# Real-time streaming pipeline (composition; the example below drives the nodes directly)
realtime_pipeline = (
    RealtimeProcessor(buffer_size=5)
    | StreamAggregator()
)

# Generate streaming data
async def generate_stream(count=100):
    """Generate streaming data"""
    for i in range(count):
        yield {"id": i, "value": f"item_{i}"}
        await asyncio.sleep(0.01)

# Process stream
async def process_stream():
    stream_data = []
    async for item in generate_stream(50):
        stream_data.append(item)

    # Process in batches
    processor = RealtimeProcessor(buffer_size=10)
    aggregator = StreamAggregator()

    async for batch_result in processor.stream_execute({"stream": stream_data}):
        print(f"Processed batch: {batch_result['processed']} items")
        final_result = await aggregator.execute(batch_result)

    print(f"Total processed: {final_result['total_processed']} items")

WebSocket Streaming Chat

class WebSocketChatNode(StreamingNode):
    """WebSocket-based streaming chat"""

    def __init__(self, websocket_url=None):
        super().__init__()
        self.websocket_url = websocket_url
        self.connection = None

    async def connect(self):
        """Establish WebSocket connection"""
        # In production, use actual WebSocket library
        self.connection = {"status": "connected"}
        return True

    async def disconnect(self):
        """Close WebSocket connection"""
        self.connection = None

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream over WebSocket"""
        if not self.connection:
            await self.connect()

        message = data.get("message", "")

        # Send message (simulated)
        await self._send_message(message)

        # Receive streaming response
        async for chunk in self._receive_stream():
            yield chunk

    async def _send_message(self, message: str):
        """Send message over WebSocket"""
        await asyncio.sleep(0.1)
        print(f"Sent: {message}")

    async def _receive_stream(self) -> AsyncGenerator:
        """Receive streaming response"""
        # Simulate receiving streaming data
        response_chunks = [
            {"type": "start", "session_id": "abc123"},
            {"type": "token", "content": "I "},
            {"type": "token", "content": "understand "},
            {"type": "token", "content": "your "},
            {"type": "token", "content": "question. "},
            {"type": "end", "total_tokens": 4}
        ]

        for chunk in response_chunks:
            await asyncio.sleep(0.05)
            yield chunk

# WebSocket chat workflow
class WebSocketChatWorkflow:
    def __init__(self):
        self.chat_node = WebSocketChatNode("wss://chat.example.com")
        self.session_active = False

    async def start_session(self):
        """Start chat session"""
        await self.chat_node.connect()
        self.session_active = True
        print("Chat session started")

    async def send_message(self, message: str):
        """Send message and stream response"""
        if not self.session_active:
            await self.start_session()

        print(f"\nYou: {message}")
        print("Assistant: ", end="", flush=True)

        async for chunk in self.chat_node.stream_execute({"message": message}):
            if chunk["type"] == "token":
                print(chunk["content"], end="", flush=True)
            elif chunk["type"] == "end":
                print(f"\n(Tokens used: {chunk['total_tokens']})")

    async def end_session(self):
        """End chat session"""
        await self.chat_node.disconnect()
        self.session_active = False
        print("Chat session ended")

# Use WebSocket chat
ws_chat = WebSocketChatWorkflow()
await ws_chat.send_message("What is Flow Core?")
await ws_chat.send_message("Tell me more about streaming")
await ws_chat.end_session()
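
The node above simulates the transport. A minimal sketch of a real transport, assuming the third-party websockets package and a JSON-chunk-per-frame message format (neither is part of nadoo_flow), could look like this:

# Hypothetical transport using the third-party `websockets` package (pip install websockets).
# The URL scheme and chunk format are assumptions, not a nadoo_flow or server API.
import json
import websockets

class RealWebSocketChatNode(WebSocketChatNode):
    async def connect(self):
        self.connection = await websockets.connect(self.websocket_url)
        return True

    async def disconnect(self):
        if self.connection:
            await self.connection.close()
            self.connection = None

    async def _send_message(self, message: str):
        await self.connection.send(json.dumps({"message": message}))

    async def _receive_stream(self) -> AsyncGenerator:
        # Assumes the server sends one JSON chunk per frame and terminates with {"type": "end"}
        async for frame in self.connection:
            chunk = json.loads(frame)
            yield chunk
            if chunk.get("type") == "end":
                break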

Performance Optimization

Token Batching

class BatchedStreamingNode(StreamingNode):
    """Batch tokens for efficient streaming"""

    def __init__(self, batch_size=5):
        super().__init__()
        self.batch_size = batch_size

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream with token batching"""
        response = "This is a long response that will be streamed in batches for better performance."
        tokens = response.split()

        batch = []
        for i, token in enumerate(tokens):
            batch.append(token)

            if len(batch) >= self.batch_size or i == len(tokens) - 1:
                await asyncio.sleep(0.1)
                yield {
                    "tokens": batch,
                    "batch_index": i // self.batch_size,
                    "finished": i == len(tokens) - 1
                }
                batch = []

# Batched streaming
batched_stream = BatchedStreamingNode(batch_size=3)

async for batch_data in batched_stream.stream_execute({"message": "test"}):
    print(" ".join(batch_data["tokens"]), end=" ", flush=True)

Stream Caching

class CachedStreamingNode(StreamingNode):
    """Cache streaming responses"""

    def __init__(self):
        super().__init__()
        self.cache = {}

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream with caching"""
        cache_key = self._get_cache_key(data)

        # Check cache
        if cache_key in self.cache:
            # Stream from cache
            for chunk in self.cache[cache_key]:
                yield chunk
                await asyncio.sleep(0.01)  # Simulate streaming delay
        else:
            # Generate and cache
            chunks = []
            async for chunk in self._generate_response(data):
                chunks.append(chunk)
                yield chunk

            self.cache[cache_key] = chunks

    def _get_cache_key(self, data):
        """Generate cache key (stable only within the current process; use hashlib for a persistent cache)"""
        return hash(json.dumps(data, sort_keys=True))

    async def _generate_response(self, data) -> AsyncGenerator:
        """Generate new response"""
        response = f"Response to: {data.get('message', '')}"
        for char in response:
            await asyncio.sleep(0.02)
            yield {"content": char}

Error Handling in Streaming

class RobustStreamingNode(StreamingNode):
    """Streaming with error handling"""

    async def stream_execute(self, data) -> AsyncGenerator:
        """Stream with error recovery"""
        try:
            async for chunk in self._unsafe_stream(data):
                yield chunk
        except Exception as e:
            # Yield error chunk
            yield {
                "type": "error",
                "error": str(e),
                "recoverable": True
            }

            # Try fallback response
            async for chunk in self._fallback_stream(data):
                yield chunk

    async def _unsafe_stream(self, data) -> AsyncGenerator:
        """Potentially failing stream"""
        for i in range(10):
            if i == 5:
                raise Exception("Stream interrupted")
            await asyncio.sleep(0.1)
            yield {"content": f"chunk_{i}"}

    async def _fallback_stream(self, data) -> AsyncGenerator:
        """Fallback stream"""
        yield {"content": "\n[Recovered] Using fallback response"}

Best Practices

  • Use appropriate buffer sizes
  • Implement token batching for efficiency
  • Cache frequently requested responses
  • Monitor memory usage for long streams
  • Provide immediate feedback
  • Show progress indicators
  • Handle connection interruptions gracefully
  • Implement retry logic for failures
  • Close connections properly
  • Implement timeouts for streaming
  • Limit concurrent streams (both shown in the sketch after this list)
  • Clean up resources on errors
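
A minimal sketch of the timeout and concurrency-limit practices using plain asyncio; the limit values and the stream_with_limits helper are illustrative, not part of nadoo_flow:

import asyncio

MAX_CONCURRENT_STREAMS = 4  # illustrative limit
CHUNK_TIMEOUT = 5.0         # seconds to wait for each chunk

stream_semaphore = asyncio.Semaphore(MAX_CONCURRENT_STREAMS)

async def stream_with_limits(node, data):
    """Consume node.stream_execute(data) with a per-chunk timeout and a global concurrency cap."""
    async with stream_semaphore:  # limit concurrent streams
        stream = node.stream_execute(data)
        try:
            while True:
                try:
                    chunk = await asyncio.wait_for(stream.__anext__(), CHUNK_TIMEOUT)
                except StopAsyncIteration:
                    break
                yield chunk
        finally:
            await stream.aclose()  # clean up resources on errors or timeouts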

Next Steps