← 返回首页
🧠

LLM请求限流

📂 llm ⏱ 4 min 738 words

---
title: "LLM请求限流"
description: "详细介绍LLM应用中的请求限流技术,包括限流策略、队列管理和降级方案"
tags: ["限流", "流量控制", "服务质量"]
category: "llm"
icon: "🧠"
---

LLM请求限流

限流与速率限制的区别

速率限制(Rate Limiting)是拒绝超出限制的请求,而限流(Throttling)则是延迟处理超出容量的请求。限流更友好,它不会直接拒绝请求,而是让请求排队等待处理。

限流的核心概念

请求优先级

不同类型的请求应有不同的优先级。

from enum import IntEnum
from dataclasses import dataclass
from typing import Any
import time

class RequestPriority(IntEnum):
    """Priority classes for LLM requests, numbered from 0 (most urgent) to 3."""
    CRITICAL = 0    # Urgent: real-time conversations, critical tasks
    HIGH = 1        # High: user interaction, API calls
    NORMAL = 2      # Normal: batch processing, background tasks
    LOW = 3         # Low: data analysis, non-real-time tasks

@dataclass
class LLMRequest:
    """One LLM request together with the metadata used to schedule it.

    Attributes:
        id: Identifier of the request.
        priority: Priority class of the request.
        payload: Request body (schema is not defined in this class).
        created_at: Creation time as returned by ``time.time()``. When the
            caller leaves it as ``None`` it is filled in on construction.
        user_id: Identifier of the requesting user, ``None`` when not given.
    """

    id: str
    priority: RequestPriority
    payload: dict
    created_at: float = None
    user_id: str = None

    def __post_init__(self):
        """Stamp the request with the current time unless one was supplied."""
        if self.created_at is not None:
            return
        self.created_at = time.time()

优先级队列

import heapq
import threading
from collections import defaultdict

class PriorityThrottleQueue:
    """Bounded priority queue of requests; every operation runs under a lock.

    Entries are kept in a ``heapq`` heap as ``(priority, sequence, request)``
    tuples, so the request with the smallest priority value is served first
    and requests of equal priority leave in insertion order.
    """

    def __init__(self, max_size: int = 1000):
        """Create an empty queue that holds at most ``max_size`` requests."""
        self.lock = threading.Lock()
        self.max_size = max_size
        self.queue = []
        self.counter = 0

    def enqueue(self, request: LLMRequest) -> bool:
        """Add ``request``; return False (and drop it) when the queue is full."""
        with self.lock:
            if len(self.queue) < self.max_size:
                # The running counter breaks ties between equal priorities,
                # which keeps them in FIFO order.
                heapq.heappush(
                    self.queue, (request.priority, self.counter, request)
                )
                self.counter += 1
                return True
            return False

    def dequeue(self) -> LLMRequest:
        """Remove and return the next request, or None when the queue is empty."""
        with self.lock:
            if self.queue:
                return heapq.heappop(self.queue)[2]
            return None

    def peek(self) -> LLMRequest:
        """Return the next request without removing it, or None when empty."""
        with self.lock:
            if self.queue:
                head = self.queue[0]
                return head[2]
            return None

    def size(self) -> int:
        """Return the number of queued requests."""
        with self.lock:
            pending = len(self.queue)
        return pending

令牌桶限流器

允许突发流量,同时控制平均速率。

import time
import threading

class TokenBucketThrottler:
    """Token-bucket throttler.

    The bucket starts full with ``capacity`` tokens and is topped up at
    ``refill_rate`` tokens per second, never exceeding ``capacity``. Callers
    take tokens with :meth:`acquire`, optionally waiting for a refill.
    """

    # Longest single sleep while waiting, so waiters re-check the bucket often.
    _MAX_SLEEP = 0.1

    def __init__(self, capacity: int, refill_rate: float):
        """
        capacity: token bucket capacity
        refill_rate: number of tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        # A monotonic clock keeps the elapsed time non-negative even when the
        # wall clock is adjusted.
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self):
        """Credit the tokens accrued since the last refill (caller holds the lock)."""
        now = time.monotonic()
        elapsed = now - self.last_refill

        tokens_to_add = elapsed * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + tokens_to_add)
        self.last_refill = now

    def acquire(self, tokens: int = 1, wait: bool = True,
                timeout: float = None) -> bool:
        """Take ``tokens`` tokens from the bucket.

        Args:
            tokens: Number of tokens to take.
            wait: When False, return immediately if the tokens are not
                available right now.
            timeout: Maximum number of seconds to wait. ``None`` waits
                without limit; ``0`` makes a single attempt.

        Returns:
            True when the tokens were taken. False when they were not
            available and ``wait`` is False, when the timeout expired, or
            when the request can never be satisfied (more tokens than
            ``capacity``, or a bucket that does not refill).
        """
        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            with self.lock:
                self._refill()

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                deficit = tokens - self.tokens

            if not wait:
                return False

            # Waiting cannot help: the bucket never holds that many tokens,
            # or it never gains any.
            if tokens > self.capacity or self.refill_rate <= 0:
                return False

            # Sleep only as long as the missing tokens need to accrue, capped
            # so that concurrent changes are noticed quickly.
            pause = min(deficit / self.refill_rate, self._MAX_SLEEP)
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                # Never sleep past the caller's deadline.
                pause = min(pause, remaining)
            time.sleep(pause)

滑动窗口限流

import time
from collections import deque

class SlidingWindowThrottler:
    """Sliding-window throttler.

    At most ``max_requests`` acquisitions are granted within any span of
    ``window_seconds`` seconds. The timestamps of granted acquisitions are
    kept in a deque, oldest first.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        """Create a throttler allowing ``max_requests`` per ``window_seconds``."""
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()
        self.lock = threading.Lock()

    def _cleanup(self):
        """Drop timestamps older than the window (caller holds the lock)."""
        # A monotonic clock keeps the window stable across wall-clock changes.
        cutoff = time.monotonic() - self.window_seconds
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    def try_acquire(self) -> bool:
        """Take a slot if one is free right now; never blocks on the window."""
        with self.lock:
            self._cleanup()

            if len(self.requests) < self.max_requests:
                self.requests.append(time.monotonic())
                return True

            return False

    def wait_and_acquire(self, timeout: float = None) -> bool:
        """Take a slot, waiting for one to free up if necessary.

        Args:
            timeout: Maximum number of seconds to wait. ``None`` waits
                without limit; ``0`` makes a single attempt.

        Returns:
            True when a slot was taken. False when the timeout expired or
            when ``max_requests`` is not positive, in which case no slot can
            ever become free.
        """
        deadline = None if timeout is None else time.monotonic() + timeout

        while True:
            if self.try_acquire():
                return True

            if self.max_requests <= 0:
                return False

            remaining = None
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False

            # Read the oldest timestamp under the lock: another thread may be
            # popping entries from the deque at the same time.
            with self.lock:
                oldest = self.requests[0] if self.requests else None

            if oldest is None:
                pause = 0.01
            else:
                # Time until the oldest entry leaves the window, clamped to
                # a short polling interval.
                until_free = oldest + self.window_seconds - time.monotonic()
                pause = min(max(until_free, 0.01), 0.1)

            if remaining is not None:
                # Never sleep past the caller's deadline.
                pause = min(pause, remaining)
            time.sleep(pause)

自适应限流

根据系统负载动态调整限流参数。

import time
from collections import deque

class AdaptiveThrottler:
    """Throttler whose allowed rate follows observed latency and error rate.

    Responses reported through :meth:`record_response` are kept for
    ``window_seconds`` seconds. At most once every 10 seconds the rate is
    lowered by 20% when the window looks unhealthy, or raised by 20% (at
    least by 1) when it looks healthy, always staying within
    ``[min_rate, max_rate]``.

    NOTE(review): the 2000 / 500 thresholds suggest ``response_time`` is in
    milliseconds - confirm against the callers.
    """

    def __init__(self, initial_rate: int, min_rate: int, max_rate: int,
                 window_seconds: int = 60):
        """Start at ``initial_rate`` with an empty observation window."""
        self.current_rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.window_seconds = window_seconds

        # Both deques hold (timestamp, value) pairs, oldest first.
        self.response_times = deque()
        self.error_counts = deque()
        self.last_adjustment = time.time()

    def _prune(self, now: float):
        """Discard samples recorded more than ``window_seconds`` before ``now``."""
        cutoff = now - self.window_seconds
        for samples in (self.response_times, self.error_counts):
            while samples and samples[0][0] < cutoff:
                samples.popleft()

    def record_response(self, response_time: float, is_error: bool):
        """Record one finished request and adjust the rate when it is due."""
        now = time.time()

        self.response_times.append((now, response_time))
        self.error_counts.append((now, 1 if is_error else 0))

        # Drop expired records.
        self._prune(now)

        # Adjust the rate periodically.
        if now - self.last_adjustment >= 10:
            self._adjust_rate()
            self.last_adjustment = now

    def _adjust_rate(self):
        """Move ``current_rate`` according to the samples in the window."""
        if not self.response_times:
            return

        # Average response time over the window.
        avg_response_time = sum(
            rt for _, rt in self.response_times
        ) / len(self.response_times)

        # Fraction of responses in the window that were errors.
        error_rate = sum(
            err for _, err in self.error_counts
        ) / len(self.error_counts) if self.error_counts else 0

        if avg_response_time > 2000 or error_rate > 0.1:
            # Under pressure: back off.
            self.current_rate = max(
                self.min_rate,
                int(self.current_rate * 0.8)
            )
        elif avg_response_time < 500 and error_rate < 0.01:
            # Healthy: speed up. int(rate * 1.2) equals rate for rates below
            # 5, so step by at least 1 to keep small rates from getting stuck.
            grown = max(self.current_rate + 1, int(self.current_rate * 1.2))
            self.current_rate = min(self.max_rate, grown)

    def is_allowed(self) -> bool:
        """Return True while the window holds fewer samples than the rate.

        Expired samples are pruned first. Without that, a full window would
        keep blocking forever once traffic stops, because pruning otherwise
        only happens when a response is recorded.
        """
        self._prune(time.time())
        return len(self.response_times) < self.current_rate

降级策略

当系统过载时,自动降级处理。

class DegradationManager:
    """Maps resource metrics to a degradation level and gates requests by it.

    Levels: 0 normal, 1 light, 2 moderate, 3 severe.

    NOTE(review): ``queue_size`` is stored with the metrics but is not
    consulted when the level is assessed.
    NOTE(review): at level 1 some requests are refused although
    ``get_degraded_response`` still returns None - confirm this is intended.
    """

    def __init__(self):
        """Start in the normal state with all metrics at zero."""
        self.degradation_level = 0  # 0: normal, 1: light, 2: moderate, 3: severe
        self.metrics = {"cpu": 0, "memory": 0, "queue_size": 0}

    def update_metrics(self, cpu: float, memory: float, queue_size: int):
        """Replace the stored metrics and re-evaluate the degradation level."""
        self.metrics = {"cpu": cpu, "memory": memory, "queue_size": queue_size}
        self._assess_degradation_level()

    def _assess_degradation_level(self):
        """Set the level from the highest threshold that cpu or memory exceeds."""
        # (level, threshold) pairs, checked from the most severe downwards.
        for level, threshold in ((3, 90), (2, 70), (1, 50)):
            if self.metrics["cpu"] > threshold or self.metrics["memory"] > threshold:
                self.degradation_level = level
                return
        self.degradation_level = 0

    def should_accept_request(self, request: LLMRequest) -> bool:
        """Decide whether ``request`` may proceed at the current level.

        Level 0 accepts everything, level 1 accepts priorities up to HIGH,
        level 2 accepts priorities up to CRITICAL, and any other level
        refuses everything.
        """
        level = self.degradation_level
        if level == 0:
            return True
        if level == 1:
            return request.priority <= RequestPriority.HIGH
        if level == 2:
            return request.priority <= RequestPriority.CRITICAL
        return False

    def get_degraded_response(self, request: LLMRequest) -> dict:
        """Return the fallback payload at level 2 or above, otherwise None."""
        if self.degradation_level < 2:
            return None
        return {
            "error": "service_degraded",
            "message": "系统当前负载较高,请稍后重试",
            "retry_after": 30
        }

监控和告警

class ThrottleMonitor:
    """Collects throttling counters and reports when throttling is excessive.

    ``metrics`` holds the request counters and the mean wait time of the
    retained throttled requests; ``wait_times`` holds those retained samples.
    """

    def __init__(self):
        """Start with all counters at zero and no wait-time samples."""
        self.wait_times = []
        self.metrics = dict(
            total_requests=0,
            throttled_requests=0,
            avg_wait_time=0,
            queue_size=0,
        )

    def record_request(self, throttled: bool, wait_time: float = 0):
        """Count one request; for a throttled one also keep its wait time."""
        stats = self.metrics
        stats["total_requests"] += 1

        if throttled:
            stats["throttled_requests"] += 1
            self.wait_times.append(wait_time)

            # Once more than 1000 samples pile up, keep only the newest 500.
            if len(self.wait_times) > 1000:
                self.wait_times = self.wait_times[-500:]

        samples = self.wait_times
        if samples:
            stats["avg_wait_time"] = sum(samples) / len(samples)

    def get_throttle_rate(self) -> float:
        """Return throttled / total requests, or 0 before any request."""
        total = self.metrics["total_requests"]
        if total == 0:
            return 0
        throttled = self.metrics["throttled_requests"]
        return throttled / total

    def should_alert(self, threshold: float = 0.3) -> bool:
        """Return True when the throttle rate is strictly above ``threshold``."""
        rate = self.get_throttle_rate()
        return rate > threshold

限流是保护LLM服务的重要手段,通过合理的限流策略可以确保系统稳定运行,同时为用户提供可预期的服务体验。