Skip to main content

Overview

Flow Core provides comprehensive rate limiting capabilities to control API usage, prevent abuse, and ensure fair resource allocation using token bucket algorithms.

Token Bucket Algorithm

Basic Rate Limiter

from nadoo_flow import RateLimiter

# Create rate limiter - 10 requests per second
limiter = RateLimiter(
    rate=10,  # tokens per second
    capacity=100,  # bucket capacity
)

# Check if request is allowed
# NOTE: `await` requires an async context (e.g. inside an `async def`).
if await limiter.acquire():
    # Process request
    await process_request()
else:
    # Rate limit exceeded
    raise RateLimitExceeded("Too many requests")

RateLimitedNode

Wrap any node with rate limiting:
from nadoo_flow import RateLimitedNode

# Wrap an existing node so every run passes through the limiter first.
node = RateLimitedNode(
    wrapped_node=APICallNode(),
    rate=5,  # 5 calls per second
    burst=10,  # Allow burst of 10
)

# Run as usual; the wrapper enforces the rate transparently.
result = await node.run(input_data)

Rate Limiting Strategies

Fixed Window

class FixedWindowRateLimiter:
    """Fixed time window rate limiting.

    Counts requests per key within the current fixed window.  When a key's
    window rolls over, its counter dict is replaced outright, which also
    discards expired windows and keeps memory bounded.
    """

    def __init__(self, limit=100, window_seconds=60):
        self.limit = limit                    # max requests per window
        self.window_seconds = window_seconds  # window length in seconds
        self.requests = {}                    # key -> {window_index: count}

    async def is_allowed(self, key: str) -> bool:
        """Return True and count the request if *key* is under its limit."""
        current_window = int(time.time() // self.window_seconds)

        # Start a fresh counter when the key is new or its window rolled
        # over.  (The original initialized an empty dict only to replace it
        # on the next line; a single check covers both cases.)
        if current_window not in self.requests.get(key, {}):
            self.requests[key] = {current_window: 0}

        if self.requests[key][current_window] >= self.limit:
            return False

        self.requests[key][current_window] += 1
        return True

Sliding Window

class SlidingWindowRateLimiter:
    """Sliding window rate limiting.

    Keeps a per-key deque of request timestamps and admits a request only
    while fewer than ``limit`` timestamps fall inside the trailing window.
    """

    def __init__(self, limit=100, window_seconds=60):
        self.limit = limit
        self.window_seconds = window_seconds
        self.requests = defaultdict(deque)

    async def is_allowed(self, key: str) -> bool:
        """Record and admit the request unless the window is already full."""
        timestamp = time.time()
        cutoff = timestamp - self.window_seconds
        history = self.requests[key]

        # Evict timestamps that have slid out of the trailing window.
        while history and history[0] < cutoff:
            history.popleft()

        if len(history) < self.limit:
            history.append(timestamp)
            return True
        return False

Leaky Bucket

class LeakyBucket:
    """Leaky bucket rate limiting.

    Work is poured in as "water"; the bucket drains continuously at
    ``rate`` units per second and rejects anything that would overflow.
    """

    def __init__(self, rate=1.0, capacity=10):
        self.rate = rate              # drain rate, units per second
        self.capacity = capacity      # maximum bucket level
        self.water = 0                # current level
        self.last_leak = time.time()  # timestamp of the last drain

    async def add(self, amount=1) -> bool:
        """Try to pour *amount* into the bucket; False if it would overflow."""
        await self._leak()

        if self.water + amount > self.capacity:
            return False

        self.water += amount
        return True

    async def _leak(self):
        """Drain the bucket according to the time elapsed since last drain."""
        now = time.time()
        drained = (now - self.last_leak) * self.rate
        self.water = max(0, self.water - drained)
        self.last_leak = now

Advanced Rate Limiting

Multi-Tier Rate Limiting

class TieredRateLimiter:
    """Different limits for different user tiers."""

    # (rate, capacity) per tier; unknown tiers fall back to "free".
    _TIER_SETTINGS = {
        "free": (1, 10),
        "basic": (10, 100),
        "premium": (100, 1000),
        "enterprise": (1000, 10000),
    }

    def __init__(self):
        self.tiers = {
            name: RateLimiter(rate=rate, capacity=capacity)
            for name, (rate, capacity) in self._TIER_SETTINGS.items()
        }

    async def acquire(self, user_tier: str, tokens=1):
        """Acquire *tokens* from the limiter for *user_tier* (defaults to free)."""
        limiter = self.tiers.get(user_tier, self.tiers["free"])
        return await limiter.acquire(tokens)

Distributed Rate Limiting

class DistributedRateLimiter:
    """Rate limiting across multiple servers using Redis"""

    def __init__(self, redis_client, prefix="ratelimit:"):
        # Async Redis client; presumably one whose `eval` accepts
        # `keys=`/`args=` keywords — TODO confirm against the client in use
        # (redis-py's async `eval` takes a positional `numkeys` instead).
        self.redis = redis_client
        self.prefix = prefix  # namespace prefix for all rate-limit keys

    async def is_allowed(
        self,
        key: str,
        limit: int,
        window: int
    ) -> bool:
        """Atomically count one request for *key* and report whether it is
        within *limit* requests per *window* seconds (fixed window)."""
        redis_key = f"{self.prefix}{key}"

        # Lua script for atomic operation: INCR the counter and, on the
        # first hit of a window, schedule the key to expire with it.
        lua_script = """
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        local current = redis.call('incr', key)

        if current == 1 then
            redis.call('expire', key, window)
        end

        return current <= limit
        """

        allowed = await self.redis.eval(
            lua_script,
            keys=[redis_key],
            args=[limit, window]
        )

        # NOTE(review): Redis converts Lua `false` to a nil reply (None
        # here) and `true` to 1, so bool() yields the intended result —
        # verify the client does not re-map these values.
        return bool(allowed)

Rate Limiting in Workflows

Workflow-Level Rate Limiting

class RateLimitedWorkflow:
    """Workflow with built-in rate limiting."""

    def __init__(self, workflow, rate_config):
        self.workflow = workflow
        # rate_config is expanded into RateLimiter keyword arguments.
        self.limiter = RateLimiter(**rate_config)

    async def run(self, input_data):
        """Execute the wrapped workflow if a rate-limit token is available."""
        allowed = await self.limiter.acquire()
        if allowed:
            return await self.workflow.run(input_data)

        raise RateLimitExceeded(
            "Workflow rate limit exceeded. Please retry later."
        )

Per-Node Rate Limiting

class NodeRateLimiter:
    """Apply different rate limits to different nodes."""

    def __init__(self):
        self.limiters = {}  # node_id -> RateLimiter

    def register_node(self, node_id: str, rate: float, capacity: int):
        """Register rate limit for a node."""
        self.limiters[node_id] = RateLimiter(rate, capacity)

    async def acquire(self, node_id: str) -> bool:
        """Check if node can execute; unregistered nodes are unlimited."""
        limiter = self.limiters.get(node_id)
        if limiter is None:
            return True  # No limit
        return await limiter.acquire()

# Usage in workflow
node_limiter = NodeRateLimiter()
# External API calls are throttled much harder than database access.
node_limiter.register_node("api_node", rate=5, capacity=50)
node_limiter.register_node("db_node", rate=100, capacity=1000)

Cost-Based Rate Limiting

API Cost Management

class CostBasedRateLimiter:
    """Rate limit based on API costs.

    Tracks (timestamp, cost) pairs for the trailing 60 seconds and rejects
    an operation when admitting it would push spend past the per-minute
    budget.
    """

    # Cost per operation, hoisted to a class constant so the table is not
    # rebuilt on every call (the original recreated it per lookup).
    _OPERATION_COSTS = {
        "gpt-4": 0.03,
        "gpt-3.5": 0.002,
        "embedding": 0.0001,
        "image": 0.02
    }
    _DEFAULT_COST = 0.001  # fallback for unknown operations

    def __init__(self, budget_per_minute=1.0):
        self.budget_per_minute = budget_per_minute
        self.costs = deque()        # (timestamp, cost) entries, oldest first
        self.lock = asyncio.Lock()  # serializes concurrent acquire() calls

    async def acquire(self, operation: str) -> bool:
        """Check if operation is allowed within budget"""
        cost = self._get_operation_cost(operation)

        async with self.lock:
            now = time.time()

            # Remove costs that have aged out of the one-minute window.
            cutoff = now - 60
            while self.costs and self.costs[0][0] < cutoff:
                self.costs.popleft()

            # Calculate current spend inside the window.
            current_spend = sum(c[1] for c in self.costs)

            if current_spend + cost > self.budget_per_minute:
                return False

            self.costs.append((now, cost))
            return True

    def _get_operation_cost(self, operation: str) -> float:
        """Get cost for operation (default cost for unknown operations)."""
        return self._OPERATION_COSTS.get(operation, self._DEFAULT_COST)

Adaptive Rate Limiting

Dynamic Rate Adjustment

class AdaptiveRateLimiter:
    """Adjust rate limits based on system load.

    Tunes ``current_rate`` between 0.5x and 2x of ``base_rate`` from the
    recent allow/deny ratio, then paces admissions at the current rate.
    """

    def __init__(self, base_rate=10):
        self.base_rate = base_rate        # nominal requests per second
        self.current_rate = base_rate     # adaptively tuned rate
        self.metrics = deque(maxlen=100)  # recent acquire outcomes
        self._last_allowed = 0.0          # timestamp of the last admitted call

    async def acquire(self) -> bool:
        """Acquire with adaptive rate."""
        # Adjust rate based on recent outcomes before testing this request.
        self._adjust_rate()

        # Use current rate
        allowed = await self._check_rate(self.current_rate)

        # Record result so future calls can adapt.
        self.metrics.append({
            "time": time.time(),
            "allowed": allowed,
            "rate": self.current_rate
        })

        return allowed

    async def _check_rate(self, rate: float) -> bool:
        """Admit the call if at least 1/rate seconds have passed since the
        last admitted call (simple fixed-interval pacing).

        The original referenced this method without defining it; this is a
        minimal implementation preserving the documented contract.
        """
        if rate <= 0:
            return False
        now = time.time()
        if now - self._last_allowed >= 1.0 / rate:
            self._last_allowed = now
            return True
        return False

    def _adjust_rate(self):
        """Adjust rate based on the recent allow/deny ratio."""
        if len(self.metrics) < 10:
            return

        # Calculate success rate over the last 10 decisions.
        recent = list(self.metrics)[-10:]
        success_rate = sum(m["allowed"] for m in recent) / len(recent)

        if success_rate < 0.5:
            # Too many rejections, increase rate (capped at 2x base).
            self.current_rate = min(
                self.current_rate * 1.1,
                self.base_rate * 2
            )
        elif success_rate > 0.9:
            # Mostly allowed: room to decrease rate (floored at 0.5x base).
            self.current_rate = max(
                self.current_rate * 0.9,
                self.base_rate * 0.5
            )

Rate Limit Headers

HTTP Headers Support

class HTTPRateLimiter:
    """Rate limiter with HTTP headers"""

    async def check_and_set_headers(
        self,
        request,
        response,
        key: str
    ):
        """Check rate limit and set response headers"""
        # Fixed policy: 100 requests per hour per key.
        limit = 100
        window = 3600

        # `get_remaining` / `get_reset_time` are not defined in this class;
        # presumably supplied by a subclass or mixin — verify before use.
        remaining = await self.get_remaining(key)
        reset_time = await self.get_reset_time(key)

        # Set headers advertising the limit state on every response.
        response.headers["X-RateLimit-Limit"] = str(limit)
        response.headers["X-RateLimit-Remaining"] = str(remaining)
        response.headers["X-RateLimit-Reset"] = str(reset_time)

        if remaining <= 0:
            # NOTE(review): assumes reset_time is a Unix timestamp so the
            # difference is seconds-until-reset — confirm with its source.
            response.headers["Retry-After"] = str(reset_time - time.time())
            raise HTTPException(
                status_code=429,
                detail="Rate limit exceeded"
            )

Monitoring and Metrics

Rate Limit Metrics

class RateLimitMetrics:
    """Track rate limiting metrics.

    Keeps running allow/deny counters plus per-client and per-reason
    breakdowns, and renders them as a summary report.
    """

    def __init__(self):
        self.metrics = {
            "requests_allowed": 0,
            "requests_denied": 0,
            "unique_clients": set(),
            "denial_reasons": defaultdict(int)
        }

    def record_request(self, client_id: str, allowed: bool, reason=None):
        """Record a single rate limit decision."""
        stats = self.metrics
        stats["unique_clients"].add(client_id)

        if not allowed:
            stats["requests_denied"] += 1
            if reason:
                stats["denial_reasons"][reason] += 1
        else:
            stats["requests_allowed"] += 1

    def get_report(self):
        """Generate a summary report of recorded decisions."""
        allowed = self.metrics["requests_allowed"]
        denied = self.metrics["requests_denied"]
        total = allowed + denied

        # Reasons sorted by frequency, most common first.
        ranked_reasons = sorted(
            self.metrics["denial_reasons"].items(),
            key=lambda item: item[1],
            reverse=True,
        )

        return {
            "total_requests": total,
            "allowed": allowed,
            "denied": denied,
            "denial_rate": denied / total if total > 0 else 0,
            "unique_clients": len(self.metrics["unique_clients"]),
            "top_denial_reasons": dict(ranked_reasons[:5]),
        }

Best Practices

  • Token Bucket: Good for allowing bursts
  • Leaky Bucket: Smooth, consistent rate
  • Fixed Window: Simple, but can admit a burst of up to twice the limit around window boundaries
  • Sliding Window: More accurate but higher overhead
Don’t set limits too low or too high. Monitor and adjust based on actual usage.
Always inform clients about rate limits via headers or error messages.
Encourage clients to implement exponential backoff when rate limited.
Track rate limit metrics to identify patterns and adjust limits.

Next Steps