高级速率限制
---
title: "高级速率限制"
description: "介绍LLM应用中的高级速率限制策略,包括滑动窗口、令牌桶等算法及其实现"
tags: ["速率限制", "限流", "流量控制"]
category: "llm"
icon: "🧠"
---
高级速率限制
为什么需要速率限制
速率限制是保护LLM服务的关键机制。它能防止滥用、控制成本、确保服务质量(QoS),并避免单个用户耗尽共享资源。
速率限制算法
固定窗口计数器
最简单的实现,但存在边界突发问题:计数器在窗口边界处直接清零,例如限制为每分钟100次时,客户端可以在上一个窗口的最后一秒和下一个窗口的第一秒各发送100次请求,从而在约2秒内通过200次请求。
import time
from collections import defaultdict
class FixedWindowLimiter:
    """Fixed-window request counter keyed by an arbitrary string.

    Time is cut into consecutive windows of ``window_seconds``; each key may
    make at most ``max_requests`` accepted calls per window. Counters start
    from zero in every new window, so a burst that straddles a boundary can
    pass up to twice the limit in a short span.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        """Store the limits and create an empty counter table.

        Args:
            max_requests: Maximum accepted calls per key within one window.
            window_seconds: Length of one window, in seconds.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Maps "<key>:<window index>" to the number of accepted requests.
        self.windows = defaultdict(int)
        # Window index the stored counters belong to (None until first use).
        self._active_window = None

    def _current_window(self) -> int:
        """Return the index of the window that contains the current time."""
        return int(time.time() // self.window_seconds)

    def is_allowed(self, key: str) -> bool:
        """Record one request for ``key`` and report whether it fits the limit.

        Returns:
            True if the request was counted, False if the key has already
            reached ``max_requests`` in the current window.
        """
        current_window = self._current_window()
        if current_window != self._active_window:
            # Counters of earlier windows can never be read again; dropping
            # them on rollover keeps the table from growing without bound.
            self.windows.clear()
            self._active_window = current_window

        window_key = f"{key}:{current_window}"
        if self.windows[window_key] >= self.max_requests:
            return False
        self.windows[window_key] += 1
        return True

    def get_remaining(self, key: str) -> int:
        """Return how many more requests ``key`` may make in this window.

        This is a pure read: it neither counts a request nor creates an entry.
        """
        window_key = f"{key}:{self._current_window()}"
        used = self.windows.get(window_key, 0)
        return max(0, self.max_requests - used)
滑动窗口日志
记录每个请求的时间戳,可以精确地控制任意滑动窗口内的请求数,代价是每个键的内存占用随窗口内的请求数线性增长。
import time
from collections import defaultdict
from typing import List
class SlidingWindowLogLimiter:
    """Sliding-window limiter that keeps a timestamp for every accepted call.

    A request is accepted when fewer than ``max_requests`` accepted requests
    lie within the last ``window_seconds`` seconds for the same key.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        """Store the limits and create the empty per-key timestamp log.

        Args:
            max_requests: Maximum accepted calls per key inside the window.
            window_seconds: Length of the sliding window, in seconds.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Per key: timestamps (time.time()) of the accepted requests.
        self.request_logs: defaultdict[str, List[float]] = defaultdict(list)

    def _live_timestamps(self, key: str, now: float) -> List[float]:
        """Return the timestamps of ``key`` that are still inside the window.

        Reads with ``.get`` so that asking about an unseen key does not create
        an entry in ``request_logs``.
        """
        window_start = now - self.window_seconds
        return [ts for ts in self.request_logs.get(key, ()) if ts > window_start]

    def is_allowed(self, key: str) -> bool:
        """Record one request for ``key`` if the window still has room.

        Returns:
            True if the request was logged, False if the limit is reached.
        """
        now = time.time()
        live = self._live_timestamps(key, now)
        accepted = len(live) < self.max_requests
        if accepted:
            live.append(now)
        # Store the pruned list either way so expired entries are released.
        self.request_logs[key] = live
        return accepted

    def get_retry_after(self, key: str) -> float:
        """Return the seconds to wait before ``key`` can be accepted again.

        Returns 0 when the key has no live entries or is still under the
        limit. Otherwise returns the time until the oldest live timestamp
        leaves the window, which is when the next slot frees up.
        """
        now = time.time()
        live = self._live_timestamps(key, now)
        if not live or len(live) < self.max_requests:
            return 0
        oldest = min(live)
        return max(0, self.window_seconds - (now - oldest))
滑动窗口计数器
结合固定窗口计数器的低内存开销与滑动窗口日志的平滑性:按当前窗口已经过去的时间比例对上一个窗口的计数加权,再加上当前窗口的计数,以此近似滑动窗口内的请求数。
import time
import math
class SlidingWindowCounterLimiter:
    """Approximate sliding window built from two fixed-window counters.

    The estimate for a key is the previous window's count, weighted by the
    fraction of the current window that has not elapsed yet, plus the current
    window's count.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        """Store the limits and create empty counters.

        Args:
            max_requests: Threshold the weighted count must stay below.
            window_seconds: Length of one window, in seconds.
        """
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.prev_count = defaultdict(int)
        self.curr_count = defaultdict(int)
        # Despite its name, this holds the index of the window that
        # ``curr_count`` belongs to.
        self.prev_window = 0

    def _current_window(self) -> int:
        """Return the index of the window that contains the current time."""
        return int(time.time() // self.window_seconds)

    def _rotate_to(self, window: int) -> None:
        """Advance the counters so that ``curr_count`` describes ``window``."""
        if window == self.prev_window:
            return
        if window == self.prev_window + 1:
            # Exactly one window elapsed: the old current becomes previous.
            self.prev_count = self.curr_count
        else:
            # More than one window passed (or the clock moved backwards), so
            # the old counts are not from the adjacent window and must not
            # weigh on the estimate.
            self.prev_count = defaultdict(int)
        self.curr_count = defaultdict(int)
        self.prev_window = window

    def _update_windows(self):
        """Rotate the counters to the window containing the current time."""
        self._rotate_to(self._current_window())

    def is_allowed(self, key: str) -> bool:
        """Count one request for ``key`` if the weighted estimate allows it.

        Returns:
            True if the request was counted, False if the weighted count has
            reached ``max_requests``.
        """
        # Sample the clock once so the window index and the elapsed fraction
        # always refer to the same instant.
        now = time.time()
        self._rotate_to(int(now // self.window_seconds))
        elapsed_ratio = (now % self.window_seconds) / self.window_seconds

        # ``.get`` avoids creating zero entries for keys that are only read.
        weighted_count = (
            self.prev_count.get(key, 0) * (1 - elapsed_ratio)
            + self.curr_count.get(key, 0)
        )
        if weighted_count >= self.max_requests:
            return False
        self.curr_count[key] += 1
        return True
令牌桶算法
允许突发流量,同时保持平均速率。
import time
import threading
class TokenBucketLimiter:
    """Per-key token bucket: allows bursts while bounding the average rate."""

    def __init__(self, capacity: int, refill_rate: float):
        """Create the limiter; every key starts with a full bucket.

        Args:
            capacity: Bucket size (maximum number of tokens a key can hold).
            refill_rate: Tokens added per second.
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = defaultdict(lambda: capacity)
        # Timestamp of each key's last refill. ``_refill`` reads it with
        # ``.get`` so that a brand-new key starts at "now" instead of at a
        # later clock sample taken by the default factory.
        self.last_refill = defaultdict(time.time)
        self.lock = threading.Lock()

    def _refill(self, key: str):
        """Credit ``key`` with the tokens earned since its last refill.

        Callers in this class hold ``self.lock`` while calling this.
        """
        now = time.time()
        last = self.last_refill.get(key, now)
        # Clamp at zero: a negative interval (new key, or the wall clock
        # stepping backwards) must never take tokens away.
        elapsed = max(0.0, now - last)
        self.tokens[key] = min(
            self.capacity,
            self.tokens[key] + elapsed * self.refill_rate,
        )
        self.last_refill[key] = now

    def is_allowed(self, key: str, tokens_needed: int = 1) -> bool:
        """Spend ``tokens_needed`` tokens from ``key``'s bucket if available.

        Returns:
            True if the tokens were deducted, False if the bucket holds fewer
            than ``tokens_needed`` after refilling.
        """
        with self.lock:
            self._refill(key)
            if self.tokens[key] < tokens_needed:
                return False
            self.tokens[key] -= tokens_needed
            return True

    def get_tokens_remaining(self, key: str) -> float:
        """Return the tokens currently available to ``key`` after refilling."""
        with self.lock:
            self._refill(key)
            return self.tokens[key]
多层级速率限制
针对不同维度实现分层限流。
class MultiTierLimiter:
    """Runs one request through several named limiters ("tiers")."""

    def __init__(self):
        # Tier name -> limiter object exposing ``is_allowed(key, **kwargs)``.
        self.limiters = {}

    def add_tier(self, name: str, limiter):
        """Register ``limiter`` under ``name``, replacing any previous one."""
        self.limiters[name] = limiter

    @staticmethod
    def _remaining(limiter, tier_key: str):
        """Return the limiter's remaining quota for ``tier_key``.

        Tries ``get_remaining`` first, then ``get_tokens_remaining``, and
        returns -1 when the limiter offers neither.
        """
        for probe_name in ("get_remaining", "get_tokens_remaining"):
            probe = getattr(limiter, probe_name, None)
            if callable(probe):
                return probe(tier_key)
        return -1

    def is_allowed(self, key: str, **tier_kwargs) -> dict:
        """Check ``key`` against every tier.

        Args:
            key: Caller identity; each tier sees it as ``"<key>:<tier name>"``.
            **tier_kwargs: Optional per-tier keyword arguments, given as
                ``tier_name={...}`` and forwarded to that tier's
                ``is_allowed``.

        Returns:
            ``{"allowed": bool, "tiers": {name: {"allowed", "remaining"}}}``
            where the top-level ``allowed`` is True only if every tier
            accepted.
        """
        # NOTE: every tier is consulted even after one denies, so tiers that
        # accept still record the request although the overall answer is False.
        results = {}
        for name, limiter in self.limiters.items():
            tier_key = f"{key}:{name}"
            is_ok = limiter.is_allowed(tier_key, **tier_kwargs.get(name, {}))
            results[name] = {
                "allowed": is_ok,
                "remaining": self._remaining(limiter, tier_key),
            }
        allowed = all(tier["allowed"] for tier in results.values())
        return {"allowed": allowed, "tiers": results}
# Usage example: three tiers stacked on one MultiTierLimiter.
limiter = MultiTierLimiter()
# 100 requests per minute
limiter.add_tier(
    "per_minute",
    FixedWindowLimiter(max_requests=100, window_seconds=60),
)
# 1000 requests per hour
limiter.add_tier(
    "per_hour",
    FixedWindowLimiter(max_requests=1000, window_seconds=3600),
)
# Token bucket: capacity 10, refilled with 2 tokens per second
limiter.add_tier(
    "burst",
    TokenBucketLimiter(capacity=10, refill_rate=2),
)
基于用户的差异化限流
class UserTierLimiter:
    """Applies a different set of limits depending on the user's plan."""

    # Per plan: requests per minute, requests per day, and burst capacity.
    TIERS = {
        "free": {"per_minute": 10, "per_day": 100, "burst": 5},
        "pro": {"per_minute": 100, "per_day": 10000, "burst": 50},
        "enterprise": {"per_minute": 1000, "per_day": 100000, "burst": 200}
    }

    def __init__(self):
        """Build one set of limiters per plan listed in ``TIERS``."""
        self.limiters = {
            tier: {
                "per_minute": FixedWindowLimiter(limits["per_minute"], 60),
                "per_day": FixedWindowLimiter(limits["per_day"], 86400),
                # The bucket refills at one tenth of its capacity per second.
                "burst": TokenBucketLimiter(limits["burst"], limits["burst"] / 10),
            }
            for tier, limits in self.TIERS.items()
        }

    @staticmethod
    def _has_capacity(limiter, key: str) -> bool:
        """Report, without consuming quota, whether ``limiter`` has room.

        Uses ``get_remaining`` or ``get_tokens_remaining`` when the limiter
        provides one; otherwise assumes there is room and leaves the decision
        to the limiter's own ``is_allowed``.
        """
        if hasattr(limiter, "get_remaining"):
            return limiter.get_remaining(key) > 0
        if hasattr(limiter, "get_tokens_remaining"):
            return limiter.get_tokens_remaining(key) >= 1
        return True

    def is_allowed(self, user_id: str, user_tier: str) -> dict:
        """Check ``user_id`` against every limiter of ``user_tier``.

        Quota is consumed only when all limiters have room, so a request that
        one limiter rejects does not use up the others (for example the daily
        quota while the per-minute limit is blocking).

        Returns:
            ``{"allowed": bool, "details": {limiter name: bool}}``, or
            ``{"allowed": False, "error": ...}`` for an unknown ``user_tier``.
        """
        tier_limiters = self.limiters.get(user_tier)
        if tier_limiters is None:
            return {"allowed": False, "error": "未知的用户层级"}

        keys = {name: f"{user_id}:{name}" for name in tier_limiters}
        # Phase 1: probe every limiter without recording the request.
        results = {
            name: self._has_capacity(limiter, keys[name])
            for name, limiter in tier_limiters.items()
        }
        if all(results.values()):
            # Phase 2: every limiter has room, so record the request in each.
            # NOTE: probe and consume are separate calls, so concurrent callers
            # can still race between the two phases.
            results = {
                name: limiter.is_allowed(keys[name])
                for name, limiter in tier_limiters.items()
            }
        return {
            "allowed": all(results.values()),
            "details": results
        }
监控和告警
class RateLimitMonitor:
    """Counts allowed and denied decisions per key and flags high denial rates."""

    def __init__(self):
        # Per key: {"allowed": int, "denied": int}, created on first record().
        self.metrics = defaultdict(lambda: {"allowed": 0, "denied": 0})

    def record(self, key: str, allowed: bool):
        """Add one decision for ``key`` to the matching counter."""
        outcome = "allowed" if allowed else "denied"
        self.metrics[key][outcome] += 1

    def get_stats(self, key: str) -> dict:
        """Return the totals and the denial rate for ``key``.

        Keys that were never recorded report all zeros. The lookup uses
        ``.get`` so that merely asking about a key does not add it to
        ``metrics``.
        """
        stats = self.metrics.get(key, {"allowed": 0, "denied": 0})
        allowed_count = stats["allowed"]
        denied_count = stats["denied"]
        total = allowed_count + denied_count
        return {
            "total_requests": total,
            "allowed": allowed_count,
            "denied": denied_count,
            # Guard against division by zero when nothing was recorded.
            "denial_rate": denied_count / total if total > 0 else 0.0
        }

    def check_alerts(self, key: str, threshold: float = 0.1) -> bool:
        """Return True when the denial rate of ``key`` exceeds ``threshold``."""
        return self.get_stats(key)["denial_rate"] > threshold
高级速率限制策略能更精细地控制LLM API的使用,平衡用户体验和系统保护。