Skip to main content

Overview

The memory_db module provides database-backed persistent storage for chat history with Redis caching. It implements a dual-layer storage pattern:
  • Redis: Fast cache with TTL-based expiration
  • Database: Permanent storage for searchability and analytics
Architecture:
Write: Both Redis + DB
Read: Redis first (cache hit) → DB fallback (cache miss)

Protocols

DatabaseAdapter

Protocol defining the interface for database adapters.
from typing import Protocol
from nadoo_flow import Message

class DatabaseAdapter(Protocol):
    async def save_message(
        self,
        session_id: str,
        message: Message,
        workspace_id: str | None = None
    ) -> None:
        """Save a single message to DB"""
        ...

    async def save_messages(
        self,
        session_id: str,
        messages: list[Message],
        workspace_id: str | None = None
    ) -> None:
        """Save multiple messages to DB"""
        ...

    async def get_messages(
        self,
        session_id: str,
        workspace_id: str | None = None,
        limit: int | None = None
    ) -> list[Message]:
        """Get messages from DB"""
        ...

    async def clear_messages(
        self,
        session_id: str,
        workspace_id: str | None = None
    ) -> None:
        """Clear all messages for a session"""
        ...

Implementing DatabaseAdapter

Any ORM (SQLAlchemy, Django, etc.) can implement this protocol: SQLAlchemy Example:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from nadoo_flow.memory_db import DatabaseAdapter
from nadoo_flow import Message

class SQLAlchemyAdapter(DatabaseAdapter):
    def __init__(self, db: AsyncSession):
        self.db = db

    async def save_message(
        self,
        session_id: str,
        message: Message,
        workspace_id: str | None = None
    ):
        chat_message = ChatMessage(
            session_id=session_id,
            workspace_id=workspace_id,
            role=message.role,
            content=message.content,
            metadata=message.metadata,
            timestamp=message.timestamp
        )
        self.db.add(chat_message)
        await self.db.commit()

    async def save_messages(
        self,
        session_id: str,
        messages: list[Message],
        workspace_id: str | None = None
    ):
        chat_messages = [
            ChatMessage(
                session_id=session_id,
                workspace_id=workspace_id,
                role=msg.role,
                content=msg.content,
                metadata=msg.metadata,
                timestamp=msg.timestamp
            )
            for msg in messages
        ]
        self.db.add_all(chat_messages)
        await self.db.commit()

    async def get_messages(
        self,
        session_id: str,
        workspace_id: str | None = None,
        limit: int | None = None
    ) -> list[Message]:
        query = select(ChatMessage).where(
            ChatMessage.session_id == session_id
        )

        if workspace_id:
            query = query.where(ChatMessage.workspace_id == workspace_id)

        if limit:
            query = query.limit(limit)

        query = query.order_by(ChatMessage.timestamp)

        result = await self.db.execute(query)
        messages = result.scalars().all()

        return [
            Message(
                role=msg.role,
                content=msg.content,
                metadata=msg.metadata,
                timestamp=msg.timestamp
            )
            for msg in messages
        ]

    async def clear_messages(
        self,
        session_id: str,
        workspace_id: str | None = None
    ):
        query = delete(ChatMessage).where(
            ChatMessage.session_id == session_id
        )

        if workspace_id:
            query = query.where(ChatMessage.workspace_id == workspace_id)

        await self.db.execute(query)
        await self.db.commit()
Django ORM Example:
from nadoo_flow.memory_db import DatabaseAdapter
from nadoo_flow import Message
from asgiref.sync import sync_to_async

class DjangoAdapter(DatabaseAdapter):
    async def save_message(self, session_id, message, workspace_id=None):
        await sync_to_async(ChatMessage.objects.create)(
            session_id=session_id,
            workspace_id=workspace_id,
            role=message.role,
            content=message.content,
            metadata=message.metadata,
            timestamp=message.timestamp
        )

    async def get_messages(self, session_id, workspace_id=None, limit=None):
        qs = ChatMessage.objects.filter(session_id=session_id)

        if workspace_id:
            qs = qs.filter(workspace_id=workspace_id)

        if limit:
            qs = qs[:limit]

        messages = await sync_to_async(list)(qs)

        return [
            Message(
                role=msg.role,
                content=msg.content,
                metadata=msg.metadata,
                timestamp=msg.timestamp
            )
            for msg in messages
        ]

    # Implement save_messages and clear_messages...

Classes

DatabaseChatHistory

Database-backed chat history with Redis caching layer.
from nadoo_flow.memory_db import DatabaseChatHistory, SQLAlchemyAdapter
import redis

# Create DB adapter
db_adapter = SQLAlchemyAdapter(db=db_session)

# Create history with Redis caching
history = DatabaseChatHistory(
    session_id="user_123",
    db_adapter=db_adapter,
    redis_client=redis.Redis(host="localhost", port=6379),
    workspace_id="workspace_abc",
    redis_ttl=3600  # 1 hour cache
)

# Add messages (saved to both DB and Redis)
await history.add_message(Message(role="user", content="Hello"))
await history.add_message(Message(role="assistant", content="Hi!"))

# Get messages (from Redis cache if available, otherwise DB)
messages = await history.get_messages()

# Clear history (from both DB and Redis)
await history.clear()

Constructor

DatabaseChatHistory(
    session_id: str,
    db_adapter: DatabaseAdapter,
    redis_client: Any | None = None,
    workspace_id: str | None = None,
    redis_key_prefix: str = "chat_history:",
    redis_ttl: int | None = 3600
)
Parameters:
NameTypeDefaultDescription
session_idstr-Session identifier
db_adapterDatabaseAdapter-Database adapter implementing the protocol
redis_clientAny | NoneNoneRedis client (if None, DB-only mode)
workspace_idstr | NoneNoneWorkspace ID for multi-tenancy
redis_key_prefixstr"chat_history:"Redis key prefix
redis_ttlint | None3600Redis TTL in seconds (None = no expiration)

Methods

get_messages
Get messages with Redis → DB fallback.
async def get_messages() -> list[Message]
Flow:
  1. Try Redis cache (fast)
  2. If cache miss, query DB
  3. Cache DB results to Redis
  4. Return messages
Returns:
  • list[Message] - List of messages in chronological order
Example:
messages = await history.get_messages()

for msg in messages:
    print(f"{msg.role}: {msg.content}")
add_message
Add single message (DB + Redis).
async def add_message(message: Message)
Flow:
  1. Save to DB (permanent)
  2. Append to Redis cache (fast access)
  3. Set TTL on Redis key
Parameters:
  • message - Message to add
Example:
from nadoo_flow import Message

await history.add_message(
    Message(
        role="user",
        content="What is AI?",
        metadata={"source": "web"}
    )
)
add_messages
Add multiple messages in batch.
async def add_messages(messages: list[Message])
Flow:
  1. Batch save to DB
  2. Batch cache to Redis
Parameters:
  • messages - List of messages to add
Example:
messages = [
    Message(role="user", content="Question 1"),
    Message(role="assistant", content="Answer 1"),
    Message(role="user", content="Question 2"),
    Message(role="assistant", content="Answer 2")
]

await history.add_messages(messages)
clear
Clear all messages for session (DB + Redis).
async def clear()
Flow:
  1. Delete from DB
  2. Delete from Redis cache
Example:
await history.clear()

Factory Functions

create_database_history_manager

Create SessionHistoryManager with database backend.
from nadoo_flow.memory_db import create_database_history_manager

# Create manager
history_manager = create_database_history_manager(
    db_adapter=db_adapter,
    redis_client=redis_client,
    redis_ttl=3600,
    default_workspace_id="workspace_123"
)

# Use in workflow
history = history_manager.get_history(session_id="user_456")
await history.add_message(Message(role="user", content="Hello"))

Signature

def create_database_history_manager(
    db_adapter: DatabaseAdapter,
    redis_client: Any | None = None,
    redis_ttl: int = 3600,
    default_workspace_id: str | None = None
) -> SessionHistoryManager
Parameters:
  • db_adapter - Database adapter instance
  • redis_client - Redis client (optional)
  • redis_ttl - Default Redis TTL
  • default_workspace_id - Default workspace ID
Returns:
  • SessionHistoryManager - Manager instance

Usage Patterns

Basic Usage (DB Only)

from nadoo_flow.memory_db import DatabaseChatHistory
from nadoo_flow import Message

# No Redis, pure DB
history = DatabaseChatHistory(
    session_id="session_123",
    db_adapter=db_adapter,
    redis_client=None  # DB only
)

# Add and retrieve
await history.add_message(Message(role="user", content="Hi"))
messages = await history.get_messages()

With Redis Caching

import redis

# With Redis for performance
history = DatabaseChatHistory(
    session_id="session_123",
    db_adapter=db_adapter,
    redis_client=redis.Redis(host="localhost"),
    redis_ttl=3600  # 1 hour cache
)

# First call: DB query + Redis cache
messages1 = await history.get_messages()  # 📦 Redis MISS → 💾 DB query → 📦 Cache

# Second call: Redis cache hit (fast!)
messages2 = await history.get_messages()  # 📦 Redis HIT (instant)

Multi-Tenancy

# Different workspaces, isolated data
workspace_a_history = DatabaseChatHistory(
    session_id="user_123",
    db_adapter=db_adapter,
    redis_client=redis_client,
    workspace_id="workspace_a"
)

workspace_b_history = DatabaseChatHistory(
    session_id="user_123",  # Same user
    db_adapter=db_adapter,
    redis_client=redis_client,
    workspace_id="workspace_b"  # Different workspace
)

# Data is isolated by workspace
await workspace_a_history.add_message(Message(role="user", content="A"))
await workspace_b_history.add_message(Message(role="user", content="B"))

a_messages = await workspace_a_history.get_messages()  # Only "A"
b_messages = await workspace_b_history.get_messages()  # Only "B"

Batch Operations

# Efficient batch insert
conversation = [
    Message(role="user", content="Question 1"),
    Message(role="assistant", content="Answer 1"),
    Message(role="user", content="Question 2"),
    Message(role="assistant", content="Answer 2"),
    Message(role="user", content="Question 3"),
    Message(role="assistant", content="Answer 3")
]

await history.add_messages(conversation)  # Single DB transaction

Session Management

from nadoo_flow.memory_db import create_database_history_manager

# Create manager
manager = create_database_history_manager(
    db_adapter=db_adapter,
    redis_client=redis_client,
    redis_ttl=7200,  # 2 hours
    default_workspace_id="workspace_123"
)

# Manage multiple sessions
session_1 = manager.get_history("user_1")
session_2 = manager.get_history("user_2")
session_3 = manager.get_history("user_3")

# Each session has isolated history
await session_1.add_message(Message(role="user", content="User 1 message"))
await session_2.add_message(Message(role="user", content="User 2 message"))

Custom TTL per Session

# Short-lived cache for active users
active_user_history = DatabaseChatHistory(
    session_id="active_user",
    db_adapter=db_adapter,
    redis_client=redis_client,
    redis_ttl=300  # 5 minutes
)

# Long-lived cache for VIP users
vip_user_history = DatabaseChatHistory(
    session_id="vip_user",
    db_adapter=db_adapter,
    redis_client=redis_client,
    redis_ttl=86400  # 24 hours
)

# No cache for archived data
archive_history = DatabaseChatHistory(
    session_id="archived",
    db_adapter=db_adapter,
    redis_client=None  # No cache
)

Database Schema Example

SQLAlchemy Model:
from sqlalchemy import Column, String, Text, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class ChatMessage(Base):
    __tablename__ = "chat_messages"

    id = Column(String, primary_key=True)
    session_id = Column(String, index=True, nullable=False)
    workspace_id = Column(String, index=True, nullable=True)
    role = Column(String, nullable=False)  # "user" or "assistant"
    content = Column(Text, nullable=False)
    metadata = Column(JSON, nullable=True)
    timestamp = Column(DateTime, nullable=False)

    __table_args__ = (
        Index("idx_session_workspace", "session_id", "workspace_id"),
    )

Best Practices

Match TTL to usage patterns:
# Active chat sessions: short TTL
DatabaseChatHistory(redis_ttl=300)  # 5 minutes

# Recent conversations: medium TTL
DatabaseChatHistory(redis_ttl=3600)  # 1 hour

# Historical data: no cache
DatabaseChatHistory(redis_ttl=None, redis_client=None)  # DB only
Reduce DB round-trips with batch inserts:
# Bad: Multiple DB transactions
for msg in messages:
    await history.add_message(msg)  # N queries

# Good: Single DB transaction
await history.add_messages(messages)  # 1 query
The system automatically falls back to DB:
# If Redis is down:
history = DatabaseChatHistory(
    session_id="session",
    db_adapter=db_adapter,
    redis_client=redis_client  # May be unavailable
)

# Still works, just slower (no cache)
messages = await history.get_messages()  # Falls back to DB
Always use workspace_id for multi-tenant apps:
# Good: Isolated by workspace
history = DatabaseChatHistory(
    session_id="user_123",
    db_adapter=db_adapter,
    workspace_id=current_workspace.id  # Required!
)

# Bad: No isolation
history = DatabaseChatHistory(
    session_id="user_123",
    db_adapter=db_adapter,
    workspace_id=None  # Data leaks possible
)
Create indexes for fast queries:
-- Essential indexes
CREATE INDEX idx_session ON chat_messages(session_id);
CREATE INDEX idx_workspace ON chat_messages(workspace_id);
CREATE INDEX idx_session_workspace ON chat_messages(session_id, workspace_id);
CREATE INDEX idx_timestamp ON chat_messages(timestamp);
Implement data retention policies:
# Delete messages older than 90 days
async def cleanup_old_messages(db_adapter: DatabaseAdapter):
    cutoff_date = datetime.now() - timedelta(days=90)

    # Implement in your adapter
    await db_adapter.delete_messages_before(cutoff_date)
Track Redis cache performance:
class MonitoredDatabaseChatHistory(DatabaseChatHistory):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache_hits = 0
        self.cache_misses = 0

    async def get_messages(self):
        # Check Redis first
        if self.redis:
            cached = self.redis.lrange(self._make_redis_key(), 0, -1)
            if cached:
                self.cache_hits += 1
            else:
                self.cache_misses += 1

        return await super().get_messages()

    def get_hit_rate(self):
        total = self.cache_hits + self.cache_misses
        return self.cache_hits / total if total > 0 else 0.0

Redis Key Format

Key Structure

{redis_key_prefix}{workspace_id}:{session_id}
Examples:
# Without workspace
"chat_history:user_123"

# With workspace
"chat_history:workspace_abc:user_123"

Key Management

# Custom prefix
history = DatabaseChatHistory(
    session_id="user",
    db_adapter=adapter,
    redis_client=redis,
    redis_key_prefix="myapp:chat:"
)

# Resulting key: "myapp:chat:user"

Performance Characteristics

Read Performance

ScenarioPerformance
Redis cache hit~1ms (instant)
Redis cache miss → DB~10-100ms (depends on DB)
DB only (no Redis)~10-100ms

Write Performance

OperationPerformance
Single message~10-50ms (DB write)
Batch messages~20-100ms (depends on batch size)
Redis cache~1ms (async, non-blocking)

Recommendations

  • High traffic: Use Redis with TTL
  • Historical data: DB only (no cache)
  • Real-time chat: Short TTL (5-15 minutes)
  • Async analysis: Long TTL or no cache

See Also

  • Memory - In-memory and Redis-only chat history
  • Caching - General caching mechanisms
  • Backends - Backend integrations