速率限制
---
title: "速率限制"
description: "LLM速率限制策略,包括令牌桶、滑动窗口等算法实现"
tags: ["速率限制", "令牌桶", "滑动窗口", "流量控制", "防滥用"]
category: "llm"
icon: "🧠"
---
速率限制
速率限制是保护LLM服务免受滥用和过载的关键机制。合理的速率限制策略既能保障服务质量,又能控制成本。在LLM场景中,速率限制不仅需要考虑请求数量,还需要考虑Token消耗量。
核心算法
令牌桶算法
令牌桶以固定速率向桶中添加令牌,每个请求消耗一个令牌;桶中没有可用令牌时,请求会被拒绝。桶有容量上限,超出的令牌会被丢弃。这种算法允许突发流量(最多为桶容量),同时限制长期平均速率。
import time
import threading
from collections import defaultdict
class TokenBucketRateLimiter:
    """Per-client token-bucket rate limiter.

    Every client gets its own bucket that starts full (``capacity`` tokens)
    and is topped up at ``refill_rate`` tokens per second, never exceeding
    ``capacity``. Each allowed request spends one token, so bursts of up to
    ``capacity`` requests are possible while the long-run rate is bounded by
    ``refill_rate``.

    All reads and writes of bucket state happen while holding ``self.lock``.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """Create a limiter.

        Args:
            capacity: Maximum number of tokens a bucket can hold; also the
                number of tokens a new client starts with.
            refill_rate: Tokens added to each bucket per second.
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        # Buckets are created lazily on first access. The monotonic clock is
        # used because only elapsed time matters and it cannot jump backwards
        # when the system clock is adjusted.
        self.buckets = defaultdict(
            lambda: {"tokens": capacity, "last_refill": time.monotonic()}
        )
        self.lock = threading.Lock()

    def _refill(self, bucket: dict):
        """Credit ``bucket`` with the tokens accrued since its last refill.

        The caller is expected to hold ``self.lock``.
        """
        now = time.monotonic()
        elapsed = now - bucket["last_refill"]
        new_tokens = elapsed * self.refill_rate
        # Tokens beyond the capacity are discarded.
        bucket["tokens"] = min(self.capacity, bucket["tokens"] + new_tokens)
        bucket["last_refill"] = now

    def allow_request(self, client_id: str) -> bool:
        """Try to spend one token for ``client_id``.

        Returns:
            True if a token was available and has been consumed, False if the
            request should be rejected.
        """
        with self.lock:
            bucket = self.buckets[client_id]
            self._refill(bucket)
            if bucket["tokens"] >= 1:
                bucket["tokens"] -= 1
                return True
            return False

    def get_wait_time(self, client_id: str) -> float:
        """Return the seconds until ``client_id`` has at least one token.

        The bucket is refilled first (under the lock) so the answer reflects
        the current time rather than the moment of the last request.

        Returns:
            0.0 if a token is available now; ``float("inf")`` if tokens are
            missing and ``refill_rate`` is not positive (the bucket never
            refills); otherwise the time needed to accrue the missing
            fraction of a token.
        """
        with self.lock:
            bucket = self.buckets[client_id]
            self._refill(bucket)
            missing = 1 - bucket["tokens"]
        if missing <= 0:
            return 0.0
        if self.refill_rate <= 0:
            # Avoid ZeroDivisionError: without refill the wait never ends.
            return float("inf")
        return missing / self.refill_rate
# Demo: fire 15 back-to-back requests at a bucket holding 10 tokens that
# refills at 2 tokens/second. The bucket starts full, so the first 10 pass;
# later ones are rejected unless enough time has elapsed for a refill.
limiter = TokenBucketRateLimiter(capacity=10, refill_rate=2.0)
for attempt in range(1, 16):
    verdict = "✅ 通过" if limiter.allow_request("user-001") else "❌ 拒绝"
    print(f"请求 {attempt}: {verdict}")
滑动窗口算法
滑动窗口(日志)算法记录每个请求的时间戳,只统计最近一个窗口长度内的请求数量;窗口随当前时间连续滑动,从而避免固定窗口在边界处的突发问题。
import time
from collections import deque
class SlidingWindowRateLimiter:
    """Per-client sliding-window-log rate limiter.

    A timestamp is stored for every allowed request. A request is allowed
    only while fewer than ``max_requests`` timestamps fall inside the last
    ``window_size`` seconds.
    """

    def __init__(self, window_size: int, max_requests: int):
        """Create a limiter.

        Args:
            window_size: Length of the sliding window, in seconds.
            max_requests: Maximum number of requests allowed per window.
        """
        self.window_size = window_size
        self.max_requests = max_requests
        # client_id -> timestamps of allowed requests, oldest first.
        self.requests = defaultdict(deque)

    def _evict_expired(self, timestamps: deque, now: float) -> None:
        """Drop timestamps older than the window that ends at ``now``."""
        cutoff = now - self.window_size
        # Timestamps are appended in increasing order, so expired entries
        # are always at the left end.
        while timestamps and timestamps[0] < cutoff:
            timestamps.popleft()

    def allow_request(self, client_id: str) -> bool:
        """Record a request for ``client_id`` if the window has room.

        Returns:
            True if the request was accepted and recorded, False if the
            client already used ``max_requests`` within the window.
        """
        # Monotonic clock: only elapsed time matters, and it cannot jump
        # backwards when the system clock is adjusted.
        now = time.monotonic()
        timestamps = self.requests[client_id]
        self._evict_expired(timestamps, now)
        if len(timestamps) < self.max_requests:
            timestamps.append(now)
            return True
        return False

    def get_remaining(self, client_id: str) -> int:
        """Return how many more requests ``client_id`` may make right now.

        This is a read-only query: asking about a client that has never
        made a request does not allocate any state for it.
        """
        timestamps = self.requests.get(client_id)
        if timestamps is None:
            return max(0, self.max_requests)
        self._evict_expired(timestamps, time.monotonic())
        return max(0, self.max_requests - len(timestamps))
# Demo: make 5 requests against a 100-requests-per-60-seconds limit, then
# print how much of the quota is left.
limiter = SlidingWindowRateLimiter(window_size=60, max_requests=100)
for attempt in range(1, 6):
    mark = "✅" if limiter.allow_request("user-002") else "❌"
    print(f"请求 {attempt}: {mark}")
print(f"剩余配额: {limiter.get_remaining('user-002')}")
Token级别限流
LLM场景需要按Token消耗量限流,防止大量输出导致成本失控。下面的示例采用固定窗口分别统计输入和输出Token;由于输出Token数在请求完成前未知,调用时可先按请求的 max_tokens 等上限进行预估。
class TokenRateLimiter:
    """Per-client fixed-window limiter on LLM token consumption.

    Input and output tokens are budgeted separately. Each client's counters
    are reset once more than ``window_seconds`` have passed since its window
    started.
    """

    def __init__(self, max_input_tokens: int, max_output_tokens: int, window_seconds: int):
        """Create a limiter.

        Args:
            max_input_tokens: Input-token budget per client per window.
            max_output_tokens: Output-token budget per client per window.
            window_seconds: Length of the accounting window, in seconds.
        """
        self.max_input = max_input_tokens
        self.max_output = max_output_tokens
        self.window = window_seconds
        # A client's window starts at its first access. The monotonic clock
        # is used because only elapsed time matters and it cannot jump
        # backwards when the system clock is adjusted.
        self.usage = defaultdict(
            lambda: {"input": 0, "output": 0, "window_start": time.monotonic()}
        )

    def _check_window(self, client_id: str):
        """Start a fresh window for ``client_id`` if the current one expired."""
        now = time.monotonic()
        if now - self.usage[client_id]["window_start"] > self.window:
            self.usage[client_id] = {"input": 0, "output": 0, "window_start": now}

    def check_and_consume(self, client_id: str, input_tokens: int, output_tokens: int) -> bool:
        """Charge a request to ``client_id`` if both budgets can cover it.

        The charge is all-or-nothing: when either budget would be exceeded,
        nothing is recorded.

        Args:
            client_id: Identifier of the client being charged.
            input_tokens: Input tokens the request consumes.
            output_tokens: Output tokens the request consumes.

        Returns:
            True if the usage was recorded, False if it would exceed the
            input or output budget for the current window.

        Raises:
            ValueError: If either token count is negative. A negative count
                would otherwise hand quota back to the client.
        """
        if input_tokens < 0 or output_tokens < 0:
            raise ValueError("token counts must be non-negative")
        self._check_window(client_id)
        usage = self.usage[client_id]
        if (usage["input"] + input_tokens > self.max_input or
                usage["output"] + output_tokens > self.max_output):
            return False
        usage["input"] += input_tokens
        usage["output"] += output_tokens
        return True
# Demo: charge one request (500 input / 1000 output tokens) against an
# hourly budget of 100k input and 50k output tokens, and print the verdict.
limiter = TokenRateLimiter(
    max_input_tokens=100000,
    max_output_tokens=50000,
    window_seconds=3600,
)
print(
    limiter.check_and_consume(
        client_id="user-003",
        input_tokens=500,
        output_tokens=1000,
    )
)
FastAPI集成
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
app = FastAPI()
limiter = TokenBucketRateLimiter(capacity=20, refill_rate=1.0)


@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Reject requests that exceed the per-client token-bucket limit.

    Clients are identified by the ``X-API-Key`` header when present,
    otherwise by the peer address. Rejected requests get a 429 response
    with a ``Retry-After`` header; all others are passed down the chain.
    """
    import math  # local import keeps this snippet self-contained

    # NOTE(review): X-API-Key is taken as sent; if keys are not validated
    # upstream, a caller could dodge the limit by rotating header values.
    client_id = request.headers.get("X-API-Key")
    if client_id is None:
        # request.client can be None (e.g. some test clients or transports),
        # so only read .host when it exists. Such requests share one bucket.
        client_id = request.client.host if request.client is not None else "unknown"

    if not limiter.allow_request(client_id):
        wait_time = limiter.get_wait_time(client_id)
        return JSONResponse(
            status_code=429,
            content={"error": "Rate limit exceeded", "retry_after": wait_time},
            # Round up: truncating 0.4s to "0" would tell the client to retry
            # immediately, only to be rejected again.
            headers={"Retry-After": str(math.ceil(wait_time))},
        )
    return await call_next(request)
策略选择建议
根据业务场景选择合适的限流策略:令牌桶适合允许突发的场景,滑动窗口适合平滑限流,Token级限流适合成本敏感场景。多维度限流(请求数+Token数+并发数)能提供更全面的保护。