Overview
Flow Core provides comprehensive rate limiting capabilities to control API usage, prevent abuse, and ensure fair resource allocation using token bucket algorithms.
Token Bucket Algorithm
Basic Rate Limiter
Copy
from nadoo_flow import RateLimiter

# Create rate limiter - 10 requests per second
limiter = RateLimiter(
    rate=10,  # tokens per second (steady-state refill rate)
    capacity=100,  # bucket capacity (maximum burst size)
)

# Check if request is allowed; acquire() consumes one token on success
if await limiter.acquire():
    # Process request
    await process_request()
else:
    # Rate limit exceeded — no token was available
    raise RateLimitExceeded("Too many requests")
RateLimitedNode
Wrap any node with rate limiting:
Copy
from nadoo_flow import RateLimitedNode

# Wrap an existing node so every run() call is throttled.
node = RateLimitedNode(
    wrapped_node=APICallNode(),
    rate=5,  # 5 calls per second
    burst=10,  # Allow burst of 10
)
result = await node.run(input_data)
Rate Limiting Strategies
Fixed Window
Copy
class FixedWindowRateLimiter:
    """Fixed time window rate limiting.

    Requests are counted per key within discrete windows of
    ``window_seconds`` seconds; at most ``limit`` requests are allowed
    in any one window.  Rolling into a new window replaces the per-key
    state, which also discards counters for expired windows.
    """

    def __init__(self, limit: int = 100, window_seconds: int = 60):
        self.limit = limit
        self.window_seconds = window_seconds
        # key -> {window_index: request_count}
        self.requests: dict = {}

    async def is_allowed(self, key: str) -> bool:
        """Return True and count the request if *key* is under its limit.

        Returns False (without counting) once the current window is full.
        """
        current_window = int(time.time() // self.window_seconds)
        window_counts = self.requests.get(key)
        # Start (or roll over to) the current window.  The original code
        # initialized an empty dict and immediately overwrote it; a single
        # assignment does the same job.  Replacing the dict intentionally
        # drops counters for past windows so state stays bounded.
        if window_counts is None or current_window not in window_counts:
            window_counts = {current_window: 0}
            self.requests[key] = window_counts
        if window_counts[current_window] >= self.limit:
            return False
        window_counts[current_window] += 1
        return True
Sliding Window
Copy
class SlidingWindowRateLimiter:
    """Sliding window rate limiting.

    Keeps a per-key deque of request timestamps and allows a request
    only when fewer than ``limit`` timestamps fall within the last
    ``window_seconds`` seconds.
    """

    def __init__(self, limit=100, window_seconds=60):
        self.limit = limit
        self.window_seconds = window_seconds
        # key -> deque of timestamps, oldest first
        self.requests = defaultdict(deque)

    async def is_allowed(self, key: str) -> bool:
        """Record and allow the request unless the window is already full."""
        now = time.time()
        window = self.requests[key]
        cutoff = now - self.window_seconds
        # Drop timestamps that have slid out of the window.
        while window and window[0] < cutoff:
            window.popleft()
        if len(window) >= self.limit:
            return False
        window.append(now)
        return True
Leaky Bucket
Copy
class LeakyBucket:
    """Leaky bucket rate limiting.

    Water is added per request and drains continuously at ``rate``
    units per second; requests that would overflow ``capacity`` are
    rejected.
    """

    def __init__(self, rate=1.0, capacity=10):
        self.rate = rate          # Leak rate per second
        self.capacity = capacity  # Maximum bucket fill
        self.water = 0
        self.last_leak = time.time()

    async def add(self, amount=1) -> bool:
        """Try to pour *amount* into the bucket; False if it would overflow."""
        await self._leak()
        if self.water + amount > self.capacity:
            return False
        self.water += amount
        return True

    async def _leak(self):
        """Drain the bucket by however much leaked since the last call."""
        now = time.time()
        drained = (now - self.last_leak) * self.rate
        # Water level can never go negative.
        self.water = max(0, self.water - drained)
        self.last_leak = now
Advanced Rate Limiting
Multi-Tier Rate Limiting
Copy
class TieredRateLimiter:
    """Different limits for different user tiers.

    Each subscription tier gets its own token bucket; unknown tiers
    fall back to the most restrictive ("free") limiter.
    """

    def __init__(self):
        # (rate, capacity) per tier, expanded into RateLimiter instances.
        tier_config = {
            "free": (1, 10),
            "basic": (10, 100),
            "premium": (100, 1000),
            "enterprise": (1000, 10000),
        }
        self.tiers = {
            name: RateLimiter(rate=r, capacity=c)
            for name, (r, c) in tier_config.items()
        }

    async def acquire(self, user_tier: str, tokens=1):
        """Consume *tokens* from the limiter assigned to *user_tier*."""
        limiter = self.tiers.get(user_tier, self.tiers["free"])
        return await limiter.acquire(tokens)
Distributed Rate Limiting
Copy
class DistributedRateLimiter:
    """Rate limiting across multiple servers using Redis.

    All processes sharing the same Redis instance and key prefix share
    one fixed-window counter per key.
    """

    def __init__(self, redis_client, prefix="ratelimit:"):
        # NOTE(review): redis_client is assumed to be an async client
        # exposing eval(script, keys=..., args=...) — confirm this
        # signature against the Redis library actually in use.
        self.redis = redis_client
        self.prefix = prefix

    async def is_allowed(
        self,
        key: str,
        limit: int,
        window: int
    ) -> bool:
        """Allow at most *limit* requests per *window* seconds for *key*.

        Returns True while the shared counter is within the limit.
        """
        redis_key = f"{self.prefix}{key}"
        # Lua script for atomic operation: INCR and EXPIRE must happen
        # together so a counter can never persist without a TTL.
        lua_script = """
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('incr', key)
if current == 1 then
redis.call('expire', key, window)
end
return current <= limit
"""
        # NOTE(review): a Lua boolean false is returned to the client as
        # nil (None), so bool(allowed) yields False as intended — but
        # verify the client's EVAL return-value conversion.
        allowed = await self.redis.eval(
            lua_script,
            keys=[redis_key],
            args=[limit, window]
        )
        return bool(allowed)
Rate Limiting in Workflows
Workflow-Level Rate Limiting
Copy
class RateLimitedWorkflow:
    """Workflow with built-in rate limiting.

    Wraps an existing workflow and rejects runs that exceed the
    configured rate instead of executing them.
    """

    def __init__(self, workflow, rate_config):
        self.workflow = workflow
        # rate_config is expanded into RateLimiter keyword arguments.
        self.limiter = RateLimiter(**rate_config)

    async def run(self, input_data):
        """Execute the wrapped workflow, or raise if over the rate limit."""
        allowed = await self.limiter.acquire()
        if not allowed:
            raise RateLimitExceeded(
                "Workflow rate limit exceeded. Please retry later."
            )
        return await self.workflow.run(input_data)
Per-Node Rate Limiting
Copy
class NodeRateLimiter:
    """Apply different rate limits to different nodes.

    Nodes without a registered limiter are always allowed to execute.
    """

    def __init__(self):
        # node_id -> RateLimiter
        self.limiters = {}

    def register_node(self, node_id: str, rate: float, capacity: int):
        """Register rate limit for a node."""
        self.limiters[node_id] = RateLimiter(rate, capacity)

    async def acquire(self, node_id: str) -> bool:
        """Check if node can execute."""
        limiter = self.limiters.get(node_id)
        if limiter is None:
            return True  # No limit registered for this node
        return await limiter.acquire()
# Usage in workflow
node_limiter = NodeRateLimiter()
# Throttle the external API node tightly, the database node loosely.
node_limiter.register_node("api_node", rate=5, capacity=50)
node_limiter.register_node("db_node", rate=100, capacity=1000)
Cost-Based Rate Limiting
API Cost Management
Copy
class CostBasedRateLimiter:
    """Rate limit based on API costs.

    Tracks the dollar cost of operations over a rolling 60-second
    window and rejects any operation that would push total spending
    past the per-minute budget.
    """

    def __init__(self, budget_per_minute=1.0):
        self.budget_per_minute = budget_per_minute
        self.costs = deque()  # (timestamp, cost) pairs, oldest first
        self.lock = asyncio.Lock()

    async def acquire(self, operation: str) -> bool:
        """Check if operation is allowed within budget"""
        price = self._get_operation_cost(operation)
        async with self.lock:
            now = time.time()
            cutoff = now - 60  # rolling one-minute window
            # Evict spend records that fell out of the window.
            while self.costs and self.costs[0][0] < cutoff:
                self.costs.popleft()
            spent = sum(entry[1] for entry in self.costs)
            if spent + price > self.budget_per_minute:
                return False
            self.costs.append((now, price))
            return True

    def _get_operation_cost(self, operation: str) -> float:
        """Get cost for operation"""
        price_table = {
            "gpt-4": 0.03,
            "gpt-3.5": 0.002,
            "embedding": 0.0001,
            "image": 0.02,
        }
        # Unknown operations get a small default cost.
        return price_table.get(operation, 0.001)
Adaptive Rate Limiting
Dynamic Rate Adjustment
Copy
class AdaptiveRateLimiter:
    """Adjust rate limits based on system load.

    The effective rate floats between 0.5x and 2x of ``base_rate``
    depending on the recent allow/deny ratio.
    """

    def __init__(self, base_rate=10):
        self.base_rate = base_rate
        self.current_rate = base_rate
        # Rolling window of the last 100 acquire() outcomes.
        self.metrics = deque(maxlen=100)

    async def acquire(self) -> bool:
        """Acquire with adaptive rate"""
        # Adjust rate based on metrics
        self._adjust_rate()
        # Use current rate
        # NOTE(review): _check_rate is not defined in this class —
        # presumably supplied by a subclass or mixin; confirm before use.
        allowed = await self._check_rate(self.current_rate)
        # Record result
        self.metrics.append({
            "time": time.time(),
            "allowed": allowed,
            "rate": self.current_rate
        })
        return allowed

    def _adjust_rate(self):
        """Adjust rate based on performance.

        No-op until at least 10 samples have been collected.
        """
        if len(self.metrics) < 10:
            return
        # Calculate success rate over the 10 most recent attempts
        recent = list(self.metrics)[-10:]
        success_rate = sum(m["allowed"] for m in recent) / len(recent)
        if success_rate < 0.5:
            # Too many rejections, increase rate
            # (capped at twice the configured base rate)
            self.current_rate = min(
                self.current_rate * 1.1,
                self.base_rate * 2
            )
        elif success_rate > 0.9:
            # Room to decrease rate
            # (floored at half the configured base rate)
            self.current_rate = max(
                self.current_rate * 0.9,
                self.base_rate * 0.5
            )
Rate Limit Headers
HTTP Headers Support
Copy
class HTTPRateLimiter:
    """Rate limiter with HTTP headers.

    Sets the conventional X-RateLimit-* response headers and raises a
    429 error when the limit is exhausted.
    """

    # NOTE(review): get_remaining / get_reset_time are not defined in
    # this class — confirm they are provided elsewhere.  HTTPException
    # here matches the FastAPI/Starlette signature; verify the import.
    async def check_and_set_headers(
        self,
        request,
        response,
        key: str
    ):
        """Check rate limit and set response headers"""
        limit = 100  # fixed per-key limit advertised in the headers
        window = 3600  # window length in seconds
        remaining = await self.get_remaining(key)
        reset_time = await self.get_reset_time(key)
        # Set headers
        response.headers["X-RateLimit-Limit"] = str(limit)
        response.headers["X-RateLimit-Remaining"] = str(remaining)
        response.headers["X-RateLimit-Reset"] = str(reset_time)
        if remaining <= 0:
            # Retry-After is expressed as seconds from now; assumes
            # reset_time is a Unix timestamp — TODO confirm.
            response.headers["Retry-After"] = str(reset_time - time.time())
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded"
            )
Monitoring and Metrics
Rate Limit Metrics
Copy
class RateLimitMetrics:
    """Track rate limiting metrics.

    Accumulates allow/deny counts, the set of distinct clients seen,
    and a tally of denial reasons.
    """

    def __init__(self):
        self.metrics = {
            "requests_allowed": 0,
            "requests_denied": 0,
            "unique_clients": set(),
            "denial_reasons": defaultdict(int)
        }

    def record_request(self, client_id: str, allowed: bool, reason=None):
        """Record rate limit decision"""
        stats = self.metrics
        stats["unique_clients"].add(client_id)
        if allowed:
            stats["requests_allowed"] += 1
            return
        stats["requests_denied"] += 1
        if reason:
            stats["denial_reasons"][reason] += 1

    def get_report(self):
        """Generate metrics report"""
        allowed = self.metrics["requests_allowed"]
        denied = self.metrics["requests_denied"]
        total = allowed + denied
        # Rank denial reasons by frequency, highest first.
        ranked_reasons = sorted(
            self.metrics["denial_reasons"].items(),
            key=lambda item: item[1],
            reverse=True,
        )
        return {
            "total_requests": total,
            "allowed": allowed,
            "denied": denied,
            "denial_rate": denied / total if total > 0 else 0,
            "unique_clients": len(self.metrics["unique_clients"]),
            "top_denial_reasons": dict(ranked_reasons[:5]),
        }
Best Practices
Choose Appropriate Algorithm
- Token Bucket: Good for allowing bursts
- Leaky Bucket: Smooth, consistent rate
- Fixed Window: Simple but can have thundering herd
- Sliding Window: More accurate but higher overhead
Set Reasonable Limits
Don’t set limits too low or too high. Monitor and adjust based on actual usage.
Provide Clear Feedback
Always inform clients about rate limits via headers or error messages.
Implement Backoff
Encourage clients to implement exponential backoff when rate limited.
Monitor Usage
Track rate limit metrics to identify patterns and adjust limits.