Overview
Flow Core provides a comprehensive memory management system for maintaining conversation history, session state, and context across workflow executions. Built with flexibility in mind, it supports a range of storage backends, from in-memory to distributed systems.
Chat History
BaseChatHistory
Abstract base class for all chat history implementations:
from nadoo_flow import BaseChatHistory
class CustomHistory(BaseChatHistory):
    """Skeleton of a custom backend: the four coroutines a history defines."""

    async def get_messages(self) -> list[Message]:
        """Retrieve all messages in history."""

    async def add_message(self, message: Message) -> None:
        """Add a single message."""

    async def add_messages(self, messages: list[Message]) -> None:
        """Add multiple messages."""

    async def clear(self) -> None:
        """Clear all messages."""
InMemoryChatHistory
Simple in-process memory storage:
from nadoo_flow import InMemoryChatHistory, Message
# Create history
history = InMemoryChatHistory()
# Add messages
await history.add_message(Message.user("Hello, AI!"))
await history.add_message(Message.assistant("Hello! How can I help?"))
# Retrieve messages
messages = await history.get_messages()
for msg in messages:
print(f"{msg.role}: {msg.content}")
# Clear history
await history.clear()
SlidingWindowChatHistory
Maintain only the most recent N messages for memory efficiency:
from nadoo_flow import SlidingWindowChatHistory, InMemoryChatHistory
# Keep only last 10 messages
history = SlidingWindowChatHistory(
base_history=InMemoryChatHistory(),
window_size=10
)
# Add many messages
for i in range(20):
await history.add_message(Message.user(f"Message {i}"))
# Only last 10 are retained
messages = await history.get_messages()
print(len(messages)) # 10
RedisChatHistory
Distributed Redis-backed storage for scalability:
from nadoo_flow import RedisChatHistory
import redis.asyncio as redis
# Setup Redis client
redis_client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True
)
# Create Redis-backed history
history = RedisChatHistory(
session_id="user_123_chat_456",
redis_client=redis_client,
key_prefix="chat_history:",
ttl=86400 # 24 hours expiration
)
# Use like any other history
await history.add_message(Message.user("Hello"))
messages = await history.get_messages()
# Session persists across restarts
# Key in Redis: "chat_history:user_123_chat_456"
Session Management
SessionHistoryManager
Manage multiple conversation sessions:
from nadoo_flow import SessionHistoryManager, InMemoryChatHistory
# Create manager with factory function
manager = SessionHistoryManager(
history_factory=lambda session_id: InMemoryChatHistory(),
window_size=10 # Optional sliding window
)
# Get or create session history
session1 = await manager.get_history("user_123")
await session1.add_message(Message.user("Hi from session 1"))
session2 = await manager.get_history("user_456")
await session2.add_message(Message.user("Hi from session 2"))
# Clear specific session
await manager.clear_history("user_123")
# List all active sessions
sessions = manager.list_sessions()
print(f"Active sessions: {sessions}")
Advanced Session Patterns
Multi-Tenant Sessions
Copy
class TenantSessionManager:
    """Builds per-tenant session managers that share one Redis client."""

    def __init__(self, redis_client):
        """Keep the Redis client that every tenant's histories will use."""
        self.redis = redis_client

    def get_manager(self, tenant_id: str):
        """Get session manager for tenant.

        Both the session id and the key prefix handed to RedisChatHistory
        embed ``tenant_id``. A new SessionHistoryManager is built on every call.
        """
        prefix = f"tenant:{tenant_id}:chat:"

        def make_history(sid):
            return RedisChatHistory(
                session_id=f"{tenant_id}:{sid}",
                redis_client=self.redis,
                key_prefix=prefix,
                ttl=3600,  # 1 hour
            )

        return SessionHistoryManager(history_factory=make_history)
# Usage: one TenantSessionManager per Redis client, one session manager per
# tenant, then one history per chat within that tenant.
tenant_manager = TenantSessionManager(redis_client)
acme_sessions = tenant_manager.get_manager("acme_corp")
history = await acme_sessions.get_history("chat_789")
Conversation Branching
Copy
class BranchableHistory:
"""Support conversation branching/forking"""
def __init__(self):
self.branches = {}
self.current_branch = "main"
async def create_branch(self, name: str, from_branch: str = None):
"""Create a new conversation branch"""
source = from_branch or self.current_branch
self.branches[name] = self.branches[source].copy()
async def switch_branch(self, name: str):
"""Switch to different branch"""
self.current_branch = name
async def merge_branches(self, source: str, target: str):
"""Merge conversation branches"""
# Merge logic here
pass
Message Types
Message Class
Core message data structure:
from dataclasses import dataclass
from typing import Any, Literal

from nadoo_flow import Message
@dataclass
class Message:
role: Literal["system", "user", "assistant", "function", "tool"]
content: str | list[dict[str, Any]]
name: str | None = None
metadata: dict[str, Any] | None = None
# Factory methods
@classmethod
def system(cls, content: str, **kwargs):
return cls(role="system", content=content, **kwargs)
@classmethod
def user(cls, content: str, **kwargs):
return cls(role="user", content=content, **kwargs)
@classmethod
def assistant(cls, content: str, **kwargs):
return cls(role="assistant", content=content, **kwargs)
# Conversion
def to_dict(self) -> dict:
"""Convert to OpenAI format"""
return {
"role": self.role,
"content": self.content,
"name": self.name
}
Multi-Modal Messages
Support for images and complex content:
# Plain-text user message
text_msg = Message.user("What's in this image?")

# User message made of two content parts: a text part and an image URL part
image_parts = [
    {"type": "text", "text": "What's in this image?"},
    {"type": "image_url", "image_url": {"url": "https://example.com/image.png"}},
]
image_msg = Message.user(image_parts)

# Tool/Function messages are built directly, with the tool's name attached
tool_msg = Message(role="tool", content='{"result": "success"}', name="search_tool")
ChatHistoryNode Integration
Adding Memory to Nodes
Integrate chat history into workflow nodes:
from nadoo_flow import BaseNode, ChatHistoryNode, NodeResult
class ChatbotNode(BaseNode, ChatHistoryNode):
    """Workflow node that answers a user message using the session's history."""

    def __init__(self, history_manager):
        """Register the node as ``"chatbot"`` and attach ``history_manager``.

        The two bases are initialised explicitly because they take different
        arguments.
        """
        BaseNode.__init__(self, "chatbot")
        ChatHistoryNode.__init__(
            self,
            history_manager=history_manager,
            session_key="session_id",  # Context field for session ID
        )

    async def execute(self, node_context, workflow_context):
        """Answer ``node_context.input_data["message"]`` and record the exchange.

        Returns:
            NodeResult with ``success=True`` and ``output={"response": ...}``.
        """
        # get_history() receives the workflow context and, presumably via the
        # session_key configured above, resolves the session itself, so the
        # previously unused session-id lookup was removed.
        history = await self.get_history(workflow_context)

        # Everything said so far in this session
        messages = await history.get_messages()

        # Wrap the incoming text as a user message
        user_input = node_context.input_data["message"]
        user_msg = Message.user(user_input)

        # Generate response based on history plus the new user turn.
        # NOTE(review): generate_response() is not defined in this example;
        # it has to be supplied by the implementer.
        response = await self.generate_response(messages + [user_msg])
        assistant_msg = Message.assistant(response)

        # Save both sides of the interaction
        await self.save_interaction(workflow_context, user_msg, assistant_msg)

        return NodeResult(success=True, output={"response": response})
Usage in Workflows
Copy
# Setup history manager
history_manager = SessionHistoryManager(
history_factory=lambda sid: InMemoryChatHistory(),
window_size=20
)
# Create chatbot node
chatbot = ChatbotNode(history_manager)
# Create workflow
workflow = InputNode() | chatbot | OutputNode()
# Execute with session
workflow_context = WorkflowContext(
workflow_id="chat_workflow",
global_variables={"session_id": "user_123"}
)
result = await workflow.run(
{"message": "Hello!"},
workflow_context
)
Memory Patterns
Pattern 1: Conversation Summarization
Periodically summarize old messages to save memory:
class SummarizingHistory(BaseChatHistory):
    """History wrapper that replaces the oldest messages with a summary.

    When the wrapped history already holds ``max_messages`` messages, the
    oldest ``summarize_count`` of them are summarised and replaced by a single
    system message before the new message is appended.
    """

    def __init__(self, base_history, summarizer_node, max_messages=50,
                 summarize_count=None):
        """
        Args:
            base_history: History that actually stores the messages.
            summarizer_node: Object exposing ``await summarize(messages)``.
            max_messages: Size at which summarisation is triggered.
            summarize_count: How many of the oldest messages to fold into the
                summary. Defaults to 25, capped at half of ``max_messages`` so
                that a small ``max_messages`` still keeps recent messages.
        """
        self.base = base_history
        self.summarizer = summarizer_node
        self.max_messages = max_messages
        if summarize_count is None:
            summarize_count = min(25, max_messages // 2)
        # Always summarise at least one message, otherwise the history could
        # never shrink once the limit is reached.
        self.summarize_count = max(1, summarize_count)
        self.summary = None

    async def get_messages(self) -> list[Message]:
        """Return the messages held by the wrapped history."""
        return await self.base.get_messages()

    async def add_message(self, message: Message) -> None:
        """Append ``message``, summarising old messages first if the limit is hit."""
        messages = await self.base.get_messages()
        if len(messages) >= self.max_messages:
            # Summarize old messages
            old_messages = messages[:self.summarize_count]
            recent_messages = messages[self.summarize_count:]
            self.summary = await self.summarizer.summarize(old_messages)

            # Rebuild the store as: summary message + recent messages
            await self.base.clear()
            await self.base.add_message(
                Message.system(f"Previous conversation summary: {self.summary}")
            )
            for msg in recent_messages:
                await self.base.add_message(msg)
        await self.base.add_message(message)

    async def add_messages(self, messages: list[Message]) -> None:
        """Append several messages, applying the summarisation check to each."""
        for message in messages:
            await self.add_message(message)

    async def clear(self) -> None:
        """Clear the wrapped history and forget the last summary."""
        await self.base.clear()
        self.summary = None
Pattern 2: Context-Aware Memory
Filter messages based on relevance:
class ContextAwareHistory:
    """Selects the stored messages most similar to a query.

    ``embedder`` must expose ``await embed(text)`` returning a sequence of
    numbers.
    """

    def __init__(self, base_history, embedder):
        self.base = base_history
        self.embedder = embedder

    @staticmethod
    def _as_text(content) -> str:
        """Return the text of a message content (str or list of content parts)."""
        if isinstance(content, str):
            return content
        # Multi-modal content: only the parts of type "text" carry text.
        return " ".join(
            part.get("text", "")
            for part in content
            if isinstance(part, dict) and part.get("type") == "text"
        )

    @staticmethod
    def _cosine_similarity(a, b) -> float:
        """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
        import math

        dot = sum(x * y for x, y in zip(a, b))
        norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
        return dot / norm if norm else 0.0

    async def get_relevant_messages(
        self,
        query: str,
        max_messages: int = 10,
    ) -> "list[Message]":
        """Get messages relevant to query.

        Returns the whole history unchanged when it has at most
        ``max_messages`` entries; otherwise the ``max_messages`` best matches,
        most similar first.
        """
        all_messages = await self.base.get_messages()
        if len(all_messages) <= max_messages:
            return all_messages

        # Embed query and messages (one embedder call each, in order)
        query_embedding = await self.embedder.embed(query)
        message_embeddings = [
            await self.embedder.embed(self._as_text(msg.content))
            for msg in all_messages
        ]

        # Calculate similarity scores
        scores = [
            self._cosine_similarity(query_embedding, msg_emb)
            for msg_emb in message_embeddings
        ]

        # Return top-k relevant messages; sorting on the score alone keeps
        # equally-scored messages in their original order.
        ranked = sorted(
            zip(scores, all_messages),
            key=lambda pair: pair[0],
            reverse=True,
        )
        return [msg for _, msg in ranked[:max_messages]]
Pattern 3: Multi-Modal Memory
Store and retrieve different content types:
class MultiModalHistory:
def __init__(self):
self.text_history = InMemoryChatHistory()
self.image_cache = {}
self.audio_cache = {}
async def add_multi_modal_message(
self,
text: str = None,
image_url: str = None,
audio_data: bytes = None
):
content = []
if text:
content.append({"type": "text", "text": text})
if image_url:
image_id = hashlib.md5(image_url.encode()).hexdigest()
self.image_cache[image_id] = image_url
content.append({
"type": "image_ref",
"image_id": image_id
})
if audio_data:
audio_id = hashlib.md5(audio_data).hexdigest()
self.audio_cache[audio_id] = audio_data
content.append({
"type": "audio_ref",
"audio_id": audio_id
})
await self.text_history.add_message(
Message.user(content)
)
Performance Optimization
Lazy Loading
Load messages only when needed:
class LazyHistory:
    """History that loads from its storage backend on first access only.

    ``storage_backend`` must expose ``await load()`` returning a list of
    messages and ``await append(message)``.
    """

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self._messages = None
        self._loaded = False

    async def get_messages(self) -> "list[Message]":
        """Return the cached messages, loading them from storage the first time."""
        if not self._loaded:
            self._messages = await self.storage.load()
            self._loaded = True
        return self._messages

    async def add_message(self, message: "Message"):
        """Persist ``message`` and then add it to the in-memory cache."""
        # Ensure messages are loaded
        await self.get_messages()
        # Write to storage first: if the write raises, the cache must not
        # contain a message that was never persisted.
        await self.storage.append(message)
        self._messages.append(message)
Caching Strategies
Copy
from functools import lru_cache
class CachedHistory:
    """Caches conversation summaries per session id.

    ``self.summarize(messages)`` is not defined here; a subclass has to
    provide it.
    """

    def __init__(self, base_history, max_cache_size: int = 100):
        """
        Args:
            base_history: History whose messages are summarised.
            max_cache_size: Maximum number of cached summaries; the least
                recently used entry is evicted when the cache is full.
        """
        self.base = base_history
        self._max_cache_size = max_cache_size
        # Per-instance cache of finished summaries. functools.lru_cache on an
        # ``async def`` would cache the coroutine object, which can be awaited
        # only once, and would also keep every instance alive.
        self._summaries = {}

    async def get_cached_summary(self, session_id: str):
        """Cache conversation summaries.

        Returns the cached summary for ``session_id`` if there is one,
        otherwise summarises the base history's messages and caches the result.
        """
        cache = self._summaries
        if session_id in cache:
            # Re-insert so that dict order reflects recency of use.
            summary = cache.pop(session_id)
            cache[session_id] = summary
            return summary

        messages = await self.base.get_messages()
        summary = self.summarize(messages)
        if self._max_cache_size > 0:
            while len(cache) >= self._max_cache_size:
                # The first key is the least recently used one.
                del cache[next(iter(cache))]
            cache[session_id] = summary
        return summary

    async def invalidate_cache(self, session_id: str):
        """Invalidate cache on updates.

        Every entry is dropped, not just ``session_id``: all summaries are
        derived from the same base history, so an update makes all of them stale.
        """
        self._summaries.clear()
Best Practices
Choose Appropriate Storage
Choose Appropriate Storage
- InMemory: Development, testing, single-instance
- Redis: Production, distributed, multi-instance
- Database: Compliance, long-term storage
Implement Retention Policies
Implement Retention Policies
Copy
# Sliding window for active conversations
# NOTE(review): the SlidingWindowChatHistory example earlier on this page also
# passes base_history=...; confirm whether it may be omitted here.
SlidingWindowChatHistory(window_size=50)
# TTL for Redis storage
# NOTE(review): the RedisChatHistory example earlier on this page also passes
# session_id and redis_client; shown here in abbreviated form.
RedisChatHistory(ttl=3600) # 1 hour
# Periodic cleanup for database
# NOTE(review): cleanup_old_sessions is not defined on this page; presumably
# an application-provided helper.
await cleanup_old_sessions(days=30)
Handle Session Lifecycle
Handle Session Lifecycle
Copy
try:
history = await manager.get_history(session_id)
# Use history
finally:
# Clean up if needed
if session_ended:
await manager.clear_history(session_id)
Monitor Memory Usage
Monitor Memory Usage
Copy
# Track session count
active_sessions = len(manager.list_sessions())
# Monitor message count
for session_id in manager.list_sessions():
history = await manager.get_history(session_id)
messages = await history.get_messages()
if len(messages) > threshold:
# Summarize or archive
pass
Complete Example
Copy
from nadoo_flow import (
BaseNode, ChatHistoryNode, NodeResult,
SessionHistoryManager, RedisChatHistory,
Message, WorkflowContext
)
import redis.asyncio as redis
class SmartChatbot(BaseNode, ChatHistoryNode):
    """Chatbot with persistent memory"""

    def __init__(self, redis_client):
        """Register the node and wire it to Redis-backed, windowed histories."""
        BaseNode.__init__(self, "smart_chatbot")

        def redis_history(sid):
            # One Redis-backed history per session id
            return RedisChatHistory(
                session_id=sid,
                redis_client=redis_client,
                ttl=3600,  # 1 hour sessions
            )

        manager = SessionHistoryManager(
            history_factory=redis_history,
            window_size=50,  # Keep last 50 messages
        )
        ChatHistoryNode.__init__(
            self,
            history_manager=manager,
            session_key="user_id",
        )

    async def execute(self, node_context, workflow_context):
        """Answer the incoming message using the stored conversation.

        Returns a NodeResult whose output holds the response, the user id
        (under ``"session_id"``) and a message count.
        """
        # Incoming text and the id of the user it belongs to
        user_input = node_context.input_data["message"]
        user_id = workflow_context.get_global_variable("user_id")

        # Conversation so far
        history = await self.get_history(workflow_context)
        messages = await history.get_messages()

        # An empty history means a new conversation: seed it with the system prompt
        if not messages:
            system_msg = Message.system(
                "You are a helpful assistant with memory of past conversations."
            )
            await history.add_message(system_msg)
            messages = [system_msg]

        # Generate the reply from the history plus the new user turn
        user_msg = Message.user(user_input)
        response = await self.generate_with_context(messages + [user_msg])

        # Store both sides of the exchange
        assistant_msg = Message.assistant(response)
        await self.save_interaction(workflow_context, user_msg, assistant_msg)

        output = {
            "response": response,
            "session_id": user_id,
            # Messages read before this turn plus the two just saved
            "message_count": len(messages) + 2,
        }
        return NodeResult(success=True, output=output)
# Usage
async def main():
    """Run a two-turn conversation against a local Redis instance."""
    # Imported here because the import list of this example does not include
    # NodeContext. NOTE(review): assumes nadoo_flow exports it; confirm.
    from nadoo_flow import NodeContext

    # Setup Redis
    redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    try:
        # Create chatbot
        chatbot = SmartChatbot(redis_client)

        # Both turns share one context, and therefore one user id
        context = WorkflowContext(
            workflow_id="chat",
            global_variables={"user_id": "user_123"},
        )

        # First message
        result = await chatbot.execute(
            NodeContext(node_id="chat", input_data={"message": "Hi, I'm John"}),
            context,
        )

        # Follow-up (remembers previous)
        result = await chatbot.execute(
            NodeContext(node_id="chat", input_data={"message": "What's my name?"}),
            context,
        )
        # Response will know the name is John
    finally:
        # Release the client's connections even if a turn failed.
        await redis_client.aclose()


# Run
if __name__ == "__main__":
    import asyncio

    asyncio.run(main())