← 返回首页
🧠

速率限制

📂 llm ⏱ 2 min 373 words

---
title: "速率限制"
description: "LLM速率限制策略,包括令牌桶、滑动窗口等算法实现"
tags: ["速率限制", "令牌桶", "滑动窗口", "流量控制", "防滥用"]
category: "llm"
icon: "🧠"
---

速率限制

速率限制是保护LLM服务免受滥用和过载的关键机制。合理的速率限制策略既能保障服务质量,又能控制成本。在LLM场景中,速率限制不仅需要考虑请求数量,还需要考虑Token消耗量。

核心算法

令牌桶算法

令牌桶以固定速率向桶中添加令牌,每个请求消耗一个令牌;桶中令牌不足时,请求会被拒绝。桶有容量上限,超出的令牌会被丢弃。这种算法允许突发流量(最多一次性消耗整桶令牌),同时限制平均速率。

import time
import threading
from collections import defaultdict

class TokenBucketRateLimiter:
    """Per-client token-bucket rate limiter.

    Every client id owns a bucket holding at most ``capacity`` tokens. Buckets
    are refilled continuously at ``refill_rate`` tokens per second, and each
    admitted request consumes exactly one token. All bucket reads and writes
    are serialized by ``self.lock``.

    Args:
        capacity: Maximum number of tokens a bucket can hold (burst size).
        refill_rate: Tokens added per second of elapsed time.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        # A monotonic clock is used so that wall-clock adjustments (NTP, DST,
        # manual changes) can never produce a negative or inflated refill.
        self.buckets = defaultdict(
            lambda: {"tokens": capacity, "last_refill": time.monotonic()}
        )
        self.lock = threading.Lock()

    def _refill(self, bucket: dict) -> None:
        """Credit ``bucket`` with the tokens earned since its last refill.

        The caller must hold ``self.lock``.
        """
        now = time.monotonic()
        elapsed = now - bucket["last_refill"]
        new_tokens = elapsed * self.refill_rate
        # Tokens beyond the capacity are discarded.
        bucket["tokens"] = min(self.capacity, bucket["tokens"] + new_tokens)
        bucket["last_refill"] = now

    def allow_request(self, client_id: str) -> bool:
        """Consume one token for ``client_id`` if one is available.

        Returns:
            True when the request is admitted, False when the bucket holds
            less than one token.
        """
        with self.lock:
            bucket = self.buckets[client_id]
            self._refill(bucket)
            if bucket["tokens"] >= 1:
                bucket["tokens"] -= 1
                return True
            return False

    def get_wait_time(self, client_id: str) -> float:
        """Return the seconds until ``client_id`` can make another request.

        The bucket is refilled first so the answer reflects the current time
        rather than the time of the last ``allow_request`` call. Querying a
        client that has never made a request does not create a bucket for it.

        Returns:
            0.0 when a token is available now, ``float("inf")`` when the
            bucket is empty and ``refill_rate`` is not positive, otherwise the
            time needed to accumulate one whole token.
        """
        with self.lock:
            bucket = self.buckets.get(client_id)
            if bucket is None:
                # An unseen client would start with a full bucket.
                tokens = self.capacity
            else:
                self._refill(bucket)
                tokens = bucket["tokens"]
        if tokens >= 1:
            return 0.0
        if self.refill_rate <= 0:
            # The bucket never refills; avoid dividing by zero.
            return float("inf")
        return (1 - tokens) / self.refill_rate

# Demo: fire 15 back-to-back requests at a bucket that can burst 10 tokens and
# refills 2 tokens per second, printing whether each one was admitted.
limiter = TokenBucketRateLimiter(capacity=10, refill_rate=2.0)
for attempt in range(1, 16):
    verdict = "✅ 通过" if limiter.allow_request("user-001") else "❌ 拒绝"
    print(f"请求 {attempt}: {verdict}")

滑动窗口算法

滑动窗口记录最近一个时间窗口内每次请求的时间戳,并据此统计窗口内的请求数量,从而避免固定窗口在边界处的突发问题。

import time
from collections import deque

class SlidingWindowRateLimiter:
    """Per-client sliding-window-log rate limiter.

    For every client the limiter keeps the timestamps of its admitted requests.
    A request is admitted while fewer than ``max_requests`` timestamps fall
    inside the last ``window_size`` seconds.

    Args:
        window_size: Length of the sliding window in seconds.
        max_requests: Maximum number of admitted requests per window.
    """

    def __init__(self, window_size: int, max_requests: int):
        self.window_size = window_size
        self.max_requests = max_requests
        # client_id -> deque of admission timestamps, oldest first.
        self.requests = defaultdict(deque)

    def _live_timestamps(self, client_id: str, now: float):
        """Evict expired timestamps and return the client's deque.

        Returns None, without creating an entry, when the client has no
        request left inside the window. Emptied deques are removed so that
        idle clients do not accumulate in ``self.requests``.
        """
        timestamps = self.requests.get(client_id)
        if timestamps is None:
            return None
        window_start = now - self.window_size
        while timestamps and timestamps[0] < window_start:
            timestamps.popleft()
        if not timestamps:
            del self.requests[client_id]
            return None
        return timestamps

    def allow_request(self, client_id: str) -> bool:
        """Record and admit a request unless the client's window is full.

        Returns:
            True when the request is admitted (its timestamp is stored),
            False when ``max_requests`` were already admitted in the window.
        """
        # Monotonic time keeps the window correct across wall-clock changes.
        now = time.monotonic()
        timestamps = self._live_timestamps(client_id, now)
        in_window = len(timestamps) if timestamps is not None else 0
        if in_window >= self.max_requests:
            return False
        self.requests[client_id].append(now)
        return True

    def get_remaining(self, client_id: str) -> int:
        """Return how many more requests the client may make right now.

        This is a read-only query: it never creates state for unseen clients.
        """
        timestamps = self._live_timestamps(client_id, time.monotonic())
        in_window = len(timestamps) if timestamps is not None else 0
        return max(0, self.max_requests - in_window)

# Demo: send five requests against a 100-requests-per-60-seconds window, then
# report the quota that is left for the same client.
limiter = SlidingWindowRateLimiter(window_size=60, max_requests=100)
for attempt in range(1, 6):
    mark = "✅" if limiter.allow_request("user-002") else "❌"
    print(f"请求 {attempt}: {mark}")
remaining_quota = limiter.get_remaining("user-002")
print(f"剩余配额: {remaining_quota}")

Token级别限流

LLM场景需要按Token消耗量限流,防止大量输出导致成本失控。

class TokenRateLimiter:
    """Fixed-window limiter on LLM token consumption, tracked per client.

    Each client has separate input-token and output-token budgets that reset
    once more than ``window_seconds`` have passed since its window started.
    """

    def __init__(self, max_input_tokens: int, max_output_tokens: int, window_seconds: int):
        self.max_input = max_input_tokens
        self.max_output = max_output_tokens
        self.window = window_seconds

        def _new_record():
            # A client's window starts the first time the client is seen.
            return {"input": 0, "output": 0, "window_start": time.time()}

        self.usage = defaultdict(_new_record)

    def _check_window(self, client_id: str):
        """Start a fresh usage record if the client's window has expired."""
        current = time.time()
        record = self.usage[client_id]
        if current - record["window_start"] <= self.window:
            return
        self.usage[client_id] = {"input": 0, "output": 0, "window_start": current}

    def check_and_consume(self, client_id: str, input_tokens: int, output_tokens: int) -> bool:
        """Charge the tokens to the client if both budgets can absorb them.

        Returns:
            True when the usage was recorded, False (nothing recorded) when
            either the input or the output budget would be exceeded.
        """
        self._check_window(client_id)
        record = self.usage[client_id]
        projected_input = record["input"] + input_tokens
        projected_output = record["output"] + output_tokens
        within_budget = (
            projected_input <= self.max_input
            and projected_output <= self.max_output
        )
        if not within_budget:
            return False
        record["input"] = projected_input
        record["output"] = projected_output
        return True

# Demo: an hourly budget of 100k input / 50k output tokens; charge one call
# that used 500 input and 1000 output tokens and print whether it fit.
limiter = TokenRateLimiter(
    max_input_tokens=100000,
    max_output_tokens=50000,
    window_seconds=3600,
)
accepted = limiter.check_and_consume("user-003", 500, 1000)
print(accepted)

FastAPI集成

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse

# Application instance that the rate-limiting middleware is registered on.
app = FastAPI()
# Shared limiter for all HTTP requests: bursts of up to 20 requests per client,
# refilled at 1 token per second. This rebinds the module-level name `limiter`
# used by the earlier demo snippets.
limiter = TokenBucketRateLimiter(capacity=20, refill_rate=1.0)

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    """Reject requests from clients that have exhausted their token bucket.

    Clients are identified by the ``X-API-Key`` header, falling back to the
    peer address when the header is missing or empty. Admitted requests are
    passed to ``call_next``; rejected ones get a 429 JSON response carrying
    the wait time in both the body and the ``Retry-After`` header.
    """
    # Local import: the snippet's import block does not bring in ``math``.
    import math

    client_id = request.headers.get("X-API-Key")
    if not client_id:
        # ``request.client`` may be None when the server cannot determine the
        # peer, so it must only be dereferenced when it is actually needed.
        # All such requests share the single "unknown" bucket.
        client_id = request.client.host if request.client else "unknown"

    if limiter.allow_request(client_id):
        return await call_next(request)

    wait_time = limiter.get_wait_time(client_id)
    # Retry-After takes whole seconds; round up so that a sub-second wait does
    # not advertise "0" and invite an immediate retry that would be rejected.
    retry_after_seconds = math.ceil(wait_time)
    return JSONResponse(
        status_code=429,
        content={"error": "Rate limit exceeded", "retry_after": wait_time},
        headers={"Retry-After": str(retry_after_seconds)},
    )

策略选择建议

根据业务场景选择合适的限流策略:令牌桶适合允许突发的场景,滑动窗口适合平滑限流,Token级限流适合成本敏感场景。多维度限流(请求数+Token数+并发数)能提供更全面的保护。