Skip to main content

Overview

Flow Core provides a comprehensive memory management system for maintaining conversation history, session state, and context across workflow executions. Built with flexibility in mind, it supports various storage backends from in-memory to distributed systems.

Chat History

BaseChatHistory

Abstract base class for all chat history implementations:
from nadoo_flow import BaseChatHistory, Message


class CustomHistory(BaseChatHistory):
    """Skeleton for a custom chat history backend.

    Every method is a stub that returns ``None``; replace the bodies with
    calls into your own storage. ``Message`` is imported above because the
    annotations below are evaluated when the class is created.
    """

    async def get_messages(self) -> list[Message]:
        """Retrieve all messages in history."""
        ...

    async def add_message(self, message: Message) -> None:
        """Add a single message."""
        ...

    async def add_messages(self, messages: list[Message]) -> None:
        """Add multiple messages."""
        ...

    async def clear(self) -> None:
        """Clear all messages."""
        ...

InMemoryChatHistory

Simple in-process memory storage:
from nadoo_flow import InMemoryChatHistory, Message

# Create history
history = InMemoryChatHistory()

# Add messages
await history.add_message(Message.user("Hello, AI!"))
await history.add_message(Message.assistant("Hello! How can I help?"))

# Retrieve messages
messages = await history.get_messages()
for msg in messages:
    print(f"{msg.role}: {msg.content}")

# Clear history
await history.clear()

SlidingWindowChatHistory

Keep only the most recent N messages to bound memory use:
from nadoo_flow import SlidingWindowChatHistory, InMemoryChatHistory

# Keep only last 10 messages
history = SlidingWindowChatHistory(
    base_history=InMemoryChatHistory(),
    window_size=10
)

# Add many messages
for i in range(20):
    await history.add_message(Message.user(f"Message {i}"))

# Only last 10 are retained
messages = await history.get_messages()
print(len(messages))  # 10

RedisChatHistory

Distributed Redis-backed storage for scalability:
from nadoo_flow import RedisChatHistory
import redis.asyncio as redis

# Setup Redis client
redis_client = redis.Redis(
    host='localhost',
    port=6379,
    decode_responses=True
)

# Create Redis-backed history
history = RedisChatHistory(
    session_id="user_123_chat_456",
    redis_client=redis_client,
    key_prefix="chat_history:",
    ttl=86400  # 24 hours expiration
)

# Use like any other history
await history.add_message(Message.user("Hello"))
messages = await history.get_messages()

# Session persists across restarts
# Key in Redis: "chat_history:user_123_chat_456"

Session Management

SessionHistoryManager

Manage multiple conversation sessions:
from nadoo_flow import SessionHistoryManager, InMemoryChatHistory

# Create manager with factory function
manager = SessionHistoryManager(
    history_factory=lambda session_id: InMemoryChatHistory(),
    window_size=10  # Optional sliding window
)

# Get or create session history
session1 = await manager.get_history("user_123")
await session1.add_message(Message.user("Hi from session 1"))

session2 = await manager.get_history("user_456")
await session2.add_message(Message.user("Hi from session 2"))

# Clear specific session
await manager.clear_history("user_123")

# List all active sessions
sessions = manager.list_sessions()
print(f"Active sessions: {sessions}")

Advanced Session Patterns

Multi-Tenant Sessions

class TenantSessionManager:
    """Builds per-tenant session managers that share one Redis client."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def get_manager(self, tenant_id: str):
        """Return a SessionHistoryManager whose histories are namespaced by tenant."""

        def make_history(sid):
            # Both the session id and the key prefix carry the tenant id
            return RedisChatHistory(
                session_id=f"{tenant_id}:{sid}",
                redis_client=self.redis,
                key_prefix=f"tenant:{tenant_id}:chat:",
                ttl=3600,  # 1 hour
            )

        return SessionHistoryManager(history_factory=make_history)

# Usage
# `redis_client` is expected to exist already (it is not created in this snippet).
tenant_manager = TenantSessionManager(redis_client)
# Session manager scoped to the "acme_corp" tenant
acme_sessions = tenant_manager.get_manager("acme_corp")
# History for session "chat_789" within that tenant
history = await acme_sessions.get_history("chat_789")

Conversation Branching

class BranchableHistory:
    """Support conversation branching/forking"""

    def __init__(self):
        self.branches = {}
        self.current_branch = "main"

    async def create_branch(self, name: str, from_branch: str = None):
        """Create a new conversation branch"""
        source = from_branch or self.current_branch
        self.branches[name] = self.branches[source].copy()

    async def switch_branch(self, name: str):
        """Switch to different branch"""
        self.current_branch = name

    async def merge_branches(self, source: str, target: str):
        """Merge conversation branches"""
        # Merge logic here
        pass

Message Types

Message Class

Core message data structure:
from dataclasses import dataclass
from typing import Any, Literal

from nadoo_flow import Message

@dataclass
class Message:
    role: Literal["system", "user", "assistant", "function", "tool"]
    content: str | list[dict[str, Any]]
    name: str | None = None
    metadata: dict[str, Any] | None = None

    # Factory methods
    @classmethod
    def system(cls, content: str, **kwargs):
        return cls(role="system", content=content, **kwargs)

    @classmethod
    def user(cls, content: str, **kwargs):
        return cls(role="user", content=content, **kwargs)

    @classmethod
    def assistant(cls, content: str, **kwargs):
        return cls(role="assistant", content=content, **kwargs)

    # Conversion
    def to_dict(self) -> dict:
        """Convert to OpenAI format"""
        return {
            "role": self.role,
            "content": self.content,
            "name": self.name
        }

Multi-Modal Messages

Support for images and complex content:
# Plain text: content is a single string
text_msg = Message.user("What's in this image?")

# Multi-modal: content is a list of typed parts
question_part = {"type": "text", "text": "What's in this image?"}
picture_part = {
    "type": "image_url",
    "image_url": {"url": "https://example.com/image.png"},
}
image_msg = Message.user([question_part, picture_part])

# Tool/Function messages are built with the constructor directly
tool_fields = {
    "role": "tool",
    "content": '{"result": "success"}',
    "name": "search_tool",
}
tool_msg = Message(**tool_fields)

ChatHistoryNode Integration

Adding Memory to Nodes

Integrate chat history into workflow nodes:
# Message is needed for Message.user(...) / Message.assistant(...) below
from nadoo_flow import BaseNode, ChatHistoryNode, Message, NodeResult


class ChatbotNode(BaseNode, ChatHistoryNode):
    """Workflow node that answers a user message using the session's history."""

    def __init__(self, history_manager):
        BaseNode.__init__(self, "chatbot")
        ChatHistoryNode.__init__(
            self,
            history_manager=history_manager,
            session_key="session_id"  # Context field for session ID
        )

    async def execute(self, node_context, workflow_context):
        """Answer ``node_context.input_data["message"]`` and record the exchange.

        Returns:
            A successful NodeResult whose output holds the reply under
            ``"response"``.

        Raises:
            KeyError: If the input data has no ``"message"`` entry.
        """
        # Get history for this session.
        # NOTE(review): presumably get_history() resolves the session from the
        # workflow context via session_key ("session_id") — confirm.
        history = await self.get_history(workflow_context)

        # Get all messages
        messages = await history.get_messages()

        # Get user input
        user_input = node_context.input_data["message"]
        user_msg = Message.user(user_input)

        # Generate response based on history.
        # generate_response() is not defined in this class; it has to be
        # supplied by the implementer (e.g. an LLM call).
        response = await self.generate_response(messages + [user_msg])
        assistant_msg = Message.assistant(response)

        # Save interaction
        await self.save_interaction(
            workflow_context,
            user_msg,
            assistant_msg
        )

        return NodeResult(
            success=True,
            output={"response": response}
        )

Usage in Workflows

# Setup history manager
# Each session gets a fresh InMemoryChatHistory from the factory;
# window_size is the manager's optional sliding-window setting.
history_manager = SessionHistoryManager(
    history_factory=lambda sid: InMemoryChatHistory(),
    window_size=20
)

# Create chatbot node
chatbot = ChatbotNode(history_manager)

# Create workflow
# NOTE(review): InputNode and OutputNode are not imported in this snippet —
# presumably they come from nadoo_flow; confirm before copying.
workflow = InputNode() | chatbot | OutputNode()

# Execute with session
# The "session_id" global variable matches the session_key that
# ChatbotNode passes to ChatHistoryNode.
workflow_context = WorkflowContext(
    workflow_id="chat_workflow",
    global_variables={"session_id": "user_123"}
)

# The first argument is the workflow input; ChatbotNode reads its "message" entry.
result = await workflow.run(
    {"message": "Hello!"},
    workflow_context
)

Memory Patterns

Pattern 1: Conversation Summarization

Periodically summarize old messages to save memory:
class SummarizingHistory(BaseChatHistory):
    """Chat history that folds the oldest messages into a summary.

    When the wrapped history already holds ``max_messages`` messages, the
    oldest ``summarize_count`` of them are replaced by a single system
    message containing their summary before the new message is stored.
    """

    def __init__(self, base_history, summarizer_node, max_messages=50,
                 summarize_count=None):
        """
        Args:
            base_history: History that actually stores the messages.
            summarizer_node: Object exposing ``async summarize(messages)``.
            max_messages: Stored-message count that triggers summarization.
            summarize_count: How many of the oldest messages to fold into
                the summary. Defaults to half of ``max_messages`` (25 for
                the default of 50), but at least 1.
        """
        self.base = base_history
        self.summarizer = summarizer_node
        self.max_messages = max_messages
        if summarize_count is None:
            summarize_count = max(1, max_messages // 2)
        self.summarize_count = summarize_count
        self.summary = None

    async def get_messages(self) -> list[Message]:
        """Return the messages currently held by the wrapped history."""
        return await self.base.get_messages()

    async def add_message(self, message: Message) -> None:
        """Store ``message``, summarizing the oldest messages first if needed."""
        messages = await self.base.get_messages()

        if len(messages) >= self.max_messages:
            # Split BEFORE clearing: slicing copies, so the recent messages
            # survive even if the base hands out its live internal list.
            old_messages = messages[:self.summarize_count]
            recent_messages = messages[self.summarize_count:]

            # Summarize old messages
            self.summary = await self.summarizer.summarize(old_messages)

            # Keep recent messages + summary
            await self.base.clear()
            await self.base.add_message(
                Message.system(f"Previous conversation summary: {self.summary}")
            )
            for msg in recent_messages:
                await self.base.add_message(msg)

        await self.base.add_message(message)

    async def add_messages(self, messages: list[Message]) -> None:
        """Store several messages, applying the summarization rule to each."""
        for msg in messages:
            await self.add_message(msg)

    async def clear(self) -> None:
        """Remove all stored messages and forget the last summary."""
        await self.base.clear()
        self.summary = None

Pattern 2: Context-Aware Memory

Filter messages based on relevance:
class ContextAwareHistory:
    """Selects the stored messages most similar to a query."""

    def __init__(self, base_history, embedder):
        self.base = base_history
        self.embedder = embedder

    async def get_relevant_messages(
        self,
        query: str,
        max_messages: int = 10
    ) -> list[Message]:
        """Return up to ``max_messages`` messages, most similar to ``query`` first.

        When the history holds no more than ``max_messages`` messages it is
        returned unchanged, without computing any embeddings.
        """
        candidates = await self.base.get_messages()

        # Nothing to filter: hand back the history as-is
        if len(candidates) <= max_messages:
            return candidates

        # First pass: embed the query, then every message in order
        query_vector = await self.embedder.embed(query)
        vectors = []
        for candidate in candidates:
            vectors.append(await self.embedder.embed(candidate.content))

        # Second pass: one similarity score per message
        scores = []
        for vector in vectors:
            scores.append(cosine_similarity(query_vector, vector))

        # Order message positions by descending score and keep the top ones
        order = sorted(
            range(len(candidates)),
            key=scores.__getitem__,
            reverse=True
        )
        return [candidates[position] for position in order[:max_messages]]

Pattern 3: Multi-Modal Memory

Store and retrieve different content types:
class MultiModalHistory:
    def __init__(self):
        self.text_history = InMemoryChatHistory()
        self.image_cache = {}
        self.audio_cache = {}

    async def add_multi_modal_message(
        self,
        text: str = None,
        image_url: str = None,
        audio_data: bytes = None
    ):
        content = []

        if text:
            content.append({"type": "text", "text": text})

        if image_url:
            image_id = hashlib.md5(image_url.encode()).hexdigest()
            self.image_cache[image_id] = image_url
            content.append({
                "type": "image_ref",
                "image_id": image_id
            })

        if audio_data:
            audio_id = hashlib.md5(audio_data).hexdigest()
            self.audio_cache[audio_id] = audio_data
            content.append({
                "type": "audio_ref",
                "audio_id": audio_id
            })

        await self.text_history.add_message(
            Message.user(content)
        )

Performance Optimization

Lazy Loading

Load messages only when needed:
class LazyHistory:
    """History that loads from its storage backend on first access.

    The backend must provide ``async load()`` returning the stored messages
    and ``async append(message)``.
    """

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self._messages = None
        self._loaded = False

    async def get_messages(self) -> list[Message]:
        """Return the cached messages, loading them from storage once."""
        if not self._loaded:
            self._messages = await self.storage.load()
            self._loaded = True
        return self._messages

    async def add_message(self, message: Message):
        """Persist ``message`` and add it to the in-memory cache.

        Each message is written to storage immediately (no batching). The
        write happens before the cache is touched, so a failed write leaves
        the cache consistent with the backend.
        """
        # Ensure messages are loaded
        await self.get_messages()
        # Persist first: if this raises, the cache stays unchanged
        await self.storage.append(message)
        self._messages.append(message)

Caching Strategies

from functools import lru_cache

class CachedHistory:
    """Caches conversation summaries per session id.

    Expects a ``summarize(messages)`` method to be available on the
    instance (for example from a subclass); it is not defined here.

    The cache is a plain per-instance dict. ``functools.lru_cache`` is not
    used on the coroutine method because it would cache the coroutine
    object itself, which can only be awaited once.
    """

    # Upper bound on cached summaries per instance
    _MAX_CACHED = 100

    def __init__(self, base_history):
        self.base = base_history
        self._summaries = {}

    async def get_cached_summary(self, session_id: str):
        """Cache conversation summaries.

        Returns the stored summary for ``session_id`` when there is one;
        otherwise summarizes the base history's messages and stores the
        result.
        """
        if session_id in self._summaries:
            # Re-insert so the entry counts as most recently used
            summary = self._summaries.pop(session_id)
            self._summaries[session_id] = summary
            return summary

        messages = await self.base.get_messages()
        summary = self.summarize(messages)

        self._summaries[session_id] = summary
        if len(self._summaries) > self._MAX_CACHED:
            # Dicts keep insertion order, so the first key is the least
            # recently used entry.
            del self._summaries[next(iter(self._summaries))]
        return summary

    async def invalidate_cache(self, session_id: str):
        """Invalidate cache on updates.

        Only the entry for ``session_id`` is dropped; unknown ids are ignored.
        """
        self._summaries.pop(session_id, None)

Best Practices

  • InMemory: Development, testing, single-instance
  • Redis: Production, distributed, multi-instance
  • Database: Compliance, long-term storage
# NOTE: the calls below are abbreviated sketches. Elsewhere on this page
# SlidingWindowChatHistory is also given base_history=..., and
# RedisChatHistory is also given session_id=... and redis_client=....

# Sliding window for active conversations
SlidingWindowChatHistory(window_size=50)

# TTL for Redis storage
RedisChatHistory(ttl=3600)  # 1 hour

# Periodic cleanup for database
# cleanup_old_sessions is not defined on this page; supply your own.
await cleanup_old_sessions(days=30)
# Session lifecycle sketch: `manager`, `session_id` and `session_ended`
# are expected to be defined by the surrounding application.
try:
    history = await manager.get_history(session_id)
    # Use history
finally:
    # Runs whether or not the body above raised.
    # Clean up if needed
    if session_ended:
        await manager.clear_history(session_id)
# Track session count
active_sessions = len(manager.list_sessions())

# Monitor message count
# `threshold` is not defined in this sketch; choose a limit for your deployment.
for session_id in manager.list_sessions():
    history = await manager.get_history(session_id)
    messages = await history.get_messages()
    if len(messages) > threshold:
        # Summarize or archive
        pass

Complete Example

from nadoo_flow import (
    BaseNode, ChatHistoryNode, NodeResult,
    SessionHistoryManager, RedisChatHistory,
    Message, WorkflowContext
)
import redis.asyncio as redis

class SmartChatbot(BaseNode, ChatHistoryNode):
    """Chat node whose per-user conversation history is kept in Redis."""

    def __init__(self, redis_client):
        BaseNode.__init__(self, "smart_chatbot")

        def build_history(sid):
            # One Redis-backed history per session, with 1 hour sessions
            return RedisChatHistory(
                session_id=sid,
                redis_client=redis_client,
                ttl=3600,
            )

        # The manager is created first and then handed to the mixin
        session_manager = SessionHistoryManager(
            history_factory=build_history,
            window_size=50,  # Keep last 50 messages
        )
        ChatHistoryNode.__init__(
            self,
            history_manager=session_manager,
            session_key="user_id",
        )

    async def execute(self, node_context, workflow_context):
        """Answer the incoming message using the stored conversation."""
        # Read the request: message text and the user it belongs to
        incoming_text = node_context.input_data["message"]
        owner_id = workflow_context.get_global_variable("user_id")

        # Load what has been said so far
        history = await self.get_history(workflow_context)
        prior_turns = await history.get_messages()

        # A brand-new conversation starts with the system prompt
        if not prior_turns:
            opening = Message.system(
                "You are a helpful assistant with memory of past conversations."
            )
            await history.add_message(opening)
            prior_turns = [opening]

        # Ask for a reply to the full conversation plus the new user turn
        user_turn = Message.user(incoming_text)
        reply_text = await self.generate_with_context(prior_turns + [user_turn])

        # Record both sides of the exchange
        assistant_turn = Message.assistant(reply_text)
        await self.save_interaction(workflow_context, user_turn, assistant_turn)

        # The two turns just saved are counted on top of the prior ones
        payload = {
            "response": reply_text,
            "session_id": owner_id,
            "message_count": len(prior_turns) + 2,
        }
        return NodeResult(success=True, output=payload)

# Usage
async def main():
    """Run a two-turn conversation against a local Redis instance."""
    # Setup Redis
    redis_client = redis.Redis(
        host='localhost',
        port=6379,
        decode_responses=True
    )

    try:
        # Create chatbot
        chatbot = SmartChatbot(redis_client)

        # Simulate conversation
        context = WorkflowContext(
            workflow_id="chat",
            global_variables={"user_id": "user_123"}
        )

        # NOTE(review): NodeContext is not in this example's import list —
        # presumably it is exported by nadoo_flow; confirm and import it.

        # First message
        result = await chatbot.execute(
            NodeContext(
                node_id="chat",
                input_data={"message": "Hi, I'm John"}
            ),
            context
        )

        # Follow-up (remembers previous)
        result = await chatbot.execute(
            NodeContext(
                node_id="chat",
                input_data={"message": "What's my name?"}
            ),
            context
        )
        # Response will know the name is John
    finally:
        # Release the connection pool even if a turn fails.
        # aclose() requires redis-py >= 5.0.1; use close() on older versions.
        await redis_client.aclose()

# Run
# asyncio is not imported by the example's import block, so import it here.
import asyncio

asyncio.run(main())

See Also