Overview
Flow Core provides comprehensive caching capabilities to improve performance, reduce API costs, and minimize latency by storing and reusing computation results.
Basic Caching
Simple Cache
from nadoo_flow.caching import SimpleCache
# Create cache instance
cache = SimpleCache(max_size=1000, ttl=3600)
# Store value
cache.set("user_123", {"name": "John", "age": 30})
# Retrieve value
user_data = cache.get("user_123")
# Check if key exists
if cache.has("user_123"):
    print("User found in cache")
# Clear cache
cache.clear()
CacheNode
Wrap any node with caching:
from nadoo_flow import CacheNode, BaseNode, NodeResult

class ExpensiveNode(BaseNode):
    async def execute(self, node_context, workflow_context):
        # Expensive computation
        result = await self.complex_calculation(node_context.input_data)
        return NodeResult(success=True, output=result)

# Wrap with caching
cached_node = CacheNode(
    node=ExpensiveNode(),
    cache_key_fn=lambda input_data: f"calc_{hash(str(input_data))}",
    ttl=3600
)
Cache Implementations
In-Memory Cache
from typing import Any, Optional
import time

class InMemoryCache:
    """Simple in-memory cache with TTL"""

    def __init__(self, max_size=1000, default_ttl=3600):
        self.cache = {}
        self.max_size = max_size
        self.default_ttl = default_ttl
        self.access_count = {}
        self.hit_count = 0
        self.miss_count = 0

    def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store value in cache"""
        if len(self.cache) >= self.max_size:
            self._evict()
        self.cache[key] = {
            "value": value,
            "expires_at": time.time() + (ttl or self.default_ttl),
            "created_at": time.time()
        }

    def get(self, key: str) -> Optional[Any]:
        """Retrieve value from cache"""
        if key not in self.cache:
            self.miss_count += 1
            return None
        entry = self.cache[key]
        # Check expiration
        if time.time() > entry["expires_at"]:
            del self.cache[key]
            self.miss_count += 1
            return None
        self.hit_count += 1
        self.access_count[key] = self.access_count.get(key, 0) + 1
        return entry["value"]

    def delete(self, key: str):
        """Remove a single key (no-op if the key is absent)"""
        self.cache.pop(key, None)
        self.access_count.pop(key, None)

    def _evict(self):
        """Evict the oldest entry (simple FIFO eviction by creation time)"""
        if not self.cache:
            return
        oldest_key = min(
            self.cache.keys(),
            key=lambda k: self.cache[k]["created_at"]
        )
        del self.cache[oldest_key]

    def get_stats(self):
        """Get cache statistics"""
        total_requests = self.hit_count + self.miss_count
        hit_rate = self.hit_count / total_requests if total_requests > 0 else 0
        return {
            "size": len(self.cache),
            "hit_count": self.hit_count,
            "miss_count": self.miss_count,
            "hit_rate": hit_rate,
            "most_accessed": sorted(
                self.access_count.items(),
                key=lambda x: x[1],
                reverse=True
            )[:10]
        }
Redis Cache
import json
import redis.asyncio as redis  # async client, since get/set below are awaited
from typing import Any, Optional

class RedisCache:
    """Redis-backed cache"""

    def __init__(self, redis_client=None, prefix="nadoo:", default_ttl=3600):
        self.client = redis_client or redis.Redis()
        self.prefix = prefix
        self.default_ttl = default_ttl

    async def set(self, key: str, value: Any, ttl: Optional[int] = None):
        """Store value in Redis"""
        full_key = f"{self.prefix}{key}"
        serialized = json.dumps(value)
        await self.client.setex(
            full_key,
            ttl or self.default_ttl,
            serialized
        )

    async def get(self, key: str) -> Optional[Any]:
        """Retrieve value from Redis"""
        full_key = f"{self.prefix}{key}"
        value = await self.client.get(full_key)
        if value:
            return json.loads(value)
        return None

    async def delete(self, key: str):
        """Delete key from cache"""
        full_key = f"{self.prefix}{key}"
        await self.client.delete(full_key)

    async def clear_pattern(self, pattern: str):
        """Clear all keys matching pattern"""
        cursor = 0
        while True:
            cursor, keys = await self.client.scan(
                cursor,
                match=f"{self.prefix}{pattern}*"
            )
            if keys:
                await self.client.delete(*keys)
            if cursor == 0:
                break
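For example, assuming redis-py's asyncio client (the connection parameters below are illustrative):

import asyncio
import redis.asyncio as redis

async def main():
    client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    cache = RedisCache(redis_client=client, prefix="nadoo:", default_ttl=600)

    await cache.set("user_123", {"name": "John", "age": 30})
    print(await cache.get("user_123"))

    # Remove every cached entry whose key starts with "user_"
    await cache.clear_pattern("user_")

asyncio.run(main())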
Advanced Caching Patterns
Multi-Level Cache
class MultiLevelCache:
    """Cache with multiple levels (L1, L2, etc.)"""

    def __init__(self):
        self.l1_cache = InMemoryCache(max_size=100, default_ttl=60)
        self.l2_cache = InMemoryCache(max_size=1000, default_ttl=3600)
        self.l3_cache = RedisCache()

    async def get(self, key: str) -> Optional[Any]:
        """Try each cache level"""
        # Check L1
        value = self.l1_cache.get(key)
        if value is not None:
            return value

        # Check L2
        value = self.l2_cache.get(key)
        if value is not None:
            # Promote to L1
            self.l1_cache.set(key, value)
            return value

        # Check L3
        value = await self.l3_cache.get(key)
        if value is not None:
            # Promote to L1 and L2
            self.l1_cache.set(key, value)
            self.l2_cache.set(key, value)
            return value

        return None

    async def set(self, key: str, value: Any):
        """Set in all cache levels"""
        self.l1_cache.set(key, value)
        self.l2_cache.set(key, value)
        await self.l3_cache.set(key, value)
Smart Cache Invalidation
class SmartCache:
    """Cache with intelligent invalidation"""

    def __init__(self):
        self.cache = InMemoryCache()
        self.dependencies = {}  # Track key dependencies

    def set(self, key: str, value: Any, depends_on: list = None):
        """Set with dependency tracking"""
        self.cache.set(key, value)
        if depends_on:
            for dep in depends_on:
                if dep not in self.dependencies:
                    self.dependencies[dep] = set()
                self.dependencies[dep].add(key)

    def invalidate(self, key: str, cascade=True):
        """Invalidate key and dependent keys"""
        # Invalidate the key
        self.cache.delete(key)
        if cascade and key in self.dependencies:
            # Invalidate all dependent keys
            for dependent_key in self.dependencies[key]:
                self.invalidate(dependent_key, cascade=True)
            del self.dependencies[key]

    def get(self, key: str) -> Optional[Any]:
        """Get with automatic validation"""
        value = self.cache.get(key)
        # Additional validation logic
        if value and self._is_stale(key):
            self.invalidate(key)
            return None
        return value

    def _is_stale(self, key: str) -> bool:
        """Check if cache entry is stale"""
        # Custom staleness logic
        return False
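For example, a derived report can be tied to the source keys it was computed from, so updating either source clears the report as well (key names are illustrative):

cache = SmartCache()

cache.set("user_123", {"name": "John"})
cache.set("orders_123", [{"id": 1}])
cache.set(
    "report_123",
    {"total_orders": 1},
    depends_on=["user_123", "orders_123"]  # report is derived from these keys
)

# Invalidating the orders cascades to the cached report
cache.invalidate("orders_123")
assert cache.get("report_123") is None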
Caching Strategies
Cache-Aside Pattern
class CacheAsideNode(BaseNode):
    """Cache-aside (lazy loading) pattern"""

    def __init__(self, cache, data_source):
        super().__init__(node_id="cache_aside")
        self.cache = cache
        self.data_source = data_source

    async def execute(self, node_context, workflow_context):
        key = self._generate_key(node_context.input_data)

        # Try cache first
        cached_value = await self.cache.get(key)
        if cached_value is not None:
            return NodeResult(
                success=True,
                output=cached_value,
                metadata={"cache_hit": True}
            )

        # Load from data source
        value = await self.data_source.fetch(node_context.input_data)

        # Store in cache
        await self.cache.set(key, value)

        return NodeResult(
            success=True,
            output=value,
            metadata={"cache_hit": False}
        )
Write-Through Cache
class WriteThroughCache(BaseNode):
    """Write-through caching pattern"""

    def __init__(self, cache, data_store):
        super().__init__(node_id="write_through")
        self.cache = cache
        self.data_store = data_store

    async def write(self, key: str, value: Any):
        """Write to both cache and store"""
        # Write to data store first
        await self.data_store.write(key, value)
        # Then update cache
        await self.cache.set(key, value)

    async def read(self, key: str) -> Optional[Any]:
        """Read from cache, fallback to store"""
        # Try cache
        value = await self.cache.get(key)
        if value is not None:
            return value

        # Fallback to store
        value = await self.data_store.read(key)
        if value is not None:
            # Update cache
            await self.cache.set(key, value)
        return value
Write-Behind Cache
import asyncio
from collections import deque

class WriteBehindCache:
    """Write-behind (write-back) caching"""

    def __init__(self, cache, data_store, flush_interval=5):
        self.cache = cache
        self.data_store = data_store
        self.flush_interval = flush_interval
        self.write_queue = deque()
        self._start_flush_task()

    async def write(self, key: str, value: Any):
        """Write to cache immediately, queue for store"""
        # Update cache immediately
        await self.cache.set(key, value)
        # Queue for eventual write
        self.write_queue.append((key, value))

    async def read(self, key: str) -> Optional[Any]:
        """Read from cache"""
        return await self.cache.get(key)

    def _start_flush_task(self):
        """Start background flush task (requires a running event loop)"""
        asyncio.create_task(self._flush_loop())

    async def _flush_loop(self):
        """Periodically flush writes to store"""
        while True:
            await asyncio.sleep(self.flush_interval)
            await self._flush()

    async def _flush(self):
        """Flush pending writes"""
        batch = []
        while self.write_queue and len(batch) < 100:
            batch.append(self.write_queue.popleft())
        if batch:
            # Batch write to store
            await self.data_store.batch_write(batch)
Cache Warming
Proactive Cache Warming
import asyncio
import logging

logger = logging.getLogger(__name__)

class CacheWarmer:
    """Warm cache proactively"""

    def __init__(self, cache, data_source):
        self.cache = cache
        self.data_source = data_source

    async def warm_cache(self, keys: list):
        """Pre-populate cache with frequently used data"""
        tasks = []
        for key in keys:
            task = self._warm_single(key)
            tasks.append(task)
        await asyncio.gather(*tasks)

    async def _warm_single(self, key: str):
        """Warm single cache entry"""
        try:
            # Check if already cached
            if await self.cache.get(key):
                return
            # Fetch and cache
            value = await self.data_source.fetch(key)
            await self.cache.set(key, value)
        except Exception as e:
            logger.error(f"Failed to warm cache for {key}: {e}")

    async def warm_from_analytics(self):
        """Warm cache based on usage analytics"""
        # Get most accessed keys (get_popular_keys would query your analytics backend)
        popular_keys = await self.get_popular_keys()
        # Warm cache with popular data
        await self.warm_cache(popular_keys)
Cache Monitoring
Cache Metrics
class CacheMetrics:
    """Monitor cache performance"""

    def __init__(self):
        self.metrics = {
            "hits": 0,
            "misses": 0,
            "sets": 0,
            "deletes": 0,
            "evictions": 0,
            "response_times": []
        }

    def record_hit(self):
        """Record cache hit"""
        self.metrics["hits"] += 1

    def record_miss(self):
        """Record cache miss"""
        self.metrics["misses"] += 1

    def record_response_time(self, duration: float):
        """Record response time"""
        self.metrics["response_times"].append(duration)
        # Keep only recent times
        if len(self.metrics["response_times"]) > 1000:
            self.metrics["response_times"] = self.metrics["response_times"][-1000:]

    def get_statistics(self):
        """Get cache statistics"""
        total_requests = self.metrics["hits"] + self.metrics["misses"]
        hit_ratio = self.metrics["hits"] / total_requests if total_requests > 0 else 0
        response_times = self.metrics["response_times"]
        avg_response = sum(response_times) / len(response_times) if response_times else 0
        return {
            "hit_ratio": hit_ratio,
            "total_hits": self.metrics["hits"],
            "total_misses": self.metrics["misses"],
            "avg_response_time": avg_response,
            "cache_efficiency": self._calculate_efficiency()
        }

    def _calculate_efficiency(self):
        """Calculate cache efficiency score"""
        hit_ratio = self.metrics["hits"] / (self.metrics["hits"] + self.metrics["misses"] + 0.001)
        eviction_ratio = self.metrics["evictions"] / (self.metrics["sets"] + 0.001)
        # High hit ratio and low eviction ratio = good efficiency
        efficiency = hit_ratio * (1 - eviction_ratio)
        return efficiency
Cached Workflow Node
Complete Cached Node Implementation
import inspect
import time

class CachedWorkflowNode(BaseNode):
    """Full-featured cached node"""

    def __init__(
        self,
        wrapped_node,
        cache_config=None,
        key_generator=None
    ):
        super().__init__(node_id=f"cached_{wrapped_node.node_id}")
        self.wrapped_node = wrapped_node
        self.cache = self._create_cache(cache_config or {})
        self.key_generator = key_generator or self._default_key_generator
        self.metrics = CacheMetrics()

    def _create_cache(self, config):
        """Create cache based on config"""
        cache_type = config.get("type", "memory")
        if cache_type == "memory":
            return InMemoryCache(**config.get("params", {}))
        elif cache_type == "redis":
            return RedisCache(**config.get("params", {}))
        elif cache_type == "multi":
            return MultiLevelCache()
        else:
            raise ValueError(f"Unknown cache type: {cache_type}")

    @staticmethod
    async def _maybe_await(value):
        """Support both sync (in-memory) and async (Redis, multi-level) cache backends"""
        return await value if inspect.isawaitable(value) else value

    def _default_key_generator(self, input_data):
        """Generate cache key from input"""
        import hashlib
        import json
        # Sort keys for consistent hashing
        sorted_data = json.dumps(input_data, sort_keys=True)
        return hashlib.sha256(sorted_data.encode()).hexdigest()

    async def execute(self, node_context, workflow_context):
        # Generate cache key
        cache_key = self.key_generator(node_context.input_data)

        # Try cache
        start_time = time.time()
        cached_result = await self._maybe_await(self.cache.get(cache_key))
        if cached_result is not None:
            self.metrics.record_hit()
            self.metrics.record_response_time(time.time() - start_time)
            return NodeResult(
                success=True,
                output=cached_result,
                metadata={
                    "cache_hit": True,
                    "cache_key": cache_key,
                    "response_time": time.time() - start_time
                }
            )

        # Cache miss - execute node
        self.metrics.record_miss()
        result = await self.wrapped_node.execute(node_context, workflow_context)

        # Cache successful results
        if result.success:
            await self._maybe_await(self.cache.set(cache_key, result.output))

        self.metrics.record_response_time(time.time() - start_time)
        return NodeResult(
            success=result.success,
            output=result.output,
            error=result.error,
            metadata={
                "cache_hit": False,
                "cache_key": cache_key,
                "response_time": time.time() - start_time
            }
        )

    def get_cache_stats(self):
        """Get cache statistics"""
        return self.metrics.get_statistics()
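A possible usage sketch, wrapping the ExpensiveNode from earlier with a Redis-backed cache (the config values are illustrative, not defaults):

cached = CachedWorkflowNode(
    wrapped_node=ExpensiveNode(),
    cache_config={
        "type": "redis",
        "params": {"prefix": "calc:", "default_ttl": 600}
    }
)

# After a few workflow runs, check whether the cache is paying off
print(cached.get_cache_stats())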
Best Practices
Choose Appropriate TTL
Set TTL based on data volatility:
- Static data: Long TTL (hours/days)
- Dynamic data: Short TTL (minutes)
- Real-time data: Very short TTL (seconds)
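As a rough sketch (tier names and values below are illustrative, not Flow Core defaults), these tiers can live in one mapping that is passed to the cache on each set:

# Illustrative TTL tiers; tune to your data's volatility
TTL_BY_TIER = {
    "static": 24 * 3600,  # reference data: hours to days
    "dynamic": 5 * 60,    # user-facing data: minutes
    "realtime": 5         # live metrics: seconds
}

cache = InMemoryCache(max_size=1000)
cache.set("country_list", ["DE", "US"], ttl=TTL_BY_TIER["static"])
cache.set("user_123_profile", {"name": "John"}, ttl=TTL_BY_TIER["dynamic"])
cache.set("queue_depth", 42, ttl=TTL_BY_TIER["realtime"])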
Implement Cache Warming
Pre-populate cache with frequently accessed data during low-traffic periods.
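One way to schedule the CacheWarmer shown above is a small background loop that only runs inside an assumed low-traffic window (the 02:00-04:00 UTC window and hourly check are illustrative):

import asyncio
from datetime import datetime, timezone

async def warm_during_off_peak(warmer: CacheWarmer, keys: list):
    """Warm the cache only between 02:00 and 04:00 UTC"""
    while True:
        hour = datetime.now(timezone.utc).hour
        if 2 <= hour < 4:
            await warmer.warm_cache(keys)
        await asyncio.sleep(3600)  # re-check once per hour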
Monitor Cache Performance
Track hit ratios, response times, and eviction rates to optimize cache configuration.
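A minimal sketch of acting on those numbers, assuming the CachedWorkflowNode usage above and an arbitrary example threshold:

stats = cached.get_cache_stats()  # `cached` from the usage sketch above

# 0.5 is an illustrative threshold, not a recommendation
if stats["hit_ratio"] < 0.5:
    print(f"Low hit ratio ({stats['hit_ratio']:.0%}): revisit TTLs, key design, or warming")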
Handle Cache Stampede
Use locks or probabilistic early expiration to prevent cache stampede.
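A minimal sketch of the lock approach, assuming an async cache such as RedisCache and an async loader callable (probabilistic early expiration would instead refresh entries shortly before their TTL expires):

import asyncio

class StampedeProtectedCache:
    """Only one coroutine recomputes a missing key; the others wait for its result"""

    def __init__(self, cache, loader):
        self.cache = cache    # async cache, e.g. RedisCache
        self.loader = loader  # async callable: key -> value
        self._locks = {}      # per-key asyncio locks

    async def get(self, key: str):
        value = await self.cache.get(key)
        if value is not None:
            return value
        lock = self._locks.setdefault(key, asyncio.Lock())
        async with lock:
            # Re-check: another coroutine may have filled the cache while we waited
            value = await self.cache.get(key)
            if value is None:
                value = await self.loader(key)
                await self.cache.set(key, value)
        return value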
Use Consistent Hashing
For distributed caches, use consistent hashing for better key distribution.
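A bare-bones hash ring for routing keys to cache nodes (a sketch only; real deployments usually add replication and health checks):

import bisect
import hashlib

class ConsistentHashRing:
    """Map keys to nodes so that adding or removing a node only remaps nearby keys"""

    def __init__(self, nodes, virtual_nodes=100):
        self._ring = []  # sorted list of (hash, node); virtual nodes smooth the distribution
        for node in nodes:
            for i in range(virtual_nodes):
                self._ring.append((self._hash(f"{node}#{i}"), node))
        self._ring.sort()
        self._hashes = [h for h, _ in self._ring]

    @staticmethod
    def _hash(value: str) -> int:
        return int(hashlib.sha256(value.encode()).hexdigest(), 16)

    def get_node(self, key: str) -> str:
        idx = bisect.bisect(self._hashes, self._hash(key)) % len(self._ring)
        return self._ring[idx][1]

# Example: spread cache keys across three Redis instances
ring = ConsistentHashRing(["redis-a", "redis-b", "redis-c"])
print(ring.get_node("user_123"))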