← 返回首页
🧠

高级速率限制

📂 llm ⏱ 4 min 604 words

---
title: "高级速率限制"
description: "介绍LLM应用中的高级速率限制策略,包括滑动窗口、令牌桶等算法及其实现"
tags: ["速率限制", "限流", "流量控制"]
category: "llm"
icon: "🧠"
---

高级速率限制

为什么需要速率限制

速率限制是保护LLM服务的关键机制。它能防止滥用、控制成本、确保服务质量(QoS),并避免单个用户耗尽共享资源。

速率限制算法

固定窗口计数器

最简单的实现,但存在边界突发问题:如果请求集中在两个相邻窗口的交界处,短时间内最多可以放行 2 倍于限额的请求。

import time
from collections import defaultdict

class FixedWindowLimiter:
    """Fixed-window request counter keyed by an arbitrary string.

    The ``time.time()`` clock is cut into consecutive windows of
    ``window_seconds`` seconds, and each key may be admitted at most
    ``max_requests`` times per window. Counters of finished windows are
    dropped when the window rolls over, so memory is bounded by the number
    of keys seen in the current window.

    Args:
        max_requests: Maximum number of admitted requests per key and window.
        window_seconds: Length of one window in seconds.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Maps "<key>:<window index>" to the number of admitted requests.
        self.windows = defaultdict(int)
        # Index of the window whose counters are retained; None until first use.
        self._active_window = None

    def _window_index(self) -> int:
        """Return the index of the window containing the current time."""
        return int(time.time() // self.window_seconds)

    def _evict_stale(self, current_window: int) -> None:
        """Drop counters that do not belong to ``current_window``."""
        if current_window == self._active_window:
            return
        # The suffix includes the separator, so a key that merely ends in the
        # same digits (e.g. "x:11" versus window 1) is not mistaken for live.
        suffix = f":{current_window}"
        for stale_key in [k for k in self.windows if not k.endswith(suffix)]:
            del self.windows[stale_key]
        self._active_window = current_window

    def is_allowed(self, key: str) -> bool:
        """Admit one request for ``key`` if its window quota is not used up.

        Returns:
            True if the request was counted, False if the quota for the
            current window is already exhausted.
        """
        current_window = self._window_index()
        self._evict_stale(current_window)
        window_key = f"{key}:{current_window}"

        # .get() keeps a denied request from creating a zero-valued entry.
        if self.windows.get(window_key, 0) >= self.max_requests:
            return False

        self.windows[window_key] += 1
        return True

    def get_remaining(self, key: str) -> int:
        """Return how many more requests ``key`` may make in this window."""
        window_key = f"{key}:{self._window_index()}"
        used = self.windows.get(window_key, 0)
        return max(0, self.max_requests - used)

滑动窗口日志

记录每个请求的时间戳,从而精确控制任意连续时间窗口内的请求数;代价是每个键都需要保存窗口内的全部时间戳。

import time
from collections import defaultdict
from typing import List

class SlidingWindowLogLimiter:
    """Sliding-window limiter that keeps a timestamp log per key.

    A request is admitted when fewer than ``max_requests`` admitted requests
    of the same key lie within the last ``window_seconds`` seconds.

    Args:
        max_requests: Maximum number of admitted requests inside any window.
        window_seconds: Length of the sliding window in seconds.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # key -> timestamps (time.time()) of the admitted requests.
        self.request_logs: defaultdict[str, List[float]] = defaultdict(list)

    def _live_entries(self, key: str, now: float) -> List[float]:
        """Return the timestamps of ``key`` that are still inside the window."""
        window_start = now - self.window_seconds
        # .get() avoids creating an empty log for a key that was never seen.
        return [ts for ts in self.request_logs.get(key, ()) if ts > window_start]

    def is_allowed(self, key: str) -> bool:
        """Admit one request for ``key`` if the window still has room.

        Expired timestamps of ``key`` are pruned on every call.

        Returns:
            True if the request was logged, False if the limit is reached.
        """
        now = time.time()
        live = self._live_entries(key, now)
        self.request_logs[key] = live

        if len(live) >= self.max_requests:
            return False

        live.append(now)
        return True

    def get_retry_after(self, key: str) -> float:
        """Return the seconds to wait before ``key`` can be admitted again.

        Returns:
            0 when a request would be admitted right now; otherwise the time
            until enough logged requests have left the window.
        """
        now = time.time()
        live = sorted(self._live_entries(key, now))
        overflow = len(live) - self.max_requests
        if not live or overflow < 0:
            # Still under the limit: nothing to wait for.
            return 0

        # Room appears once the entry at index ``overflow`` has expired; the
        # index is clamped so a non-positive limit cannot run off the list.
        blocking = live[min(overflow, len(live) - 1)]
        return max(0, blocking + self.window_seconds - now)

滑动窗口计数器

结合固定窗口计数器的低内存开销与滑动窗口的平滑性:将上一窗口的计数按其与当前滑动窗口的重叠比例加权,再加上当前窗口的计数,以此近似滑动窗口,使限流更平滑。

import time
import math

class SlidingWindowCounterLimiter:
    """Sliding-window limiter approximated from two fixed-window counters.

    The load of a key is estimated as the previous window's count, weighted
    by the share of that window still covered by the sliding window, plus the
    current window's count. A request is admitted while this estimate is
    below ``max_requests``.

    Args:
        max_requests: Maximum estimated number of requests per window.
        window_seconds: Length of one window in seconds.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.prev_count = defaultdict(int)
        self.curr_count = defaultdict(int)
        # NOTE: despite its name, this holds the index of the window that
        # ``curr_count`` belongs to (the most recently observed window).
        self.prev_window = 0

    def _current_window(self) -> int:
        """Return the index of the window containing the current time."""
        return int(time.time() // self.window_seconds)

    def _advance_to(self, current: int) -> None:
        """Rotate the counters so that ``curr_count`` describes ``current``."""
        if current == self.prev_window:
            return
        if current - self.prev_window == 1:
            # Normal rollover: the window that just ended becomes "previous".
            self.prev_count = self.curr_count
        else:
            # More than one window passed (or the clock stepped backwards):
            # the old counts no longer overlap the sliding window.
            self.prev_count = defaultdict(int)
        self.curr_count = defaultdict(int)
        self.prev_window = current

    def _update_windows(self):
        """Rotate the counters according to the current time."""
        self._advance_to(self._current_window())

    def is_allowed(self, key: str) -> bool:
        """Admit one request for ``key`` if its weighted count allows it.

        Returns:
            True if the request was counted, False if the estimate has
            reached ``max_requests``.
        """
        # Read the clock once so the window index and the elapsed ratio
        # cannot disagree across a window boundary.
        now = time.time()
        self._advance_to(int(now // self.window_seconds))
        elapsed_ratio = (now % self.window_seconds) / self.window_seconds

        # .get() keeps lookups from inserting zero entries for unseen keys.
        weighted_count = (
            self.prev_count.get(key, 0) * (1 - elapsed_ratio)
            + self.curr_count.get(key, 0)
        )

        if weighted_count >= self.max_requests:
            return False

        self.curr_count[key] += 1
        return True

令牌桶算法

允许突发流量,同时保持平均速率。

import time
import threading

class TokenBucketLimiter:
    """Token-bucket limiter with one bucket per key.

    Every bucket starts full, is refilled continuously at ``refill_rate``
    tokens per second up to ``capacity``, and a request is admitted when the
    bucket holds at least the number of tokens it asks for. Bucket state is
    read and updated while holding a single ``threading.Lock``.

    Args:
        capacity: Bucket size (maximum number of tokens).
        refill_rate: Tokens added per second.
    """

    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = defaultdict(lambda: capacity)
        # Timestamps come from time.monotonic(), which never goes backwards.
        self.last_refill = defaultdict(time.monotonic)
        self.lock = threading.Lock()

    def _refill(self, key: str):
        """Credit ``key`` with the tokens earned since its last refill."""
        now = time.monotonic()
        # A key seen for the first time starts at ``now`` so that it earns
        # exactly zero tokens; the clamp guards against a timestamp that
        # lies in the future.
        elapsed = max(0.0, now - self.last_refill.get(key, now))

        tokens_to_add = elapsed * self.refill_rate
        self.tokens[key] = min(
            self.capacity,
            self.tokens[key] + tokens_to_add
        )
        self.last_refill[key] = now

    def is_allowed(self, key: str, tokens_needed: int = 1) -> bool:
        """Take ``tokens_needed`` tokens from the bucket of ``key`` if possible.

        Returns:
            True if the tokens were deducted, False if the bucket held fewer
            than ``tokens_needed`` tokens (nothing is deducted then).
        """
        with self.lock:
            self._refill(key)

            if self.tokens[key] < tokens_needed:
                return False

            self.tokens[key] -= tokens_needed
            return True

    def get_tokens_remaining(self, key: str) -> float:
        """Return the number of tokens currently in the bucket of ``key``."""
        with self.lock:
            self._refill(key)
            return self.tokens[key]

多层级速率限制

针对不同维度实现分层限流。

class MultiTierLimiter:
    """Combine several named limiters; a request must pass every tier.

    Tiers are consulted in the order they were added. Every tier is asked on
    every call, even after an earlier tier has denied the request, so a
    denied request may still be counted by the other tiers.
    """

    def __init__(self):
        # Tier name -> limiter object exposing ``is_allowed(key, **kwargs)``.
        self.limiters = {}

    def add_tier(self, name: str, limiter):
        """Register ``limiter`` under ``name``, replacing any previous one."""
        self.limiters[name] = limiter

    @staticmethod
    def _remaining(limiter, tier_key: str):
        """Return the limiter's remaining quota for ``tier_key``, or -1.

        ``get_remaining`` is preferred; ``get_tokens_remaining`` is accepted
        as a fallback so token-bucket style limiters report a real value.
        """
        for accessor_name in ("get_remaining", "get_tokens_remaining"):
            accessor = getattr(limiter, accessor_name, None)
            if callable(accessor):
                return accessor(tier_key)
        return -1

    def is_allowed(self, key: str, **tier_kwargs) -> dict:
        """Check ``key`` against every tier.

        Args:
            key: Caller identity; each tier sees it as ``"<key>:<tier name>"``.
            **tier_kwargs: Optional per-tier keyword arguments, given as
                ``tier_name={...}`` and forwarded to that tier's
                ``is_allowed``.

        Returns:
            ``{"allowed": bool, "tiers": {name: {"allowed": bool,
            "remaining": number}}}``. ``remaining`` is -1 when the limiter
            offers no way to query it.
        """
        results = {}
        allowed = True

        for name, limiter in self.limiters.items():
            tier_key = f"{key}:{name}"
            kwargs = tier_kwargs.get(name, {})

            is_ok = limiter.is_allowed(tier_key, **kwargs)
            results[name] = {
                "allowed": is_ok,
                "remaining": self._remaining(limiter, tier_key),
            }

            if not is_ok:
                allowed = False

        return {"allowed": allowed, "tiers": results}

# Usage example: tiers are checked in the order they are added.
limiter = MultiTierLimiter()

# 100 requests per minute
limiter.add_tier("per_minute", FixedWindowLimiter(100, 60))

# 1000 requests per hour
limiter.add_tier("per_hour", FixedWindowLimiter(1000, 3600))

# Token bucket: capacity 10, refilled at 2 tokens per second
limiter.add_tier("burst", TokenBucketLimiter(10, 2))

基于用户的差异化限流

class UserTierLimiter:
    """Apply a different set of limits depending on the user's plan.

    For every plan in ``TIERS`` three limiters are created: a per-minute and
    a per-day fixed window, plus a token bucket whose capacity is the plan's
    ``burst`` value and whose refill rate is one tenth of that per second.
    """

    # Plan name -> limits used to build that plan's limiters.
    TIERS = {
        "free": {"per_minute": 10, "per_day": 100, "burst": 5},
        "pro": {"per_minute": 100, "per_day": 10000, "burst": 50},
        "enterprise": {"per_minute": 1000, "per_day": 100000, "burst": 200}
    }

    def __init__(self):
        # Plan name -> {limit name -> limiter instance}.
        self.limiters = {
            plan: {
                "per_minute": FixedWindowLimiter(quota["per_minute"], 60),
                "per_day": FixedWindowLimiter(quota["per_day"], 86400),
                "burst": TokenBucketLimiter(quota["burst"], quota["burst"] / 10),
            }
            for plan, quota in self.TIERS.items()
        }

    def is_allowed(self, user_id: str, user_tier: str) -> dict:
        """Check one request of ``user_id`` against the limits of ``user_tier``.

        Every limiter of the plan is consulted with the key
        ``"<user_id>:<limit name>"``.

        Returns:
            ``{"allowed": bool, "details": {limit name: bool}}`` for a known
            plan, or ``{"allowed": False, "error": ...}`` for an unknown one.
        """
        if user_tier in self.limiters:
            verdicts = {
                limit_name: checker.is_allowed(f"{user_id}:{limit_name}")
                for limit_name, checker in self.limiters[user_tier].items()
            }
            return {"allowed": all(verdicts.values()), "details": verdicts}

        return {"allowed": False, "error": "未知的用户层级"}

监控和告警

class RateLimitMonitor:
    """Count allowed and denied requests per key and derive denial rates."""

    def __init__(self):
        # key -> {"allowed": int, "denied": int}
        self.metrics = defaultdict(lambda: {"allowed": 0, "denied": 0})

    def record(self, key: str, allowed: bool):
        """Count one request of ``key`` as allowed or denied."""
        outcome = "allowed" if allowed else "denied"
        self.metrics[key][outcome] += 1

    def get_stats(self, key: str) -> dict:
        """Return the counters and the denial rate of ``key``.

        Querying a key that was never recorded yields all-zero statistics and
        does not add the key to ``metrics``.

        Returns:
            A dict with ``total_requests``, ``allowed``, ``denied`` and
            ``denial_rate`` (0 when nothing was recorded).
        """
        # .get() does not trigger the defaultdict factory, so read-only
        # queries cannot grow the metrics table.
        stats = self.metrics.get(key) or {"allowed": 0, "denied": 0}
        total = stats["allowed"] + stats["denied"]

        return {
            "total_requests": total,
            "allowed": stats["allowed"],
            "denied": stats["denied"],
            "denial_rate": stats["denied"] / total if total > 0 else 0
        }

    def check_alerts(self, key: str, threshold: float = 0.1) -> bool:
        """Return True when the denial rate of ``key`` exceeds ``threshold``."""
        return self.get_stats(key)["denial_rate"] > threshold

高级速率限制策略能更精细地控制LLM API的使用,平衡用户体验和系统保护。