Skip to main content

Overview

Flow Core’s memory management enables AI agents to maintain conversation history, remember context across interactions, and provide coherent multi-turn conversations.

Chat History Types

InMemoryChatHistory

Basic in-memory chat history for conversation storage:
from nadoo_flow import InMemoryChatHistory, Message

# NOTE: these snippets use top-level ``await``; run them inside an async
# function (or a REPL such as IPython that supports top-level await).

# Create history instance
history = InMemoryChatHistory()

# Add messages
await history.add_message(Message.user("Hello, how are you?"))
await history.add_message(Message.assistant("I'm doing well, thank you!"))

# Get conversation history
messages = await history.get_messages()
for msg in messages:
    print(f"{msg.role}: {msg.content}")
# user: Hello, how are you?
# assistant: I'm doing well, thank you!

# Clear history
await history.clear()

SlidingWindowChatHistory

A sliding window that keeps only the N most recent messages (useful for token management):
from nadoo_flow import InMemoryChatHistory, SlidingWindowChatHistory, Message

# Create windowed history: wraps a base history and limits reads to the
# most recent window_size messages (for token management).
base_history = InMemoryChatHistory()
window_history = SlidingWindowChatHistory(
    base_history=base_history,
    window_size=10  # Keep only last 10 messages
)

# Add messages (top-level await requires an async context)
await window_history.add_message(Message.user("Message 1"))
await window_history.add_message(Message.assistant("Response 1"))
# ... add more messages ...

# Get only recent messages (last 10)
# NOTE(review): presumably the base history still retains all messages and
# only the read is windowed -- confirm against the implementation.
recent_messages = await window_history.get_messages()

RedisChatHistory

Redis-backed persistent chat history for distributed systems:
from nadoo_flow import RedisChatHistory, Message
import redis

# Create Redis history
# NOTE(review): a synchronous redis.Redis client is passed to a history with
# async methods -- confirm RedisChatHistory expects this rather than
# redis.asyncio.Redis.
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
history = RedisChatHistory(
    session_id="user_123",
    redis_client=redis_client,
    key_prefix="chat:",
    ttl=3600  # 1 hour TTL
)

# Add messages (run inside an async context)
await history.add_message(Message.user("Hello"))
await history.add_message(Message.assistant("Hi there!"))

# Get messages (persisted in Redis)
messages = await history.get_messages()

Message Class

All chat histories use the Message class:
from nadoo_flow import Message

# Create messages: the factory classmethods set the role for you
user_msg = Message.user("What's the weather?")
assistant_msg = Message.assistant("It's sunny today!")
system_msg = Message.system("You are a helpful assistant")

# Access message properties
print(user_msg.role)      # "user"
print(user_msg.content)   # "What's the weather?"

Session Management

SessionHistoryManager

Manage multiple chat histories for different sessions:
from nadoo_flow import (
    SessionHistoryManager,
    InMemoryChatHistory,
    create_inmemory_history_manager
)

# Create manager with history factory
manager = SessionHistoryManager(
    history_factory=lambda session_id: InMemoryChatHistory(),
    window_size=10  # Optional: use sliding window
)

# Or use convenience function
manager = create_inmemory_history_manager(window_size=10)

# Get history for a session (creates if doesn't exist)
history = await manager.get_history("user_123")

# Add messages
await history.add_message(Message.user("Hello"))
await history.add_message(Message.assistant("Hi!"))

# Get messages
messages = await history.get_messages()

ChatHistoryNode

Mixin class to add chat history to any node:
from nadoo_flow import BaseNode, ChatHistoryNode, SessionHistoryManager, Message, NodeResult

class ConversationalNode(BaseNode, ChatHistoryNode):
    """Node with built-in, per-session chat history.

    Combines BaseNode with the ChatHistoryNode mixin so each workflow
    session gets its own conversation history from the shared
    SessionHistoryManager.
    """

    def __init__(self, node_id: str, history_manager: SessionHistoryManager):
        # Initialize both bases explicitly: BaseNode carries node metadata,
        # ChatHistoryNode wires in the session-scoped history manager.
        BaseNode.__init__(
            self,
            node_id=node_id,
            node_type="conversational",
            name="Conversational",
            config={}
        )
        ChatHistoryNode.__init__(self, history_manager=history_manager)

    async def execute(self, node_context, workflow_context):
        """Record the user message, build context, and return a reply."""
        # Get history for this session
        history = await self.get_history(workflow_context)

        # Get user message
        user_message = node_context.input_data.get("message")
        await history.add_message(Message.user(user_message))

        # Get conversation context
        messages = await history.get_messages()

        # Process with history context
        response = f"You said: {user_message} (with {len(messages)} message context)"

        # Save assistant response
        await history.add_message(Message.assistant(response))

        return NodeResult(success=True, output={"response": response})

SummaryBufferMemory

Combine summary and buffer:
class SummaryBufferMemory:
    """Hybrid memory: a rolling summary plus a recent-message buffer.

    Recent messages are kept verbatim in ``buffer`` (at most
    ``buffer_size`` entries).  Messages evicted from the buffer are held
    in ``pending`` and folded into ``summary`` via the supplied
    ``summarizer`` once ``summary_threshold`` of them have accumulated,
    so no message is ever silently dropped.
    """

    def __init__(self, summarizer, buffer_size=10, summary_threshold=50):
        # summarizer: object exposing ``async summarize(messages) -> str``
        self.summarizer = summarizer
        self.buffer_size = buffer_size
        self.summary_threshold = summary_threshold
        self.summary = ""
        self.buffer = []
        self.total_messages = 0
        # Overflowed messages awaiting summarization.  The previous version
        # discarded overflow unless ``total_messages`` happened to be an
        # exact multiple of ``summary_threshold``, losing history.
        self.pending = []

    async def add_message(self, role, content):
        """Append a message, evicting and eventually summarizing old ones."""
        self.buffer.append({"role": role, "content": content})
        self.total_messages += 1

        # Maintain buffer size: move overflow to the pending list instead
        # of dropping it.
        if len(self.buffer) > self.buffer_size:
            overflow = self.buffer[:-self.buffer_size]
            self.buffer = self.buffer[-self.buffer_size:]
            self.pending.extend(overflow)

            # Summarize once enough overflow has accumulated.
            if len(self.pending) >= self.summary_threshold:
                await self._update_summary(self.pending)
                self.pending = []

    async def _update_summary(self, messages):
        """Fold *messages* into the running summary."""
        new_summary = await self.summarizer.summarize(messages)
        if self.summary:
            self.summary = f"{self.summary}\n\n{new_summary}"
        else:
            self.summary = new_summary

    def get_full_context(self):
        """Get complete context: summary, unsummarized overflow, then buffer."""
        context = []

        if self.summary:
            context.append({
                "role": "system",
                "content": f"Conversation history:\n{self.summary}"
            })

        # Include pending overflow so no messages are missing from context.
        context.extend(self.pending)
        context.extend(self.buffer)
        return context

Entity Memory

EntityMemory

Track and remember entities:
class EntityMemory:
    """Track and recall facts about named entities in a conversation."""

    def __init__(self):
        # entity name -> {"created_at", "attributes", "mentions", "last_mentioned"}
        self.entities = {}

    def update_entity(self, entity_name, attributes):
        """Merge *attributes* into the entity's record, creating it if new."""
        record = self.entities.setdefault(entity_name, {
            "created_at": datetime.now(),
            "attributes": {},
            "mentions": 0
        })
        record["attributes"].update(attributes)
        record["mentions"] += 1
        record["last_mentioned"] = datetime.now()

    def get_entity(self, entity_name):
        """Return the stored record for *entity_name*, or {} if unknown."""
        return self.entities.get(entity_name, {})

    def get_context(self, relevant_entities=None):
        """Render known entities as a text block for prompt context.

        When *relevant_entities* is given (and non-empty), only those
        entities are included.  Returns "" when nothing matches.
        """
        if relevant_entities:
            selected = {
                name: record for name, record in self.entities.items()
                if name in relevant_entities
            }
        else:
            selected = self.entities

        if not selected:
            return ""

        text = "Known entities:\n"
        for name, record in selected.items():
            attrs = ", ".join(f"{k}={v}" for k, v in record["attributes"].items())
            text += f"- {name}: {attrs}\n"
        return text

Vector Memory

VectorStoreMemory

Use vector similarity for relevant memory retrieval:
class VectorStoreMemory:
    """Conversation memory backed by vector-similarity search."""

    def __init__(self, embedding_model, vector_store):
        # embedding_model: exposes ``async embed(text) -> vector``
        # vector_store: exposes ``async add(...)`` / ``async search(...)``
        self.embedding_model = embedding_model
        self.vector_store = vector_store
        self.messages = []

    async def add_message(self, role, content):
        """Embed *content* and persist it in the vector store."""
        vector = await self.embedding_model.embed(content)

        # Sequential id doubles as insertion order.
        msg_id = len(self.messages)
        await self.vector_store.add(
            id=msg_id,
            embedding=vector,
            metadata={
                "role": role,
                "content": content,
                "timestamp": datetime.now()
            }
        )

        self.messages.append({"id": msg_id, "role": role, "content": content})

    async def search_similar(self, query, top_k=5):
        """Return metadata of the *top_k* stored messages most similar to *query*."""
        query_vector = await self.embedding_model.embed(query)
        hits = await self.vector_store.search(query_vector, top_k=top_k)
        return [hit.metadata for hit in hits]

    async def get_relevant_context(self, current_message, max_messages=10):
        """Fetch up to *max_messages* similar past messages, oldest first."""
        hits = await self.search_similar(current_message, top_k=max_messages)
        hits.sort(key=lambda meta: meta["timestamp"])
        return hits

Memory in Workflows

Memory Node

from nadoo_flow import BaseNode, NodeResult

class MemoryNode(BaseNode):
    """Node that maintains conversation memory across executions.

    ``memory_type`` selects the backing memory implementation:
    "buffer", "summary", or "vector".
    """

    def __init__(self, memory_type="buffer"):
        super().__init__(node_id="memory_node")

        if memory_type == "buffer":
            self.memory = BufferMemory()
        elif memory_type == "summary":
            self.memory = ConversationSummaryMemory()
        elif memory_type == "vector":
            # NOTE(review): VectorStoreMemory requires an embedding model and
            # a vector store -- confirm how they should be supplied here.
            self.memory = VectorStoreMemory()
        else:
            # Fail fast instead of leaving ``self.memory`` unset, which would
            # otherwise surface later as an AttributeError in execute().
            raise ValueError(f"Unknown memory_type: {memory_type!r}")

    async def execute(self, node_context, workflow_context):
        """Record the user message, respond with memory context, store the reply."""
        # Get user input
        user_message = node_context.input_data.get("message")

        # Add to memory
        self.memory.add_message("user", user_message)

        # Get conversation context
        context = self.memory.get_context()

        # Generate response with context.
        # NOTE(review): generate_response is not defined on this class --
        # presumably supplied by a subclass or mixin; confirm.
        response = await self.generate_response(context, user_message)

        # Add response to memory
        self.memory.add_message("assistant", response)

        return NodeResult(
            success=True,
            output={
                "response": response,
                "conversation_history": context
            }
        )

Stateful Conversation Flow

class ConversationFlow:
    """Workflow that threads persistent conversation and entity memory."""

    def __init__(self):
        self.memory = ConversationMemory()
        self.entity_memory = EntityMemory()

        # Pipeline: input -> entity extraction -> memory update -> response -> output
        pipeline = InputNode() | EntityExtractionNode()
        pipeline = pipeline | MemoryUpdateNode(self.memory, self.entity_memory)
        pipeline = pipeline | ResponseGenerationNode() | OutputNode()
        self.workflow = pipeline

    async def process_message(self, user_message):
        """Run one user turn through the workflow, updating memory on both sides."""
        # Record the incoming message before the workflow runs.
        self.memory.add_message("user", user_message)

        payload = {
            "message": user_message,
            "memory": self.memory.get_history(),
            "entities": self.entity_memory.get_context()
        }
        result = await self.workflow.run(payload)

        # Record the generated reply after the workflow completes.
        self.memory.add_message("assistant", result["response"])

        return result

Memory Persistence

Save and Load Memory

class PersistentMemory:
    """Conversation memory that round-trips through a storage backend."""

    def __init__(self, storage_backend):
        # storage_backend: exposes ``async get(key)`` / ``async set(key, value)``
        self.storage = storage_backend
        self.memory = ConversationMemory()
        self.session_id = None

    async def load_session(self, session_id):
        """Restore memory for *session_id* from storage, if present."""
        self.session_id = session_id
        stored = await self.storage.get(f"memory:{session_id}")
        if stored:
            self.memory = ConversationMemory.from_dict(stored)

    async def save_session(self):
        """Persist the current memory under the active session key."""
        if not self.session_id:
            return
        await self.storage.set(
            f"memory:{self.session_id}",
            self.memory.to_dict()
        )

    async def add_message(self, role, content):
        """Record a message and immediately persist the session."""
        self.memory.add_message(role, content)
        await self.save_session()

# Usage (run inside an async context)
memory = PersistentMemory(redis_backend)
await memory.load_session("user_123")
await memory.add_message("user", "Hello")  # auto-saves after each message

Database-Backed Memory

class DatabaseMemory:
    """Conversation history persisted in a SQL table.

    Uses $n placeholders, so the driver is presumably asyncpg-style --
    confirm against the actual connection object.
    """

    def __init__(self, db_connection):
        # db_connection: async driver exposing execute() and fetch()
        self.db = db_connection

    async def add_message(self, session_id, role, content):
        """Insert one timestamped message row for *session_id*."""
        insert_sql = """
            INSERT INTO conversation_history
            (session_id, role, content, timestamp)
            VALUES ($1, $2, $3, $4)
            """
        await self.db.execute(insert_sql, session_id, role, content, datetime.now())

    async def get_history(self, session_id, limit=50):
        """Return up to *limit* messages for *session_id*, oldest first."""
        select_sql = """
            SELECT role, content, timestamp
            FROM conversation_history
            WHERE session_id = $1
            ORDER BY timestamp DESC
            LIMIT $2
            """
        rows = await self.db.fetch(select_sql, session_id, limit)

        # The query fetches newest-first (so LIMIT keeps the most recent);
        # flip to chronological order for callers.
        history = [
            {
                "role": row["role"],
                "content": row["content"],
                "timestamp": row["timestamp"]
            }
            for row in rows
        ]
        history.reverse()
        return history

    async def clear_session(self, session_id):
        """Delete all stored messages for *session_id*."""
        await self.db.execute(
            "DELETE FROM conversation_history WHERE session_id = $1",
            session_id
        )

Memory Optimization

Compression

class CompressedMemory:
    """Chat memory that compresses older messages in 10-message chunks."""

    def __init__(self, compressor):
        # compressor: object exposing compress(messages) / decompress(blob)
        self.compressor = compressor
        self.compressed_history = []  # compressed 10-message chunks, oldest first
        self.active_buffer = []       # uncompressed recent messages

    def add_message(self, role, content):
        """Append a message; compress the buffer once it reaches 10 entries."""
        self.active_buffer.append({"role": role, "content": content})
        if len(self.active_buffer) >= 10:
            self._compress_buffer()

    def _compress_buffer(self):
        """Move the active buffer into compressed storage and reset it."""
        blob = self.compressor.compress(self.active_buffer)
        self.compressed_history.append(blob)
        self.active_buffer.clear()

    def get_full_history(self):
        """Return the entire history, decompressing stored chunks first."""
        full = []
        for blob in self.compressed_history:
            full.extend(self.compressor.decompress(blob))
        full.extend(self.active_buffer)
        return full

Selective Memory

class SelectiveMemory:
    """Keep only messages whose scored importance clears a threshold."""

    def __init__(self, importance_scorer):
        # importance_scorer: exposes ``async score(content, context) -> float``
        self.scorer = importance_scorer
        self.memories = []
        self.importance_threshold = 0.7

    async def add_message(self, role, content, context=None):
        """Store the message only when its importance score is high enough."""
        score = await self.scorer.score(content, context)
        if score < self.importance_threshold:
            return

        self.memories.append({
            "role": role,
            "content": content,
            "importance": score,
            "timestamp": datetime.now()
        })

        # Retain at most the 100 highest-importance memories, most
        # important first.
        self.memories.sort(key=lambda m: m["importance"], reverse=True)
        del self.memories[100:]

    def get_important_context(self, top_k=10):
        """Return the *top_k* most important memories (list is kept sorted)."""
        return self.memories[:top_k]

Best Practices

  • Use BufferMemory for short conversations
  • Use SummaryMemory for long conversations
  • Use VectorMemory for knowledge-intensive tasks
  • Use EntityMemory for tracking specific information
  • Set appropriate limits to prevent unbounded memory growth, e.g. memory = ConversationMemory(max_messages=100)
  • Save conversation history for important sessions, e.g. await memory.save_to_file(f"session_{session_id}.json")
  • Always clear memory containing sensitive information after use
  • For long-running applications, compress old memories to save space

Next Steps