Overview
Flow Core's memory management enables AI agents to maintain conversation history, remember context across interactions, and provide coherent multi-turn conversations.

Chat History Types
InMemoryChatHistory
Basic in-memory chat history for conversation storage:
from nadoo_flow import InMemoryChatHistory, Message
# Create history instance
history = InMemoryChatHistory()
# Add messages
await history.add_message(Message.user("Hello, how are you?"))
await history.add_message(Message.assistant("I'm doing well, thank you!"))
# Get conversation history
messages = await history.get_messages()
for msg in messages:
print(f"{msg.role}: {msg.content}")
# user: Hello, how are you?
# assistant: I'm doing well, thank you!
# Clear history
await history.clear()
SlidingWindowChatHistory
Sliding window that keeps only the most recent N messages (for token management):
from nadoo_flow import InMemoryChatHistory, SlidingWindowChatHistory, Message
# Create windowed history
base_history = InMemoryChatHistory()
window_history = SlidingWindowChatHistory(
base_history=base_history,
window_size=10 # Keep only last 10 messages
)
# Add messages
await window_history.add_message(Message.user("Message 1"))
await window_history.add_message(Message.assistant("Response 1"))
# ... add more messages ...
# Get only recent messages (last 10)
recent_messages = await window_history.get_messages()
RedisChatHistory
Redis-backed persistent chat history for distributed systems:
from nadoo_flow import RedisChatHistory, Message
import redis
# Create Redis history
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
history = RedisChatHistory(
session_id="user_123",
redis_client=redis_client,
key_prefix="chat:",
ttl=3600 # 1 hour TTL
)
# Add messages
await history.add_message(Message.user("Hello"))
await history.add_message(Message.assistant("Hi there!"))
# Get messages (persisted in Redis)
messages = await history.get_messages()
Message Class
All chat histories use the Message class:
from nadoo_flow import Message
# Create messages
user_msg = Message.user("What's the weather?")
assistant_msg = Message.assistant("It's sunny today!")
system_msg = Message.system("You are a helpful assistant")
# Access message properties
print(user_msg.role) # "user"
print(user_msg.content) # "What's the weather?"
Session Management
SessionHistoryManager
Manage multiple chat histories for different sessions:
from nadoo_flow import (
SessionHistoryManager,
InMemoryChatHistory,
create_inmemory_history_manager
)
# Create manager with history factory
manager = SessionHistoryManager(
history_factory=lambda session_id: InMemoryChatHistory(),
window_size=10 # Optional: use sliding window
)
# Or use convenience function
manager = create_inmemory_history_manager(window_size=10)
# Get history for a session (creates if doesn't exist)
history = await manager.get_history("user_123")
# Add messages
await history.add_message(Message.user("Hello"))
await history.add_message(Message.assistant("Hi!"))
# Get messages
messages = await history.get_messages()
ChatHistoryNode
Mixin class to add chat history to any node:
from nadoo_flow import BaseNode, ChatHistoryNode, SessionHistoryManager, Message, NodeResult
class ConversationalNode(BaseNode, ChatHistoryNode):
    """Conversational node that records user and assistant turns per session."""

    def __init__(self, node_id: str, history_manager: SessionHistoryManager):
        # Initialize both bases explicitly; ChatHistoryNode is a mixin and
        # cooperative super() is not used here.
        BaseNode.__init__(
            self,
            node_id=node_id,
            node_type="conversational",
            name="Conversational",
            config={},
        )
        ChatHistoryNode.__init__(self, history_manager=history_manager)

    async def execute(self, node_context, workflow_context):
        # Resolve the per-session history, then append the incoming user turn.
        session_history = await self.get_history(workflow_context)
        incoming = node_context.input_data.get("message")
        await session_history.add_message(Message.user(incoming))

        # Build the reply using the full transcript as context.
        transcript = await session_history.get_messages()
        response = f"You said: {incoming} (with {len(transcript)} message context)"

        # Persist the assistant turn before returning the result.
        await session_history.add_message(Message.assistant(response))
        return NodeResult(success=True, output={"response": response})
context = []
if self.summary:
context.append({
"role": "system",
"content": f"Previous conversation summary: {self.summary}"
})
context.extend(self.messages)
return context
SummaryBufferMemory
Combine summary and buffer memory:
class SummaryBufferMemory:
    """Hybrid memory: a bounded buffer of recent messages plus a running summary.

    Messages evicted from the buffer are held in a pending list and folded
    into the summary once the configured threshold of total messages is
    reached, so no evicted message is silently dropped.
    """

    def __init__(self, summarizer, buffer_size=10, summary_threshold=50):
        self.summarizer = summarizer                # async summarize(messages) -> str
        self.buffer_size = buffer_size              # max messages kept verbatim
        self.summary_threshold = summary_threshold  # summarize pending every N messages
        self.summary = ""
        self.buffer = []
        self.total_messages = 0
        self._pending = []  # evicted messages awaiting summarization

    async def add_message(self, role, content):
        """Append a message, evicting and (eventually) summarizing overflow.

        Bug fix: the previous version referenced ``overflow`` only when the
        threshold happened to coincide with an eviction, dropping evicted
        messages the rest of the time (and risking NameError when the
        threshold was hit before any eviction). Overflow is now accumulated
        in ``self._pending`` and summarized at the configured cadence.
        """
        self.buffer.append({"role": role, "content": content})
        self.total_messages += 1

        # Evict the oldest messages beyond the window into the pending list.
        if len(self.buffer) > self.buffer_size:
            self._pending.extend(self.buffer[:-self.buffer_size])
            self.buffer = self.buffer[-self.buffer_size:]

        # Fold pending overflow into the summary at the configured cadence.
        if self.total_messages % self.summary_threshold == 0 and self._pending:
            await self._update_summary(self._pending)
            self._pending = []

    async def _update_summary(self, messages):
        """Summarize *messages* and append the result to the running summary."""
        new_summary = await self.summarizer.summarize(messages)
        if self.summary:
            self.summary = f"{self.summary}\n\n{new_summary}"
        else:
            self.summary = new_summary

    def get_full_context(self):
        """Return the summary (as a system message) followed by the raw buffer."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Conversation history:\n{self.summary}"
            })
        context.extend(self.buffer)
        return context
Entity Memory
EntityMemory
Track and remember entities:
class EntityMemory:
    """Track facts about named entities mentioned in a conversation."""

    def __init__(self):
        self.entities = {}  # entity name -> record dict

    def update_entity(self, entity_name, attributes):
        """Merge *attributes* into the entity's record, creating it on first mention."""
        if entity_name not in self.entities:
            self.entities[entity_name] = {
                "created_at": datetime.now(),
                "attributes": {},
                "mentions": 0,
            }
        record = self.entities[entity_name]
        record["attributes"].update(attributes)
        record["mentions"] += 1
        record["last_mentioned"] = datetime.now()

    def get_entity(self, entity_name):
        """Return the record for *entity_name*, or an empty dict if unknown."""
        return self.entities.get(entity_name, {})

    def get_context(self, relevant_entities=None):
        """Render known entities as a text block, optionally filtered by name."""
        if relevant_entities:
            selected = {
                name: info
                for name, info in self.entities.items()
                if name in relevant_entities
            }
        else:
            selected = self.entities

        if not selected:
            return ""

        # Build the block with a join rather than repeated concatenation.
        parts = ["Known entities:\n"]
        for name, info in selected.items():
            attrs = ", ".join(f"{k}={v}" for k, v in info["attributes"].items())
            parts.append(f"- {name}: {attrs}\n")
        return "".join(parts)
Vector Memory
VectorStoreMemory
Use vector similarity for relevant memory retrieval:
class VectorStoreMemory:
    """Conversation memory backed by embedding similarity search."""

    def __init__(self, embedding_model, vector_store):
        self.embedding_model = embedding_model  # async embed(text) -> vector
        self.vector_store = vector_store        # async add()/search() backend
        self.messages = []                      # local index of stored messages

    async def add_message(self, role, content):
        """Embed *content* and index it in the vector store."""
        vector = await self.embedding_model.embed(content)
        # Sequential ids: position in the local message list.
        new_id = len(self.messages)
        await self.vector_store.add(
            id=new_id,
            embedding=vector,
            metadata={
                "role": role,
                "content": content,
                "timestamp": datetime.now()
            }
        )
        self.messages.append({"id": new_id, "role": role, "content": content})

    async def search_similar(self, query, top_k=5):
        """Return metadata of the *top_k* stored messages most similar to *query*."""
        query_vector = await self.embedding_model.embed(query)
        hits = await self.vector_store.search(query_vector, top_k=top_k)
        return [hit.metadata for hit in hits]

    async def get_relevant_context(self, current_message, max_messages=10):
        """Fetch messages similar to *current_message*, ordered chronologically."""
        matches = await self.search_similar(current_message, top_k=max_messages)
        return sorted(matches, key=lambda m: m["timestamp"])
Memory in Workflows
Memory Node
Copy
from nadoo_flow import BaseNode, NodeResult
class MemoryNode(BaseNode):
    """Workflow node that threads a conversation memory through execution.

    Parameters
    ----------
    memory_type : str
        One of "buffer", "summary", or "vector"; selects the backing
        memory implementation.

    Raises
    ------
    ValueError
        If *memory_type* is not a recognized option. (Previously an unknown
        value silently left ``self.memory`` unset, failing later with an
        AttributeError inside ``execute``.)
    """

    def __init__(self, memory_type="buffer"):
        super().__init__(node_id="memory_node")
        if memory_type == "buffer":
            self.memory = BufferMemory()
        elif memory_type == "summary":
            self.memory = ConversationSummaryMemory()
        elif memory_type == "vector":
            # NOTE(review): VectorStoreMemory elsewhere in these docs requires
            # an embedding model and a vector store -- confirm this zero-arg
            # construction is valid for the shipped class.
            self.memory = VectorStoreMemory()
        else:
            raise ValueError(f"Unknown memory_type: {memory_type!r}")

    async def execute(self, node_context, workflow_context):
        """Record the user turn, generate a reply with context, record the reply."""
        user_message = node_context.input_data.get("message")
        self.memory.add_message("user", user_message)

        # The accumulated conversation context drives response generation.
        context = self.memory.get_context()
        response = await self.generate_response(context, user_message)

        self.memory.add_message("assistant", response)
        return NodeResult(
            success=True,
            output={
                "response": response,
                "conversation_history": context
            }
        )
Stateful Conversation Flow
Copy
class ConversationFlow:
    """Conversation pipeline that keeps message and entity memory across turns."""

    def __init__(self):
        self.memory = ConversationMemory()
        self.entity_memory = EntityMemory()
        # Assemble the pipeline stage by stage for readability.
        pipeline = InputNode()
        pipeline = pipeline | EntityExtractionNode()
        pipeline = pipeline | MemoryUpdateNode(self.memory, self.entity_memory)
        pipeline = pipeline | ResponseGenerationNode()
        self.workflow = pipeline | OutputNode()

    async def process_message(self, user_message):
        """Run one turn: remember the input, execute the workflow, remember the reply."""
        self.memory.add_message("user", user_message)

        outcome = await self.workflow.run({
            "message": user_message,
            "memory": self.memory.get_history(),
            "entities": self.entity_memory.get_context()
        })

        self.memory.add_message("assistant", outcome["response"])
        return outcome
Memory Persistence
Save and Load Memory
Copy
class PersistentMemory:
    """Conversation memory that mirrors itself to a storage backend per session."""

    def __init__(self, storage_backend):
        self.storage = storage_backend   # async key/value store exposing get()/set()
        self.memory = ConversationMemory()
        self.session_id = None           # set by load_session(); gates saving

    async def load_session(self, session_id):
        """Load memory for *session_id*, replacing any previously loaded state.

        Bug fix: when the backend has no data for the session, start from a
        fresh ``ConversationMemory`` instead of silently carrying over the
        messages of the previously loaded session.
        """
        self.session_id = session_id
        data = await self.storage.get(f"memory:{session_id}")
        if data:
            self.memory = ConversationMemory.from_dict(data)
        else:
            self.memory = ConversationMemory()

    async def save_session(self):
        """Persist the current memory; no-op until a session has been loaded."""
        if self.session_id:
            await self.storage.set(
                f"memory:{self.session_id}",
                self.memory.to_dict()
            )

    async def add_message(self, role, content):
        """Add a message and immediately persist the session."""
        self.memory.add_message(role, content)
        await self.save_session()
# Usage
memory = PersistentMemory(redis_backend)
await memory.load_session("user_123")
await memory.add_message("user", "Hello")
Database-Backed Memory
Copy
class DatabaseMemory:
    """Conversation history persisted in a SQL database.

    Uses positional placeholders ($1, $2, ...) -- presumably an asyncpg-style
    async driver; confirm against the actual connection type. Assumes a
    ``conversation_history`` table with columns (session_id, role, content,
    timestamp).
    """

    def __init__(self, db_connection):
        # db_connection: async driver exposing execute()/fetch().
        self.db = db_connection

    async def add_message(self, session_id, role, content):
        """Insert one message row, timestamped at call time."""
        await self.db.execute(
            """
            INSERT INTO conversation_history
            (session_id, role, content, timestamp)
            VALUES ($1, $2, $3, $4)
            """,
            session_id, role, content, datetime.now()
        )

    async def get_history(self, session_id, limit=50):
        """Return up to *limit* most recent messages for the session, oldest first.

        The query orders newest-first so LIMIT keeps the most recent rows;
        the result is then reversed into chronological order for callers.
        """
        rows = await self.db.fetch(
            """
            SELECT role, content, timestamp
            FROM conversation_history
            WHERE session_id = $1
            ORDER BY timestamp DESC
            LIMIT $2
            """,
            session_id, limit
        )
        # Reverse so callers receive chronological (oldest -> newest) order.
        return list(reversed([{
            "role": row["role"],
            "content": row["content"],
            "timestamp": row["timestamp"]
        } for row in rows]))

    async def clear_session(self, session_id):
        """Delete all stored messages for *session_id*."""
        await self.db.execute(
            "DELETE FROM conversation_history WHERE session_id = $1",
            session_id
        )
Memory Optimization
Compression
Copy
class CompressedMemory:
    """Memory that compresses older messages in batches of ten."""

    def __init__(self, compressor):
        self.compressor = compressor      # object with compress()/decompress()
        self.compressed_history = []      # list of compressed batches
        self.active_buffer = []           # uncompressed recent messages

    def add_message(self, role, content):
        """Append a message, compressing the buffer once it holds ten entries."""
        self.active_buffer.append({"role": role, "content": content})
        if len(self.active_buffer) >= 10:
            self._compress_buffer()

    def _compress_buffer(self):
        """Move the whole active buffer into compressed storage."""
        blob = self.compressor.compress(self.active_buffer)
        self.compressed_history.append(blob)
        self.active_buffer.clear()

    def get_full_history(self):
        """Expand every compressed batch and append the live buffer."""
        expanded = [
            message
            for blob in self.compressed_history
            for message in self.compressor.decompress(blob)
        ]
        return expanded + self.active_buffer
Selective Memory
Copy
class SelectiveMemory:
    """Keep only messages whose scored importance clears a threshold."""

    def __init__(self, importance_scorer):
        self.scorer = importance_scorer   # async score(content, context) -> float
        self.memories = []
        self.importance_threshold = 0.7

    async def add_message(self, role, content, context=None):
        """Score *content*; store it only when important, capped at 100 entries."""
        importance = await self.scorer.score(content, context)
        if importance < self.importance_threshold:
            return  # not important enough to remember
        self.memories.append({
            "role": role,
            "content": content,
            "importance": importance,
            "timestamp": datetime.now()
        })
        # Retain only the 100 most important memories, highest first.
        self.memories = sorted(
            self.memories, key=lambda m: m["importance"], reverse=True
        )[:100]

    def get_important_context(self, top_k=10):
        """Return the *top_k* most important stored memories."""
        return self.memories[:top_k]
Best Practices
Choose the Right Memory Type
- Use BufferMemory for short conversations
- Use SummaryMemory for long conversations
- Use VectorMemory for knowledge-intensive tasks
- Use EntityMemory for tracking specific information
Manage Memory Size
Set appropriate limits to prevent memory overflow:
Copy
memory = ConversationMemory(max_messages=100)
Persist Important Conversations
Save conversation history for important sessions:
Copy
await memory.save_to_file(f"session_{session_id}.json")
Clear Sensitive Data
Always clear memory containing sensitive information after use.
Use Compression
For long-running applications, compress old memories to save space.