Skip to main content

Overview

The rate_limiting module implements the Token Bucket algorithm for controlling API request rates. It supports single-tenant, multi-tenant, and distributed rate limiting with Redis.

Classes

TokenBucket

Token Bucket algorithm implementation for rate limiting.
from nadoo_flow.rate_limiting import TokenBucket

# Allow 1 request every 2 seconds, with burst up to 10
bucket = TokenBucket(
    requests_per_second=0.5,  # 0.5 = once per 2 seconds
    max_bucket_size=10         # Allow burst of 10 requests
)

# Acquire token (async)
if await bucket.acquire():
    await call_api()

# Check availability without waiting
if bucket.try_acquire():
    call_api_sync()

Constructor

TokenBucket(
    requests_per_second: float,
    max_bucket_size: int = 10,
    check_every_n_seconds: float = 0.1
)
Parameters:
| Name | Type | Default | Description |
|------|------|---------|-------------|
| requests_per_second | float | — | Requests per second (0.5 = once per 2 seconds) |
| max_bucket_size | int | 10 | Maximum tokens in bucket (burst capacity) |
| check_every_n_seconds | float | 0.1 | Check interval when waiting for tokens |

Methods

acquire
Acquire tokens, waiting if necessary (async).
async def acquire(tokens: int = 1, timeout: float | None = None) -> bool
Parameters:
  • tokens - Number of tokens to acquire (default: 1)
  • timeout - Maximum wait time in seconds (None = wait indefinitely)
Returns:
  • bool - True if tokens acquired, False if timeout
Example:
# Wait for token
success = await bucket.acquire()
if success:
    await call_api()

# Wait with timeout
success = await bucket.acquire(timeout=5.0)
if not success:
    print("Rate limit timeout after 5 seconds")

# Acquire multiple tokens
success = await bucket.acquire(tokens=5)
try_acquire
Try to acquire tokens without waiting.
def try_acquire(tokens: int = 1) -> bool
Returns:
  • bool - True if tokens acquired, False otherwise
Example:
if bucket.try_acquire():
    # Tokens available, proceed
    call_api()
else:
    # No tokens, skip or queue
    print("Rate limited, skipping")
get_available_tokens
Get current number of available tokens.
def get_available_tokens() -> float
Returns:
  • float - Number of available tokens
get_wait_time
Calculate wait time until tokens available.
def get_wait_time(tokens: int = 1) -> float
Returns:
  • float - Wait time in seconds (0.0 if tokens available now)
Example:
wait = bucket.get_wait_time()
if wait > 0:
    print(f"Wait {wait:.2f}s for next request")

RateLimiter

Convenient rate limiter wrapper with async context manager support.
from nadoo_flow import RateLimiter

# Create limiter: 10 requests per second
limiter = RateLimiter(
    requests_per_second=10.0,
    max_bucket_size=20
)

# Use as context manager
async with limiter:
    await call_api()

# Or acquire manually
if await limiter.acquire():
    await call_api()

Constructor

RateLimiter(
    requests_per_second: float,
    max_bucket_size: int = 10,
    check_every_n_seconds: float = 0.1
)
Parameters: Same as TokenBucket

Methods

acquire
async def acquire(tokens: int = 1, timeout: float | None = None) -> bool
Same as TokenBucket.acquire()

Context Manager

async with limiter:
    # Automatically acquires 1 token before entering
    await call_api()
# No cleanup needed on exit

MultiTenantRateLimiter

Rate limiter with per-tenant (user/workspace) limits.
from nadoo_flow.rate_limiting import MultiTenantRateLimiter

limiter = MultiTenantRateLimiter(
    default_requests_per_second=1.0,
    default_max_bucket_size=10,
    tenant_limits={
        "premium_user_123": 10.0,  # Premium: 10 req/s
        "free_user_456": 0.1         # Free: 1 req/10s
    }
)

# Use with context manager
async with limiter.limit("user_123"):
    await call_api()

# Or acquire manually
if await limiter.acquire("workspace_abc"):
    await call_api()

# Update tenant limit
limiter.set_tenant_limit("user_789", 5.0)

Constructor

MultiTenantRateLimiter(
    default_requests_per_second: float = 1.0,
    default_max_bucket_size: int = 10,
    tenant_limits: dict[str, float] | None = None
)
Parameters:
  • default_requests_per_second - Default rate for unknown tenants
  • default_max_bucket_size - Default bucket size
  • tenant_limits - Per-tenant rate limits (key: tenant_id, value: requests_per_second)

Methods

acquire
Acquire token for specific tenant.
async def acquire(
    tenant_id: str,
    tokens: int = 1,
    timeout: float | None = None
) -> bool
Parameters:
  • tenant_id - Tenant identifier (user ID, workspace ID, etc.)
  • tokens - Number of tokens to acquire
  • timeout - Timeout in seconds
Returns:
  • bool - True if successful
limit
Get context manager for specific tenant.
def limit(tenant_id: str) -> _TenantLimitContext
Usage:
async with limiter.limit("user_123"):
    await call_api()
set_tenant_limit
Update rate limit for specific tenant.
def set_tenant_limit(tenant_id: str, requests_per_second: float)
Example:
# Upgrade user to premium
limiter.set_tenant_limit("user_123", 10.0)

# Downgrade to free tier
limiter.set_tenant_limit("user_456", 0.1)

RateLimitedNode

Mixin class to add rate limiting to nodes.
from nadoo_flow import BaseNode, NodeResult
from nadoo_flow.rate_limiting import RateLimitedNode, RateLimiter

class MyAPINode(BaseNode, RateLimitedNode):
    def __init__(self):
        BaseNode.__init__(
            self,
            node_id="api",
            node_type="api",
            name="API Node",
            config={}
        )
        RateLimitedNode.__init__(
            self,
            rate_limiter=RateLimiter(requests_per_second=5.0)
        )

    async def execute(self, node_context, workflow_context):
        # Apply rate limiting
        async with self.rate_limiter:
            response = await call_external_api()

        return NodeResult(success=True, output=response)

Constructor

def __init__(
    rate_limiter: RateLimiter | None = None,
    requests_per_second: float = 1.0
)
Parameters:
  • rate_limiter - RateLimiter instance (if None, creates default)
  • requests_per_second - Requests per second (used if rate_limiter is None)

Methods

enable_rate_limiting
Enable rate limiting for this node.
def enable_rate_limiting()
disable_rate_limiting
Disable rate limiting for this node.
def disable_rate_limiting()
is_rate_limiting_enabled
Check if rate limiting is enabled.
def is_rate_limiting_enabled() -> bool

RedisRateLimiter

Distributed rate limiter using Redis (requires redis package).
from nadoo_flow.rate_limiting import RedisRateLimiter
import redis

# Create Redis-based limiter
limiter = RedisRateLimiter(
    redis_client=redis.Redis(host="localhost", port=6379),
    key_prefix="rate_limit:",
    requests_per_window=100,  # 100 requests
    window_seconds=60          # per 60 seconds
)

# Check and increment
if await limiter.check_and_increment("user_123"):
    await call_api()
else:
    print("Rate limit exceeded")

# Check remaining quota
remaining = limiter.get_remaining("user_123")
print(f"Remaining: {remaining} requests")

# Reset counter
limiter.reset("user_123")

Constructor

RedisRateLimiter(
    redis_client: Redis,
    key_prefix: str = "rate_limit:",
    requests_per_window: int = 100,
    window_seconds: int = 60
)
Parameters:
  • redis_client - Redis client instance
  • key_prefix - Redis key prefix for namespacing
  • requests_per_window - Maximum requests per time window
  • window_seconds - Time window in seconds

Methods

check_and_increment
Atomically check limit and increment counter.
async def check_and_increment(identifier: str) -> bool
Uses Lua script for atomic operation. Returns:
  • bool - True if allowed, False if limit exceeded
get_remaining
Get remaining quota for identifier.
def get_remaining(identifier: str) -> int
Returns:
  • int - Number of remaining requests
reset
Reset counter for identifier.
def reset(identifier: str)

Usage Patterns

Basic Rate Limiting

from nadoo_flow import RateLimiter

# 2 requests per second
limiter = RateLimiter(requests_per_second=2.0)

async def call_api():
    async with limiter:
        return await external_api_call()

# Calls are automatically rate-limited
for i in range(10):
    result = await call_api()

Burst Handling

# Allow bursts up to 50 requests
limiter = RateLimiter(
    requests_per_second=10.0,
    max_bucket_size=50  # Can burst to 50
)

# Make 50 requests immediately (burst)
for i in range(50):
    async with limiter:
        await call_api()  # All 50 execute immediately

# 51st request waits for token refill
async with limiter:
    await call_api()  # Waits ~0.1s

Timeout Handling

limiter = RateLimiter(requests_per_second=0.1)  # Very slow

# Wait up to 5 seconds
if await limiter.acquire(timeout=5.0):
    await call_api()
else:
    print("Rate limit timeout - skipping request")

Multi-Tenant Usage

limiter = MultiTenantRateLimiter(
    default_requests_per_second=1.0,
    tenant_limits={
        "enterprise": 100.0,
        "pro": 10.0,
        "free": 0.5
    }
)

async def process_request(user_tier: str, user_id: str):
    tenant_key = f"{user_tier}_{user_id}"

    async with limiter.limit(tenant_key):
        return await call_api()

# Different users get different rates
await process_request("enterprise", "user1")  # 100 req/s
await process_request("free", "user2")        # 0.5 req/s

Dynamic Rate Adjustment

limiter = MultiTenantRateLimiter(default_requests_per_second=1.0)

async def upgrade_user(user_id: str):
    # Increase rate limit
    limiter.set_tenant_limit(user_id, 10.0)
    print(f"User {user_id} upgraded to 10 req/s")

async def downgrade_user(user_id: str):
    # Decrease rate limit
    limiter.set_tenant_limit(user_id, 0.1)
    print(f"User {user_id} downgraded to 1 req/10s")

Distributed Rate Limiting

import redis
from nadoo_flow.rate_limiting import RedisRateLimiter

# Share rate limits across multiple servers
redis_client = redis.Redis(host="redis.example.com")

limiter = RedisRateLimiter(
    redis_client=redis_client,
    requests_per_window=1000,
    window_seconds=3600  # 1000 requests per hour
)

# Works across all servers sharing this Redis
async def api_endpoint(user_id: str):
    if await limiter.check_and_increment(user_id):
        return await process_request()
    else:
        remaining = limiter.get_remaining(user_id)
        raise RateLimitError(f"Rate limit exceeded. Try again later. Remaining: {remaining}")

Conditional Rate Limiting

class ConditionalNode(BaseNode, RateLimitedNode):
    def __init__(self):
        BaseNode.__init__(self, node_id="api", node_type="api", name="API", config={})
        RateLimitedNode.__init__(self, requests_per_second=5.0)

    async def execute(self, node_context, workflow_context):
        # Disable rate limiting for premium users
        is_premium = node_context.get_input("is_premium", False)

        if is_premium:
            self.disable_rate_limiting()
        else:
            self.enable_rate_limiting()

        # Rate limiting applied only for non-premium
        if self.is_rate_limiting_enabled():
            async with self.rate_limiter:
                result = await call_api()
        else:
            result = await call_api()

        return NodeResult(success=True, output=result)

Rate Limit Monitoring

async def monitor_rate_limits():
    limiter = RateLimiter(requests_per_second=10.0, max_bucket_size=20)

    while True:
        available = limiter.bucket.get_available_tokens()
        wait_time = limiter.bucket.get_wait_time()

        print(f"Available tokens: {available:.2f}")
        print(f"Wait time: {wait_time:.2f}s")

        await asyncio.sleep(1)

Best Practices

Match rate limits to API provider limits:
# OpenAI: 3,500 RPM for tier 1
openai_limiter = RateLimiter(
    requests_per_second=58.3,  # 3500/60
    max_bucket_size=100        # Allow small bursts
)

# Anthropic: 50 RPM for free tier
anthropic_limiter = RateLimiter(
    requests_per_second=0.83,  # 50/60
    max_bucket_size=5
)
Allow reasonable bursts with max_bucket_size:
# Too restrictive: no burst
RateLimiter(requests_per_second=1.0, max_bucket_size=1)

# Good: allows burst of 10
RateLimiter(requests_per_second=1.0, max_bucket_size=10)

# Too permissive: defeats rate limiting
RateLimiter(requests_per_second=1.0, max_bucket_size=1000)
Always check return value or use try/except:
# Check return value
if await limiter.acquire(timeout=5.0):
    await call_api()
else:
    logger.error("Rate limit timeout")
    raise RateLimitError()

# Use context manager (simpler)
try:
    async with limiter:
        await call_api()
except asyncio.TimeoutError:
    logger.error("Rate limit exceeded")
For multi-server deployments, use RedisRateLimiter:
# Single server: in-memory RateLimiter is fine
if single_server:
    limiter = RateLimiter(...)

# Multiple servers: Use Redis
else:
    limiter = RedisRateLimiter(
        redis_client=redis.Redis(...),
        ...
    )
Track quota to warn users before hitting limits:
remaining = limiter.bucket.get_available_tokens()
if remaining < 2:
    logger.warning(f"Low quota: {remaining} requests remaining")

# Or with Redis
remaining = redis_limiter.get_remaining(user_id)
if remaining < 10:
    send_warning_email(user_id)
Don’t share rate limiters across different APIs:
# Good: Separate limiters
openai_limiter = RateLimiter(requests_per_second=58.3)
anthropic_limiter = RateLimiter(requests_per_second=0.83)

# Bad: Shared limiter
shared_limiter = RateLimiter(requests_per_second=10.0)
# Both APIs limited together (not what you want)
Use MultiTenantRateLimiter for user tiers:
limiter = MultiTenantRateLimiter(
    default_requests_per_second=0.5,  # Free tier
    tenant_limits={
        "tier_free": 0.5,
        "tier_pro": 5.0,
        "tier_enterprise": 50.0
    }
)

async def get_user_tier(user_id):
    subscription = await db.get_subscription(user_id)
    return f"tier_{subscription.level}"

tier = await get_user_tier(user_id)
async with limiter.limit(tier):
    await call_api()

Rate Calculation Examples

Requests Per Second

| Rate | Calculation | Description |
|------|-------------|-------------|
| 1.0 | 1 req/s | 1 request per second |
| 0.5 | 1/2 | 1 request every 2 seconds |
| 0.1 | 1/10 | 1 request every 10 seconds |
| 10.0 | 10 req/s | 10 requests per second |
| 0.0167 | 1/60 | 1 request per minute |

Common API Limits

# OpenAI GPT-4 (Tier 1): 3,500 RPM
openai_limiter = RateLimiter(requests_per_second=58.3)

# Anthropic Claude (Free): 50 RPM
anthropic_limiter = RateLimiter(requests_per_second=0.83)

# Google Gemini (Free): 60 RPM
gemini_limiter = RateLimiter(requests_per_second=1.0)

# Custom API: 10 requests per 5 seconds
custom_limiter = RateLimiter(requests_per_second=2.0)

See Also