The limits library provides rate limiting primitives for Python with pluggable backends. pip install limits. Parse: from limits import parse; limit = parse("10 per minute"). parse("100/hour"). parse("5/second"). parse("1000 per day"). Storage: from limits.storage import MemoryStorage; storage = MemoryStorage(). Redis: from limits.storage import RedisStorage; storage = RedisStorage("redis://localhost:6379"). Limiter: from limits.strategies import MovingWindowRateLimiter; limiter = MovingWindowRateLimiter(storage). Fixed: from limits.strategies import FixedWindowRateLimiter. Fixed window with elastic expiry: from limits.strategies import FixedWindowElasticExpiryRateLimiter. Hit: limiter.hit(limit, "user_id", "endpoint") → True if allowed, False if limit exceeded. Test: limiter.test(limit, "user_id") → True if a hit would succeed. Stats: limiter.get_window_stats(limit, "user_id") → WindowStats(reset_time, remaining). Clear: limiter.clear(limit, "user_id") removes the counter for that limit and key; storage.reset() wipes every counter in the storage. Multi-key: use different key parts — limiter.hit(limit, "ip", "192.168.1.1"). limiter.hit(limit, "user", user_id, "endpoint", "/api/search"). Compound: parse_many("5 per second; 100 per hour") returns a list of RateLimitItem, one per limit. limits.parse_many("5/second; 200/day"). Redis cluster: from limits.storage import RedisClusterStorage; RedisClusterStorage("redis+cluster://localhost:7000"). Async: use the counterparts in limits.aio.storage (MemoryStorage, RedisStorage with an "async+redis://" URI) together with limits.aio.strategies. Claude Code generates limits rate limiters, FastAPI middleware, and Redis-backed throttling wrappers.
CLAUDE.md for limits
## limits Stack
- Version: limits >= 3.7 | pip install limits
- Parse: limits.parse("10 per minute") | parse_many("5/s; 100/hour")
- Storage: MemoryStorage() | RedisStorage("redis://localhost") | limits.aio.storage.RedisStorage (async)
- Strategy: MovingWindowRateLimiter(storage) — sliding window
- Hit: limiter.hit(limit, *key_parts) → True=allowed, False=rate-limited
- Test: limiter.test(limit, *key_parts) → True if a hit would succeed
- Stats: limiter.get_window_stats(limit, *key_parts) → (reset_time, remaining)
limits Rate Limiting Pipeline
# app/rate_limiting.py — limits-based rate limiters, middleware, and per-key quotas
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any, Callable
from limits import parse, parse_many, RateLimitItem
from limits.storage import MemoryStorage
from limits.strategies import MovingWindowRateLimiter
# ─────────────────────────────────────────────────────────────────────────────
# 1. Limiter setup
# ─────────────────────────────────────────────────────────────────────────────
def make_memory_limiter() -> MovingWindowRateLimiter:
    """Build a moving-window limiter whose counters live in this process.

    A fresh ``MemoryStorage`` is created on every call, so limiters returned
    by separate calls do not share counters, and nothing is shared with
    other processes.
    """
    in_process_storage = MemoryStorage()
    return MovingWindowRateLimiter(in_process_storage)
def make_redis_limiter(redis_url: str = "redis://localhost:6379") -> MovingWindowRateLimiter:
    """Build a moving-window limiter whose counters are kept in Redis.

    ``RedisStorage`` is imported inside the function, so the import only
    happens when a Redis-backed limiter is actually requested.
    """
    from limits.storage import RedisStorage

    redis_storage = RedisStorage(redis_url)
    return MovingWindowRateLimiter(redis_storage)
# ─────────────────────────────────────────────────────────────────────────────
# 2. Core rate-limit helpers
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class QuotaStatus:
    """Outcome of one rate-limit check for a single limit."""

    allowed: bool  # whether the checked request may proceed
    remaining: int  # hits still available in the current window
    reset_at: float  # Unix timestamp
    limit_str: str  # textual form of the limit that was checked

    @property
    def retry_after(self) -> float:
        """Seconds from now until ``reset_at``; never negative."""
        seconds_left = self.reset_at - time.time()
        return seconds_left if seconds_left > 0.0 else 0.0
def check_and_consume(
    limiter: MovingWindowRateLimiter,
    limit: RateLimitItem,
    *key_parts: str,
) -> QuotaStatus:
    """
    Record one hit against ``limit`` for the given key and report the result.

    ``limiter.hit`` is called first, then the window stats are read for the
    same key, so ``remaining`` and ``reset_at`` describe the window after
    the hit attempt. ``allowed`` is whatever ``hit`` returned.
    """
    was_allowed = limiter.hit(limit, *key_parts)
    window = limiter.get_window_stats(limit, *key_parts)
    hits_left = int(window.remaining)
    resets_at = float(window.reset_time)
    return QuotaStatus(
        allowed=was_allowed,
        remaining=hits_left,
        reset_at=resets_at,
        limit_str=str(limit),
    )
def check_without_consuming(
    limiter: MovingWindowRateLimiter,
    limit: RateLimitItem,
    *key_parts: str,
) -> QuotaStatus:
    """
    Report the current status of ``limit`` for the given key without a hit.

    Uses ``limiter.test`` instead of ``limiter.hit``, so nothing is recorded.
    Handy before an expensive operation when you only want to know whether
    quota is available.
    """
    would_pass = limiter.test(limit, *key_parts)
    window = limiter.get_window_stats(limit, *key_parts)
    hits_left = int(window.remaining)
    resets_at = float(window.reset_time)
    return QuotaStatus(
        allowed=would_pass,
        remaining=hits_left,
        reset_at=resets_at,
        limit_str=str(limit),
    )
# ─────────────────────────────────────────────────────────────────────────────
# 3. Per-entity limiters
# ─────────────────────────────────────────────────────────────────────────────
class PerKeyLimiter:
    """
    Rate limiter with multiple limits applied per key (e.g. user_id or IP).

    Supports multiple granularities: "5/second; 100/hour; 1000/day".
    All limits must pass for a request to be allowed.
    """

    def __init__(
        self,
        limits_str: str,
        storage=None,
    ) -> None:
        """
        Args:
            limits_str: One or more limits in the syntax accepted by
                ``parse_many``, e.g. "5 per second; 10 per minute".
            storage: Storage backend for the counters. A new
                ``MemoryStorage`` is created when omitted.
        """
        if storage is None:
            storage = MemoryStorage()
        self._limiter = MovingWindowRateLimiter(storage)
        self._limits = parse_many(limits_str)

    def hit(self, key: str) -> QuotaStatus:
        """
        Consume one token for `key` from every configured limit.

        Every limit is checked with a non-consuming test first, so a request
        that is going to be rejected does not spend tokens from the limits
        that would have passed. The check-then-consume sequence is not
        atomic: a concurrent caller can take the last token in between, in
        which case the consuming pass reports the rejection.

        Returns:
            The QuotaStatus of the first limit that rejects the request, or,
            when all limits pass, the status with the fewest remaining hits.
        """
        # Pass 1: look without consuming; bail out on the first exhausted limit.
        for limit in self._limits:
            preview = check_without_consuming(self._limiter, limit, key)
            if not preview.allowed:
                return preview

        # Pass 2: every limit had room a moment ago, so consume from each.
        statuses = []
        for limit in self._limits:
            status = check_and_consume(self._limiter, limit, key)
            if not status.allowed:
                return status  # lost a race with a concurrent caller
            statuses.append(status)

        # All passed — return tightest remaining
        return min(statuses, key=lambda s: s.remaining)

    def is_allowed(self, key: str) -> bool:
        """Return True if a hit would be allowed without consuming a token."""
        return all(self._limiter.test(limit, key) for limit in self._limits)

    def reset(self, key: str) -> None:
        """Clear every configured limit for `key` (e.g. after manual admin unlock)."""
        for limit in self._limits:
            # Strategies expose clear(item, *identifiers); counters are stored
            # per (limit, key), so each limit has to be cleared separately.
            self._limiter.clear(limit, key)
# ─────────────────────────────────────────────────────────────────────────────
# 4. Function decorator
# ─────────────────────────────────────────────────────────────────────────────
def rate_limited(
    limit_str: str = "10 per minute",
    key_fn: Callable[..., str] | None = None,
    storage=None,
):
    """
    Decorator that enforces a rate limit on a function.

    Args:
        limit_str: A single limit in the syntax accepted by ``parse``.
        key_fn: Called as key_fn(*args, **kwargs) with the wrapped function's
            arguments and returns the key string. When omitted, every call
            shares the "global" key.
        storage: Storage backend for the counters. A new ``MemoryStorage``
            is created when omitted.

    The wrapped function's name is part of the key, so two functions
    decorated with the same storage keep separate counters.

    Raises:
        RateLimitExceeded: from the wrapper when the limit is hit; carries
            ``retry_after`` in seconds.
    """
    import functools

    limit = parse(limit_str)
    limiter = MovingWindowRateLimiter(
        storage if storage is not None else MemoryStorage()
    )

    def decorator(fn: Callable) -> Callable:
        # functools.wraps copies __name__, __doc__, etc. and sets __wrapped__,
        # so the decorated function still introspects like the original.
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            key = key_fn(*args, **kwargs) if key_fn else "global"
            status = check_and_consume(limiter, limit, fn.__name__, key)
            if not status.allowed:
                # retry_after is recomputed from the clock on each access;
                # read it once so the message and the attribute agree.
                retry_after = status.retry_after
                raise RateLimitExceeded(
                    f"{fn.__name__} rate limited: {limit_str} "
                    f"(retry in {retry_after:.1f}s)",
                    retry_after=retry_after,
                )
            return fn(*args, **kwargs)

        return wrapper

    return decorator
class RateLimitExceeded(Exception):
    """Error raised when a call is rejected by a rate limit.

    ``retry_after`` holds the number of seconds the caller should wait
    before trying again (0.0 when not supplied).
    """

    def __init__(self, message: str, retry_after: float = 0.0):
        self.retry_after = retry_after
        Exception.__init__(self, message)
# ─────────────────────────────────────────────────────────────────────────────
# 5. FastAPI / ASGI middleware
# ─────────────────────────────────────────────────────────────────────────────
def make_fastapi_limiter(
    limit_str: str = "60 per minute",
    storage=None,
    key_fn: Callable | None = None,
):
    """
    Return a FastAPI dependency that enforces a rate limit per request.

    Args:
        limit_str: A single limit in the syntax accepted by ``parse``.
        storage: Storage backend for the counters. A new ``MemoryStorage``
            is created when omitted.
        key_fn: Called with the incoming ``Request`` and returns the key to
            limit on. When omitted, the client host is used, or "unknown"
            if the request carries no client information.

    The returned dependency raises ``HTTPException(429)`` with a
    ``Retry-After`` header when the limit is exceeded.

    Usage:
        from fastapi import Depends
        limiter_dep = make_fastapi_limiter("100/minute")
        @app.get("/api/search")
        async def search(q: str, _: None = Depends(limiter_dep)):
            ...
    """
    import math

    from fastapi import HTTPException, Request

    limit = parse(limit_str)
    limiter = MovingWindowRateLimiter(
        storage if storage is not None else MemoryStorage()
    )

    # NOTE(review): check_and_consume is synchronous; with a network-backed
    # storage it will block the event loop for the duration of the call.
    async def dependency(request: Request):
        if key_fn:
            client_key = key_fn(request)
        elif request.client:
            client_key = request.client.host
        else:
            client_key = "unknown"

        status = check_and_consume(limiter, limit, client_key)
        if not status.allowed:
            # retry_after is recomputed from the clock on each access; read it
            # once so the detail text and the header agree.
            retry_after = status.retry_after
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Retry after {retry_after:.1f}s.",
                # Round up: truncating 0.4s to "0" would invite an immediate
                # retry that is still rate-limited.
                headers={"Retry-After": str(math.ceil(retry_after))},
            )

    # This module uses postponed annotations, so "Request" is stored as a
    # string. Request is only imported inside this function, so it cannot be
    # looked up from module globals when the signature is inspected. Bind the
    # real class so FastAPI recognises the parameter as the request object.
    dependency.__annotations__["request"] = Request
    return dependency
# ─────────────────────────────────────────────────────────────────────────────
# 6. Flask before_request middleware
# ─────────────────────────────────────────────────────────────────────────────
def register_flask_limiter(app, limit_str: str = "60 per minute", storage=None) -> None:
    """
    Register a Flask before_request hook that enforces a rate limit per IP.

    Args:
        app: The Flask application to attach the hook to.
        limit_str: A single limit in the syntax accepted by ``parse``.
        storage: Storage backend for the counters. A new ``MemoryStorage``
            is created when omitted.

    Requests over the limit receive a 429 JSON response with a
    ``Retry-After`` header; allowed requests continue to the view.

    Usage:
        from flask import Flask
        register_flask_limiter(app, "100 per minute")
    """
    import math

    from flask import jsonify, make_response, request

    limit = parse(limit_str)
    limiter = MovingWindowRateLimiter(
        storage if storage is not None else MemoryStorage()
    )

    @app.before_request
    def check_rate_limit():
        ip = request.remote_addr or "unknown"
        status = check_and_consume(limiter, limit, ip)
        if status.allowed:
            # Returning None lets Flask carry on to the view function.
            return None

        # retry_after is recomputed from the clock on each access; read it
        # once so the JSON body and the header agree.
        retry_after = status.retry_after
        resp = make_response(
            jsonify({"error": "rate limit exceeded",
                     "retry_after": retry_after}),
            429,
        )
        # Round up: truncating 0.4s to "0" would invite an immediate retry
        # that is still rate-limited.
        resp.headers["Retry-After"] = str(math.ceil(retry_after))
        return resp
# ─────────────────────────────────────────────────────────────────────────────
# Demo
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # --- single limit, single key -------------------------------------------
    print("=== Basic hit/test ===")
    limiter = MovingWindowRateLimiter(MemoryStorage())
    limit = parse("5 per second")
    for i in range(7):
        status = check_and_consume(limiter, limit, "demo_user")
        print(f" hit #{i+1:2d}: allowed={status.allowed} "
              f"remaining={status.remaining} retry_in={status.retry_after:.2f}s")

    # --- several limits on one key ------------------------------------------
    print("\n=== PerKeyLimiter (5/s; 10/minute) ===")
    per_key = PerKeyLimiter("5 per second; 10 per minute")
    outcomes = [per_key.hit("alice").allowed for _ in range(12)]
    allowed_count = sum(outcomes)
    print(f" 12 hits: {allowed_count} allowed, {12-allowed_count} blocked")

    # --- decorator: every call shares the "all" key -------------------------
    print("\n=== @rate_limited decorator ===")

    @rate_limited("3 per second", key_fn=lambda x: "all")
    def expensive_call(x: int) -> int:
        return x * 2

    for i in range(5):
        try:
            result = expensive_call(i)
        except RateLimitExceeded as e:
            print(f" call {i}: BLOCKED ({e})")
        else:
            print(f" call {i}: result={result}")

    # --- separate keys keep separate counters -------------------------------
    print("\n=== Multi-key isolation ===")
    isolated_limiter = MovingWindowRateLimiter(MemoryStorage())
    three_per_second = parse("3 per second")
    for user in ("alice", "bob", "alice", "alice", "bob", "alice"):
        s = check_and_consume(isolated_limiter, three_per_second, user)
        print(f" {user:6}: allowed={s.allowed} remaining={s.remaining}")
Compared with slowapi: slowapi is a thin wrapper around limits specifically for FastAPI/Starlette that adds @limiter.limit("10/minute") decorators on route functions; if you’re building a FastAPI app, slowapi’s Limiter class is more ergonomic. Use limits directly when you need rate limiting outside of a web framework — for function decorators, CLI tools, background workers, or when building your own web framework middleware, which is what the make_fastapi_limiter() helper above does manually. Compared with the ratelimit package: ratelimit provides a simple @limits(calls=10, period=60) decorator but is thread-unsafe and has no Redis backend; limits is the more complete solution with pluggable storage (Memory, Redis, Memcached, MongoDB) and fixed-window and moving-window (sliding) strategies, and it is the backing library for popular packages like slowapi. The Claude Skills 360 bundle includes limits skill sets covering limits.parse() and parse_many(), MemoryStorage and RedisStorage backends, MovingWindowRateLimiter, check_and_consume() and check_without_consuming() helpers, QuotaStatus dataclass with retry_after, PerKeyLimiter multi-limit multi-key class, @rate_limited() function decorator, RateLimitExceeded exception, make_fastapi_limiter() FastAPI dependency, register_flask_limiter() Flask hook, and per-IP vs per-user key patterns. Start with the free tier to try rate limiting code generation.