Overview

Flow Core provides comprehensive caching capabilities to improve performance, reduce API costs, and minimize latency by storing and reusing computation results.

Basic Caching

Simple Cache

from nadoo_flow.caching import SimpleCache

# Create cache instance
cache = SimpleCache(max_size=1000, ttl=3600)

# Store value
cache.set("user_123", {"name": "John", "age": 30})

# Retrieve value
user_data = cache.get("user_123")

# Check if key exists
if cache.has("user_123"):
    print("User found in cache")

# Clear cache
cache.clear()

CacheNode

Wrap any node with caching:

from nadoo_flow import CacheNode, BaseNode, NodeResult

class ExpensiveNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Expensive computation
        result = await self.complex_calculation(node_context.input_data)
        return NodeResult(success=True, output=result)

# Wrap with caching
cached_node = CacheNode(
    node=ExpensiveNode(),
    cache_key_fn=lambda input_data: f"calc_{hash(str(input_data))}",
    ttl=3600
)

Cache Implementations

In-Memory Cache

from typing import Any, Optional
import time

class InMemoryCache:
    """Simple in-memory cache with TTL"""

    def __init__(self, max_size=1000, default_ttl=3600):
        self.cache = {}
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.access_count = {}
        self.hit_count = 0
        self.miss_count = 0

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store value in cache"""
        if len(self.cache) >= self.max_size:
            self._evict()

        self.cache[key] = {
            "value": value,
            "expires_at": time.time() + (ttl or self.default_ttl),
            "created_at": time.time()
        }

    def get(self, key: str) -> Optional[Any]:
        """Retrieve value from cache"""
        if key not in self.cache:
            self.miss_count += 1
            return None

        entry = self.cache[key]

        # Check expiration
        if time.time() > entry["expires_at"]:
            del self.cache[key]
            self.miss_count += 1
            return None

        self.hit_count += 1
        self.access_count[key] = self.access_count.get(key, 0) + 1
        return entry["value"]

    def delete(self, key: str):
        """Remove a key from the cache if present"""
        self.cache.pop(key, None)
        self.access_count.pop(key, None)

    def _evict(self):
        """Evict the oldest entry (simple FIFO eviction by creation time)"""
        if not self.cache:
            return

        oldest_key = min(
            self.cache.keys(),
            key=lambda k: self.cache[k]["created_at"]
        )
        del self.cache[oldest_key]

    def get_stats(self):
        """Get cache statistics"""
        total_requests = self.hit_count + self.miss_count
        hit_rate = self.hit_count / total_requests if total_requests > 0 else 0

        return {
            "size": len(self.cache),
            "hit_count": self.hit_count,
            "miss_count": self.miss_count,
            "hit_rate": hit_rate,
            "most_accessed": sorted(
                self.access_count.items(),
                key=lambda x: x[1],
                reverse=True
            )[:10]
        }
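
A quick usage sketch of the class above; the keys, values, and TTLs are illustrative.

cache = InMemoryCache(max_size=500, default_ttl=600)

cache.set("report_42", {"status": "done"}, ttl=120)
print(cache.get("report_42"))   # {'status': 'done'}
print(cache.get("missing"))     # None (recorded as a miss)

stats = cache.get_stats()
print(stats["hit_rate"], stats["size"])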

Redis Cache

import json
import redis.asyncio as redis  # async Redis client (redis-py >= 4.2)
from typing import Any, Optional

class RedisCache:
    """Redis-backed cache"""

    def __init__(self, redis_client=None, prefix="nadoo:", default_ttl=3600):
        self.client = redis_client or redis.Redis()
        self.prefix = prefix
        self.default_ttl = default_ttl

    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store value in Redis"""
        full_key = f"{self.prefix}{key}"
        serialized = json.dumps(value)

        await self.client.setex(
            full_key,
            ttl or self.default_ttl,
            serialized
        )

    async def get(self, key: str) -> Optional[Any]:
        """Retrieve value from Redis"""
        full_key = f"{self.prefix}{key}"
        value = await self.client.get(full_key)

        if value:
            return json.loads(value)
        return None

    async def delete(self, key: str):
        """Delete key from cache"""
        full_key = f"{self.prefix}{key}"
        await self.client.delete(full_key)

    async def clear_pattern(self, pattern: str):
        """Clear all keys matching pattern"""
        cursor = 0
        while True:
            cursor, keys = await self.client.scan(
                cursor,
                match=f"{self.prefix}{pattern}*"
            )
            if keys:
                await self.client.delete(*keys)
            if cursor == 0:
                break
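
A brief usage sketch for the class above, assuming a Redis server is reachable at the default localhost address.

import asyncio

async def main():
    cache = RedisCache()

    await cache.set("user_123", {"name": "John", "age": 30}, ttl=600)
    print(await cache.get("user_123"))

    # Remove every cached key that starts with "user_"
    await cache.clear_pattern("user_")

asyncio.run(main())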

Advanced Caching Patterns

Multi-Level Cache

class MultiLevelCache:
    """Cache with multiple levels (L1, L2, etc.)"""

    def __init__(self):
        self.l1_cache = InMemoryCache(max_size=100, default_ttl=60)
        self.l2_cache = InMemoryCache(max_size=1000, default_ttl=3600)
        self.l3_cache = RedisCache()

    async def get(self, key: str) -> Optional[Any]:
        """Try each cache level"""
        # Check L1
        value = self.l1_cache.get(key)
        if value is not None:
            return value

        # Check L2
        value = self.l2_cache.get(key)
        if value is not None:
            # Promote to L1
            self.l1_cache.set(key, value)
            return value

        # Check L3
        value = await self.l3_cache.get(key)
        if value is not None:
            # Promote to L1 and L2
            self.l1_cache.set(key, value)
            self.l2_cache.set(key, value)
            return value

        return None

    async def set(self, key: str, value: Any):
        """Set in all cache levels"""
        self.l1_cache.set(key, value)
        self.l2_cache.set(key, value)
        await self.l3_cache.set(key, value)

Smart Cache Invalidation

class SmartCache:
    """Cache with intelligent invalidation"""

    def __init__(self):
        self.cache = InMemoryCache()
        self.dependencies = {}  # Track key dependencies

    def set(self, key: str, value: Any, depends_on: list = None):
        """Set with dependency tracking"""
        self.cache.set(key, value)

        if depends_on:
            for dep in depends_on:
                if dep not in self.dependencies:
                    self.dependencies[dep] = set()
                self.dependencies[dep].add(key)

    def invalidate(self, key: str, cascade=True):
        """Invalidate key and dependent keys"""
        # Invalidate the key
        self.cache.delete(key)

        if cascade and key in self.dependencies:
            # Invalidate all dependent keys
            for dependent_key in self.dependencies[key]:
                self.invalidate(dependent_key, cascade=True)

            del self.dependencies[key]

    def get(self, key: str) -> Optional[Any]:
        """Get with automatic validation"""
        value = self.cache.get(key)

        # Additional validation logic
        if value and self._is_stale(key):
            self.invalidate(key)
            return None

        return value

    def _is_stale(self, key: str) -> bool:
        """Check if cache entry is stale"""
        # Custom staleness logic
        return False
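
To make the dependency semantics concrete, here is a small usage sketch; the keys and values are illustrative.

cache = SmartCache()

# Derived entries declare which source keys they were computed from
cache.set("user_123", {"name": "John", "age": 30})
cache.set("user_123_summary", "John, 30", depends_on=["user_123"])
cache.set("user_123_report", {"pages": 3}, depends_on=["user_123"])

# Invalidating the source cascades to everything derived from it
cache.invalidate("user_123")
print(cache.get("user_123_summary"))  # None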

Caching Strategies

Cache-Aside Pattern

class CacheAsideNode(BaseNode):
    """Cache-aside (lazy loading) pattern"""

    def __init__(self, cache, data_source):
        super().__init__(node_id="cache_aside")
        self.cache = cache
        self.data_source = data_source

    def _generate_key(self, input_data) -> str:
        """Build a cache key from the input data (mirrors the cache_key_fn example above)"""
        return f"cache_aside_{hash(str(input_data))}"

    async def execute(self, node_context, workflow_context):
        key = self._generate_key(node_context.input_data)

        # Try cache first
        cached_value = await self.cache.get(key)
        if cached_value:
            return NodeResult(
                success=True,
                output=cached_value,
                metadata={"cache_hit": True}
            )

        # Load from data source
        value = await self.data_source.fetch(node_context.input_data)

        # Store in cache
        await self.cache.set(key, value)

        return NodeResult(
            success=True,
            output=value,
            metadata={"cache_hit": False}
        )

Write-Through Cache

class WriteThroughCache(BaseNode):
    """Write-through caching pattern"""

    def __init__(self, cache, data_store):
        super().__init__(node_id="write_through")
        self.cache = cache
        self.data_store = data_store

    async def write(self, key: str, value: Any):
        """Write to both cache and store"""
        # Write to data store first
        await self.data_store.write(key, value)

        # Then update cache
        await self.cache.set(key, value)

    async def read(self, key: str) -> Optional[Any]:
        """Read from cache, fallback to store"""
        # Try cache
        value = await self.cache.get(key)
        if value:
            return value

        # Fallback to store
        value = await self.data_store.read(key)
        if value:
            # Update cache
            await self.cache.set(key, value)

        return value

Write-Behind Cache

import asyncio
from collections import deque

class WriteBehindCache:
    """Write-behind (write-back) caching"""

    def __init__(self, cache, data_store, flush_interval=5):
        self.cache = cache
        self.data_store = data_store
        self.flush_interval = flush_interval
        self.write_queue = deque()
        self._start_flush_task()

    async def write(self, key: str, value: Any):
        """Write to cache immediately, queue for store"""
        # Update cache immediately
        await self.cache.set(key, value)

        # Queue for eventual write
        self.write_queue.append((key, value))

    async def read(self, key: str) -> Optional[Any]:
        """Read from cache"""
        return await self.cache.get(key)

    def _start_flush_task(self):
        """Start the background flush task (must be called from a running event loop)"""
        asyncio.create_task(self._flush_loop())

    async def _flush_loop(self):
        """Periodically flush writes to store"""
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush()

    async def _flush(self):
        """Flush pending writes"""
        batch = []
        while self.write_queue and len(batch) < 100:
            batch.append(self.write_queue.popleft())

        if batch:
            # Batch write to store
            await self.data_store.batch_write(batch)

Cache Warming

Proactive Cache Warming

import logging

logger = logging.getLogger(__name__)

class CacheWarmer:
    """Warm cache proactively"""

    def __init__(self, cache, data_source):
        self.cache = cache
        self.data_source = data_source

    async def warm_cache(self, keys: list):
        """Pre-populate cache with frequently used data"""
        tasks = []
        for key in keys:
            task = self._warm_single(key)
            tasks.append(task)

        await asyncio.gather(*tasks)

    async def _warm_single(self, key: str):
        """Warm single cache entry"""
        try:
            # Check if already cached
            if await self.cache.get(key):
                return

            # Fetch and cache
            value = await self.data_source.fetch(key)
            await self.cache.set(key, value)

        except Exception as e:
            logger.error(f"Failed to warm cache for {key}: {e}")

    async def warm_from_analytics(self):
        """Warm cache based on usage analytics"""
        # Get most accessed keys
        popular_keys = await self.get_popular_keys()

        # Warm cache with popular data
        await self.warm_cache(popular_keys)

Cache Monitoring

Cache Metrics

class CacheMetrics:
    """Monitor cache performance"""

    def __init__(self):
        self.metrics = {
            "hits": 0,
            "misses": 0,
            "sets": 0,
            "deletes": 0,
            "evictions": 0,
            "response_times": []
        }

    def record_hit(self):
        """Record cache hit"""
        self.metrics["hits"] += 1

    def record_miss(self):
        """Record cache miss"""
        self.metrics["misses"] += 1

    def record_response_time(self, duration: float):
        """Record response time"""
        self.metrics["response_times"].append(duration)

        # Keep only recent times
        if len(self.metrics["response_times"]) > 1000:
            self.metrics["response_times"] = self.metrics["response_times"][-1000:]

    def get_statistics(self):
        """Get cache statistics"""
        total_requests = self.metrics["hits"] + self.metrics["misses"]
        hit_ratio = self.metrics["hits"] / total_requests if total_requests > 0 else 0

        response_times = self.metrics["response_times"]
        avg_response = sum(response_times) / len(response_times) if response_times else 0

        return {
            "hit_ratio": hit_ratio,
            "total_hits": self.metrics["hits"],
            "total_misses": self.metrics["misses"],
            "avg_response_time": avg_response,
            "cache_efficiency": self._calculate_efficiency()
        }

    def _calculate_efficiency(self):
        """Calculate cache efficiency score"""
        hit_ratio = self.metrics["hits"] / (self.metrics["hits"] + self.metrics["misses"] + 0.001)
        eviction_ratio = self.metrics["evictions"] / (self.metrics["sets"] + 0.001)

        # High hit ratio and low eviction ratio = good efficiency
        efficiency = hit_ratio * (1 - eviction_ratio)
        return efficiency

Cached Workflow Node

Complete Cached Node Implementation

class CachedWorkflowNode(BaseNode):
    """Full-featured cached node"""

    def __init__(
        self,
        wrapped_node,
        cache_config=None,
        key_generator=None
    ):
        super().__init__(node_id=f"cached_{wrapped_node.node_id}")
        self.wrapped_node = wrapped_node
        self.cache = self._create_cache(cache_config or {})
        self.key_generator = key_generator or self._default_key_generator
        self.metrics = CacheMetrics()

    def _create_cache(self, config):
        """Create cache based on config"""
        cache_type = config.get("type", "memory")

        if cache_type == "memory":
            return InMemoryCache(**config.get("params", {}))
        elif cache_type == "redis":
            return RedisCache(**config.get("params", {}))
        elif cache_type == "multi":
            return MultiLevelCache()
        else:
            raise ValueError(f"Unknown cache type: {cache_type}")

    def _default_key_generator(self, input_data):
        """Generate cache key from input"""
        import hashlib
        import json

        # Sort keys for consistent hashing
        sorted_data = json.dumps(input_data, sort_keys=True)
        return hashlib.sha256(sorted_data.encode()).hexdigest()

    async def _maybe_await(self, value):
        """Allow both sync (InMemoryCache) and async (RedisCache) backends"""
        import inspect
        return await value if inspect.isawaitable(value) else value

    async def execute(self, node_context, workflow_context):
        # Generate cache key
        cache_key = self.key_generator(node_context.input_data)

        # Try cache
        start_time = time.time()
        cached_result = await self._maybe_await(self.cache.get(cache_key))

        if cached_result:
            self.metrics.record_hit()
            self.metrics.record_response_time(time.time() - start_time)

            return NodeResult(
                success=True,
                output=cached_result,
                metadata={
                    "cache_hit": True,
                    "cache_key": cache_key,
                    "response_time": time.time() - start_time
                }
            )

        # Cache miss - execute node
        self.metrics.record_miss()
        result = await self.wrapped_node.execute(node_context, workflow_context)

        # Cache successful results
        if result.success:
            await self._maybe_await(self.cache.set(cache_key, result.output))

        self.metrics.record_response_time(time.time() - start_time)

        return NodeResult(
            success=result.success,
            output=result.output,
            error=result.error,
            metadata={
                "cache_hit": False,
                "cache_key": cache_key,
                "response_time": time.time() - start_time
            }
        )

    def get_cache_stats(self):
        """Get cache statistics"""
        return self.metrics.get_statistics()
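
A usage sketch that wraps the earlier ExpensiveNode with an in-memory cache; the cache_config values are illustrative, and how the node is wired into a workflow depends on your Flow Core setup.

cached_expensive = CachedWorkflowNode(
    wrapped_node=ExpensiveNode(),
    cache_config={
        "type": "memory",
        "params": {"max_size": 500, "default_ttl": 1800}
    }
)

# After some traffic, inspect how effective the cache is
print(cached_expensive.get_cache_stats())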

Best Practices

• Set TTL based on data volatility: long TTL (hours or days) for static data, short TTL (minutes) for dynamic data, and very short TTL (seconds) for near-real-time data.
• Pre-populate the cache with frequently accessed data during low-traffic periods.
• Track hit ratios, response times, and eviction rates to tune cache size and TTLs.
• Use locks or probabilistic early expiration to prevent cache stampedes (see the first sketch below).
• For distributed caches, use consistent hashing for better key distribution across nodes (see the second sketch below).
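
To illustrate the stampede point, here is a minimal sketch of lock-based protection built on the InMemoryCache defined above; StampedeGuard, get_or_load, and the loader callable are illustrative names, not part of Flow Core.

import asyncio
from typing import Any, Awaitable, Callable, Dict

class StampedeGuard:
    """Ensure only one coroutine recomputes an expired key at a time"""

    def __init__(self, cache: InMemoryCache):
        self.cache = cache
        self._locks: Dict[str, asyncio.Lock] = {}

    async def get_or_load(
        self,
        key: str,
        loader: Callable[[], Awaitable[Any]],
        ttl: int = 60
    ) -> Any:
        # Fast path: the value is already cached
        value = self.cache.get(key)
        if value is not None:
            return value

        # One lock per key; concurrent callers for the same key wait here
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            # Re-check after acquiring the lock: another caller may have
            # repopulated the entry while we were waiting
            value = self.cache.get(key)
            if value is None:
                value = await loader()
                self.cache.set(key, value, ttl=ttl)
            return value

The alternative mentioned above, probabilistic early expiration, has each reader occasionally refresh an entry shortly before its TTL elapses so that expiry never hits all callers at once.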
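
For the consistent-hashing point, the sketch below shows one way to map keys onto a ring of cache nodes; ConsistentHashRing and the node names are illustrative, not a Flow Core API.

import bisect
import hashlib
from typing import Dict, List

class ConsistentHashRing:
    """Map cache keys to nodes so that adding or removing a node
    only remaps a small fraction of keys"""

    def __init__(self, nodes: List[str], replicas: int = 100):
        self.replicas = replicas
        self._ring: List[int] = []            # sorted virtual-node positions
        self._positions: Dict[int, str] = {}  # position -> node name
        for node in nodes:
            self.add_node(node)

    def _hash(self, value: str) -> int:
        return int(hashlib.md5(value.encode()).hexdigest(), 16)

    def add_node(self, node: str):
        # Each node gets several virtual positions for a smoother distribution
        for i in range(self.replicas):
            pos = self._hash(f"{node}:{i}")
            self._positions[pos] = node
            bisect.insort(self._ring, pos)

    def get_node(self, key: str) -> str:
        # Walk clockwise from the key's position to the next virtual node
        pos = self._hash(key)
        index = bisect.bisect(self._ring, pos) % len(self._ring)
        return self._positions[self._ring[index]]

# Example: route keys across three cache instances (hypothetical names)
ring = ConsistentHashRing(["redis-a", "redis-b", "redis-c"])
print(ring.get_node("user_123"))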

Next Steps