Overview

The caching module provides LLM response caching to reduce API costs and improve latency. It supports in-memory and Redis-based caching with TTL expiration.
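
A minimal quick start, assuming the classes documented below:
from nadoo_flow import InMemoryCache, ResponseCache

# Wrap an in-memory backend with a 1-hour TTL
cache = ResponseCache(InMemoryCache(default_ttl=3600))

# Keys are derived from the prompt and (optionally) model parameters
key = cache.make_key(prompt="What is AI?", model="gpt-4")

if cache.get(key) is None:
    # Cache miss: compute the response, then store it
    cache.set(key, {"response": "..."})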

Classes

CacheEntry

Data structure for cached values.
from nadoo_flow import CacheEntry

entry = CacheEntry(
    key="cache_key",
    value={"response": "Hello"},
    created_at=1234567890.0,
    ttl=3600,
    metadata={"model": "gpt-4"}
)

if entry.is_expired():
    print("Entry expired")

Attributes

  • key (str) - Cache key
  • value (Any) - Cached value
  • created_at (float) - Creation timestamp (Unix time)
  • ttl (float | None) - Time-to-live in seconds (None = no expiration)
  • metadata (dict[str, Any] | None) - Optional metadata

Methods

is_expired
Check if entry has expired.
def is_expired() -> bool
Returns:
  • bool - True if expired, False otherwise
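
For intuition, the check compares the entry's age against its TTL. A minimal sketch consistent with the attributes above (not necessarily the library's exact code):
import time

from nadoo_flow import CacheEntry

def is_expired_sketch(entry: CacheEntry) -> bool:
    # Entries without a TTL never expire
    if entry.ttl is None:
        return False
    # Expired once the entry's age reaches its TTL (boundary handling may differ)
    return (time.time() - entry.created_at) >= entry.ttl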

BaseCache

Abstract base class for cache implementations.
from nadoo_flow.caching import BaseCache

class MyCache(BaseCache):
    def get(self, key: str) -> Any | None:
        # Implementation
        pass

    def set(self, key: str, value: Any, ttl: float | None = None):
        # Implementation
        pass

    def delete(self, key: str):
        # Implementation
        pass

    def clear(self):
        # Implementation
        pass

    def exists(self, key: str) -> bool:
        # Implementation
        pass

Abstract Methods

All cache implementations must implement these methods:
  • get(key) - Retrieve a value from the cache. Parameters: key: str. Returns: Any | None.
  • set(key, value, ttl) - Store a value in the cache. Parameters: key: str, value: Any, ttl: float | None. Returns: None.
  • delete(key) - Remove a value from the cache. Parameters: key: str. Returns: None.
  • clear() - Clear all cache entries. Parameters: none. Returns: None.
  • exists(key) - Check if a key exists. Parameters: key: str. Returns: bool.
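
To make the skeleton above concrete, here is an illustrative dict-backed implementation built on CacheEntry. It is a sketch only; InMemoryCache (below) already provides this behavior, including TTL handling:
import time
from typing import Any

from nadoo_flow import CacheEntry
from nadoo_flow.caching import BaseCache

class DictCache(BaseCache):
    """Sketch of a minimal backend; use InMemoryCache for real workloads."""

    def __init__(self):
        self._entries: dict[str, CacheEntry] = {}

    def get(self, key: str) -> Any | None:
        entry = self._entries.get(key)
        if entry is None:
            return None
        if entry.is_expired():
            # Drop stale entries lazily on read
            del self._entries[key]
            return None
        return entry.value

    def set(self, key: str, value: Any, ttl: float | None = None):
        self._entries[key] = CacheEntry(
            key=key,
            value=value,
            created_at=time.time(),
            ttl=ttl,
            metadata=None,
        )

    def delete(self, key: str):
        self._entries.pop(key, None)

    def clear(self):
        self._entries.clear()

    def exists(self, key: str) -> bool:
        return self.get(key) is not None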

InMemoryCache

In-memory cache implementation.
from nadoo_flow import InMemoryCache

# Create cache with 1-hour TTL
cache = InMemoryCache(default_ttl=3600)

# Store value
cache.set("key", "value")

# Retrieve value
value = cache.get("key")  # Returns "value"

# Check existence
if cache.exists("key"):
    print("Key exists")

# Delete value
cache.delete("key")

# Clear all
cache.clear()

Constructor

def __init__(default_ttl: float | None = None)
Parameters:
  • default_ttl (float | None) - Default TTL in seconds. If None, entries never expire.

Methods

get
Retrieve value from cache. Automatically removes expired entries.
def get(key: str) -> Any | None
set
Store value in cache.
def set(key: str, value: Any, ttl: float | None = None)
Parameters:
  • ttl - Time-to-live in seconds. If None, uses default_ttl.
delete
Remove entry from cache.
def delete(key: str)
clear
Remove all entries from cache.
def clear()
exists
Check if key exists (and is not expired).
def exists(key: str) -> bool
cleanup_expired
Manually remove all expired entries.
def cleanup_expired()
Usage:
cache = InMemoryCache()
# ... cache operations ...
cache.cleanup_expired()  # Cleanup expired entries

ResponseCache

LLM response caching with automatic key generation.
from nadoo_flow import ResponseCache, InMemoryCache

# Create response cache
cache = ResponseCache(
    cache=InMemoryCache(default_ttl=3600),
    namespace="nadoo_llm",
    include_model_params=True
)

# Generate cache key
cache_key = cache.make_key(
    prompt="What is AI?",
    model="gpt-4",
    temperature=0.7
)

# Check cache
cached_response = cache.get(cache_key)
if cached_response is not None:
    print("Cache hit!")
else:
    # Cache miss: call the LLM and store the response
    response = call_llm(...)
    cache.set(cache_key, response, ttl=3600)

Constructor

def __init__(
    cache: BaseCache,
    namespace: str = "nadoo_llm",
    include_model_params: bool = True
)
Parameters:
  • cache - Backend cache implementation (InMemoryCache, RedisCache, etc.)
  • namespace - Cache key namespace prefix
  • include_model_params - Include model parameters (temperature, etc.) in cache key

Methods

make_key
Generate cache key from prompt and parameters.
def make_key(
    prompt: str | list[dict[str, Any]],
    model: str | None = None,
    **kwargs
) -> str
Parameters:
  • prompt - String prompt or message list
  • model - Model name (e.g., "gpt-4")
  • **kwargs - Model parameters (temperature, max_tokens, top_p, etc.)
Returns:
  • str - Cache key in format: namespace:prompt_hash:model:params_hash
Included Parameters: Only deterministic parameters are included in the cache key:
  • temperature
  • max_tokens
  • top_p
  • frequency_penalty
  • presence_penalty
Example:
# String prompt
key1 = cache.make_key(
    prompt="Translate to Spanish: Hello",
    model="gpt-4",
    temperature=0.7
)

# Message list
key2 = cache.make_key(
    prompt=[
        {"role": "system", "content": "You are a translator"},
        {"role": "user", "content": "Translate: Hello"}
    ],
    model="gpt-4"
)
get
Retrieve cached response.
def get(key: str) -> Any | None
Logs cache hit/miss at DEBUG level.
set
Store response in cache.
def set(key: str, value: Any, ttl: float | None = None)
delete
Remove specific cache entry.
def delete(key: str)
clear
Clear all entries in this namespace.
def clear()
For InMemoryCache, only clears entries matching the namespace. For other caches, clears everything.

CachedNode

Mixin class to add caching capability to nodes.
from nadoo_flow import BaseNode, CachedNode, ResponseCache, InMemoryCache, NodeResult

class MyLLMNode(BaseNode, CachedNode):
    def __init__(self):
        BaseNode.__init__(
            self,
            node_id="llm",
            node_type="llm",
            name="LLM Node",
            config={}
        )
        CachedNode.__init__(
            self,
            response_cache=ResponseCache(InMemoryCache(default_ttl=3600))
        )

    async def execute(self, node_context, workflow_context):
        prompt = node_context.get_input("prompt")

        # Generate cache key
        cache_key = self.response_cache.make_key(
            prompt=prompt,
            model="gpt-4"
        )

        # Check cache
        if self.is_cache_enabled():
            cached = self.response_cache.get(cache_key)
            if cached:
                return NodeResult(
                    success=True,
                    output=cached,
                    metadata={"cache_hit": True}
                )

        # Call LLM
        response = await self._call_llm(prompt)

        # Store in cache
        if self.is_cache_enabled():
            self.response_cache.set(cache_key, response)

        return NodeResult(
            success=True,
            output=response,
            metadata={"cache_hit": False}
        )

Constructor

def __init__(response_cache: ResponseCache)
Parameters:
  • response_cache - ResponseCache instance to use

Methods

enable_cache
Enable caching for this node.
def enable_cache()
disable_cache
Disable caching for this node.
def disable_cache()
is_cache_enabled
Check if caching is enabled.
def is_cache_enabled() -> bool
clear_cache
Clear all cache entries for this node.
def clear_cache()
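
A brief usage sketch of these toggles, reusing the MyLLMNode class defined above:
node = MyLLMNode()

node.disable_cache()                # bypass the cache on subsequent executions
assert not node.is_cache_enabled()

node.enable_cache()                 # turn caching back on
node.clear_cache()                  # drop this node's cached entries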

RedisCache

Redis-based distributed cache (requires redis package).
from nadoo_flow.caching import RedisCache

# Create Redis cache
cache = RedisCache(
    host="localhost",
    port=6379,
    db=0,
    password="secret",
    default_ttl=3600,
    prefix="nadoo:"
)

# Use like any other cache
cache.set("key", {"data": "value"})
value = cache.get("key")

Constructor

def __init__(
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: str | None = None,
    default_ttl: float | None = None,
    prefix: str = "nadoo:"
)
Parameters:
  • host - Redis server hostname
  • port - Redis server port
  • db - Redis database number (0-15)
  • password - Redis password (if required)
  • default_ttl - Default TTL in seconds
  • prefix - Key prefix for namespacing

Features

  • JSON Serialization: Automatically serializes/deserializes JSON
  • TTL Support: Automatic expiration with SETEX
  • Prefix Support: Namespace isolation with key prefix
  • Distributed: Share cache across multiple processes/servers
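
For intuition, the JSON-plus-SETEX behavior described above can be sketched with the redis-py client roughly as follows (an illustration, not RedisCache's actual implementation):
import json

import redis

client = redis.Redis(host="localhost", port=6379, db=0)
prefix = "nadoo:"

def set_json(key: str, value: object, ttl: float | None = None) -> None:
    payload = json.dumps(value)  # serialize to JSON before storing
    if ttl is not None:
        # SETEX stores the value and its TTL atomically
        client.setex(prefix + key, int(ttl), payload)
    else:
        client.set(prefix + key, payload)

def get_json(key: str):
    raw = client.get(prefix + key)
    return json.loads(raw) if raw is not None else None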

Methods

Same as BaseCache: get, set, delete, clear, exists

Usage Patterns

Basic LLM Caching

from nadoo_flow import ResponseCache, InMemoryCache

# Setup
cache = ResponseCache(InMemoryCache(default_ttl=3600))

async def call_llm_with_cache(prompt: str, model: str):
    # Generate key
    key = cache.make_key(prompt=prompt, model=model)

    # Check cache
    cached = cache.get(key)
    if cached:
        print("Cache hit! Saved API call")
        return cached

    # Call LLM
    response = await llm_api_call(prompt, model)

    # Cache result
    cache.set(key, response)

    return response

# Use
result = await call_llm_with_cache("What is AI?", "gpt-4")

Per-Temperature Caching

# Different temperatures = different cache keys
key_temp_0 = cache.make_key(
    prompt="Creative story",
    model="gpt-4",
    temperature=0.0  # Deterministic
)

key_temp_1 = cache.make_key(
    prompt="Creative story",
    model="gpt-4",
    temperature=1.0  # Creative
)

# These will be cached separately

Disable Parameter Caching

# Ignore temperature in cache key
cache = ResponseCache(
    cache=InMemoryCache(),
    include_model_params=False  # Don't include params
)

# Now these have same key (only prompt + model)
key1 = cache.make_key(prompt="Hello", model="gpt-4", temperature=0.0)
key2 = cache.make_key(prompt="Hello", model="gpt-4", temperature=1.0)
# key1 == key2

Distributed Caching with Redis

from nadoo_flow import ResponseCache
from nadoo_flow.caching import RedisCache

# Create Redis cache
redis_cache = RedisCache(
    host="redis.example.com",
    port=6379,
    password="secret",
    default_ttl=7200  # 2 hours
)

# Use in response cache
cache = ResponseCache(
    cache=redis_cache,
    namespace="production_llm"
)

# Now multiple servers share same cache

Conditional Caching

class SmartCachedNode(BaseNode, CachedNode):
    def __init__(self):
        BaseNode.__init__(self, node_id="smart", node_type="llm", name="Smart", config={})
        CachedNode.__init__(self, response_cache=ResponseCache(InMemoryCache()))

    async def execute(self, node_context, workflow_context):
        prompt = node_context.get_input("prompt")
        force_fresh = node_context.get_input("force_fresh", False)

        # Disable cache if force_fresh
        if force_fresh:
            self.disable_cache()
        else:
            self.enable_cache()

        # ... rest of execution ...

Cache Warming

# Pre-populate cache with common queries
common_queries = [
    "What is machine learning?",
    "Explain neural networks",
    "What is deep learning?"
]

for query in common_queries:
    key = cache.make_key(prompt=query, model="gpt-4")
    if not cache.get(key):
        response = await call_llm(query)
        cache.set(key, response, ttl=86400)  # 24 hours

print("Cache warmed!")

Cache Statistics

class CacheStatsNode(BaseNode, CachedNode):
    def __init__(self):
        BaseNode.__init__(self, node_id="stats", node_type="llm", name="Stats", config={})
        CachedNode.__init__(self, response_cache=ResponseCache(InMemoryCache()))

        self.cache_hits = 0
        self.cache_misses = 0

    async def execute(self, node_context, workflow_context):
        cache_key = self.response_cache.make_key(...)

        cached = self.response_cache.get(cache_key)
        if cached:
            self.cache_hits += 1
            return NodeResult(success=True, output=cached)
        else:
            self.cache_misses += 1
            # ... call LLM ...

    def get_hit_rate(self) -> float:
        total = self.cache_hits + self.cache_misses
        return self.cache_hits / total if total > 0 else 0.0

TTL Strategies

# Short TTL for dynamic content
news_cache = ResponseCache(InMemoryCache(default_ttl=300))  # 5 minutes

# Long TTL for static content
docs_cache = ResponseCache(InMemoryCache(default_ttl=86400))  # 24 hours

# No expiration for expensive operations
expensive_cache = ResponseCache(InMemoryCache(default_ttl=None))  # Never expire

# Per-request TTL
cache.set(key, value, ttl=60)  # This specific entry expires in 1 minute

Best Practices

Set TTL based on content freshness requirements:
# Dynamic data: short TTL
InMemoryCache(default_ttl=300)  # 5 minutes

# Semi-static: medium TTL
InMemoryCache(default_ttl=3600)  # 1 hour

# Static: long TTL
InMemoryCache(default_ttl=86400)  # 24 hours

# Computation results: no expiration
InMemoryCache(default_ttl=None)  # Never expire
Choose the cache backend for your deployment:
  • InMemoryCache: Single process, fast, simple
  • RedisCache: Multi-process/server, distributed, persistent
# Single server
cache = InMemoryCache()

# Distributed/production
cache = RedisCache(host="redis-server")
Build cache keys only from parameters that affect the output:
# Good: Deterministic parameters
cache.make_key(
    prompt="Hello",
    temperature=0.7,
    max_tokens=100
)

# Bad: Including random seed defeats caching
# Don't include: seed, user_id, timestamp
Track hit rates to optimize TTL and size:
hits = 0
misses = 0

result = cache.get(key)
if result:
    hits += 1
else:
    misses += 1

hit_rate = hits / (hits + misses)
if hit_rate < 0.3:
    print("Low hit rate, consider longer TTL")
Clear cache when underlying data changes:
# Clear specific entries
cache.delete(key)

# Clear namespace
response_cache.clear()

# Clear everything
base_cache.clear()
Caching should enhance functionality, not break it. Degrade gracefully on cache errors:
try:
    cached = cache.get(key)
    if cached:
        return cached
except Exception as e:
    logger.warning(f"Cache error: {e}")
    # Continue without cache

# Proceed with normal execution
result = await call_llm(...)

try:
    cache.set(key, result)
except Exception as e:
    logger.warning(f"Cache set failed: {e}")
    # Still return result
Periodically clean up expired entries to prevent memory bloat:
import asyncio

async def cleanup_task():
    while True:
        await asyncio.sleep(3600)  # Every hour
        if isinstance(cache, InMemoryCache):
            cache.cleanup_expired()

# Start cleanup task
asyncio.create_task(cleanup_task())

Cache Key Design

Key Format

namespace:prompt_hash:model:params_hash
Example:
nadoo_llm:a3f5b2c1d4e6f7g8:gpt-4:1a2b3c4d
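
For intuition, the derivation can be approximated as follows (an illustrative sketch assuming the 16-character SHA-256 truncation described under Hash Collision, not the library's exact code):
import hashlib
import json

# Parameters documented above as part of the cache key
DETERMINISTIC_PARAMS = {"temperature", "max_tokens", "top_p", "frequency_penalty", "presence_penalty"}

def sketch_make_key(namespace: str, prompt, model: str, **params) -> str:
    # Hash the prompt (string or message list) in a stable way
    prompt_hash = hashlib.sha256(
        json.dumps(prompt, sort_keys=True).encode()
    ).hexdigest()[:16]

    # Only the documented parameters participate in the key
    filtered = {k: v for k, v in sorted(params.items()) if k in DETERMINISTIC_PARAMS}
    params_hash = hashlib.sha256(
        json.dumps(filtered, sort_keys=True).encode()
    ).hexdigest()[:16]

    return f"{namespace}:{prompt_hash}:{model}:{params_hash}"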

Hash Collision

SHA-256 hashes are truncated to 16 characters. The collision probability is negligible in practice, but for absolute safety, store the full prompt in metadata.
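
One way to follow that advice with the ResponseCache API shown above is to carry the prompt alongside the cached value (ResponseCache.set takes no metadata argument in the signatures above, so this sketch wraps the value instead):
# Store the full prompt next to the response so collisions can be detected on read
cache.set(key, {"prompt": prompt, "response": response})

hit = cache.get(key)
if hit is not None and hit["prompt"] != prompt:
    hit = None  # treat a (vanishingly unlikely) hash collision as a miss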

Custom Namespaces

# Separate caches for different purposes
training_cache = ResponseCache(cache, namespace="training")
production_cache = ResponseCache(cache, namespace="production")
test_cache = ResponseCache(cache, namespace="test")

# Keys don't collide
training_cache.set(key, value1)
production_cache.set(key, value2)  # Different namespace

See Also