---
title: "LLM请求限流"
description: "详细介绍LLM应用中的请求限流技术，包括限流策略、队列管理和降级方案"
tags: ["限流", "流量控制", "服务质量"]
category: "llm"
icon: "🧠"
---
LLM请求限流
限流与速率限制的区别
速率限制（Rate Limiting）会直接拒绝超出限额的请求，而限流（Throttling）则是延迟处理超出容量的请求。限流对调用方更友好：它不会立即拒绝请求，而是让请求排队等待处理。
限流的核心概念
请求优先级
不同类型的请求应有不同的优先级。
import time
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Optional
class RequestPriority(IntEnum):
    """Priority level of an LLM request; a smaller value is more urgent.

    As an ``IntEnum`` the members compare like integers, so they can be used
    directly as heap keys and in ``<=`` threshold checks.
    """

    CRITICAL = 0  # Critical: real-time conversation, mission-critical tasks
    HIGH = 1  # High: user interaction, API calls
    NORMAL = 2  # Normal: batch processing, background jobs
    LOW = 3  # Low: data analysis, non-real-time tasks


@dataclass
class LLMRequest:
    """A single LLM request subject to throttling.

    Attributes:
        id: Caller-supplied request identifier.
        priority: Scheduling priority of the request.
        payload: Request body; its schema is not defined here.
        created_at: Creation timestamp from ``time.time()``. It is filled in
            automatically when omitted or passed as ``None``.
        user_id: Optional identifier of the requesting user.
    """

    id: str
    priority: RequestPriority
    payload: dict
    created_at: Optional[float] = None
    user_id: Optional[str] = None

    def __post_init__(self) -> None:
        # A default of time.time() in the field declaration would be evaluated
        # once at class-definition time; resolve the timestamp per instance.
        if self.created_at is None:
            self.created_at = time.time()
优先级队列
import heapq
import threading
from collections import defaultdict
class PriorityThrottleQueue:
    """Thread-safe, bounded priority queue of pending LLM requests.

    Requests are ordered by ``request.priority`` (smallest first). Requests
    of equal priority are served in insertion (FIFO) order.
    """

    def __init__(self, max_size: int = 1000):
        """
        Args:
            max_size: Maximum number of queued requests; ``enqueue`` refuses
                new requests once this many are waiting.
        """
        self.max_size = max_size
        self.queue = []  # heap of (priority, insertion sequence, request)
        self.counter = 0  # next insertion sequence number
        self.lock = threading.Lock()

    def enqueue(self, request: "LLMRequest") -> bool:
        """Add *request* to the queue.

        Returns:
            True if the request was queued, False if the queue is full.
        """
        with self.lock:
            if len(self.queue) >= self.max_size:
                return False
            # The sequence number gives FIFO order within a priority and
            # guarantees heapq never has to compare two request objects.
            heapq.heappush(self.queue, (request.priority, self.counter, request))
            self.counter += 1
            return True

    def dequeue(self) -> Optional["LLMRequest"]:
        """Remove and return the most urgent request, or None if empty."""
        with self.lock:
            if not self.queue:
                return None
            _, _, request = heapq.heappop(self.queue)
            return request

    def peek(self) -> Optional["LLMRequest"]:
        """Return the most urgent request without removing it, or None."""
        with self.lock:
            if not self.queue:
                return None
            return self.queue[0][2]

    def size(self) -> int:
        """Return the number of queued requests."""
        with self.lock:
            return len(self.queue)
令牌桶限流器
令牌桶以固定速率补充令牌，桶的容量决定了可容纳的最大突发量；因此它既允许突发流量，又能控制平均速率。
import time
import threading
class TokenBucketThrottler:
    """Thread-safe token-bucket throttler.

    The bucket holds at most ``capacity`` tokens and starts full. It is
    refilled continuously at ``refill_rate`` tokens per second. Bursts of up
    to ``capacity`` tokens are served immediately, while the long-run rate is
    bounded by ``refill_rate``.
    """

    def __init__(self, capacity: int, refill_rate: float):
        """
        capacity: maximum number of tokens the bucket can hold
        refill_rate: number of tokens added per second
        """
        self.capacity = capacity
        self.refill_rate = refill_rate
        self.tokens = capacity
        # Monotonic clock: wall-clock adjustments (NTP, manual changes) must
        # not create or destroy tokens.
        self.last_refill = time.monotonic()
        self.lock = threading.Lock()

    def _refill(self):
        """Credit the tokens earned since the last refill; caller holds the lock."""
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
        self.last_refill = now

    def acquire(self, tokens: int = 1, wait: bool = True,
                timeout: Optional[float] = None) -> bool:
        """Consume *tokens* from the bucket.

        Args:
            tokens: Number of tokens to consume.
            wait: If False, give up immediately when not enough tokens are
                available.
            timeout: Maximum number of seconds to wait when *wait* is True.
                ``None`` waits indefinitely; ``0`` tries exactly once.

        Returns:
            True if the tokens were consumed, False otherwise. False is
            returned immediately (instead of blocking forever) when the
            request can never succeed: *tokens* exceeds ``capacity``, or the
            bucket never refills (``refill_rate <= 0``).
        """
        # The bucket never holds more than `capacity` tokens, so this request
        # could never be satisfied.
        if tokens > self.capacity:
            return False
        start_time = time.monotonic()
        while True:
            with self.lock:
                self._refill()
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True
                # Snapshot the shortfall while still holding the lock.
                deficit = tokens - self.tokens
            if not wait:
                return False
            if self.refill_rate <= 0:
                # Without refill the shortfall can never be covered.
                return False
            # Sleep roughly until the shortfall is refilled, re-checking at
            # least every 100 ms and never sleeping past the deadline.
            sleep_for = min(deficit / self.refill_rate, 0.1)
            if timeout is not None:
                remaining = timeout - (time.monotonic() - start_time)
                if remaining <= 0:
                    return False
                sleep_for = min(sleep_for, remaining)
            time.sleep(sleep_for)
滑动窗口限流
import time
from collections import deque
class SlidingWindowThrottler:
    """Thread-safe sliding-window-log throttler.

    Admits at most ``max_requests`` requests within any ``window_seconds``
    interval by remembering the timestamp of every admitted request.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Admission timestamps from time.monotonic(), oldest first.
        self.requests = deque()
        self.lock = threading.Lock()

    def _cleanup(self, now: Optional[float] = None):
        """Drop timestamps that fell out of the window; caller holds the lock."""
        if now is None:
            now = time.monotonic()
        cutoff = now - self.window_seconds
        while self.requests and self.requests[0] < cutoff:
            self.requests.popleft()

    def _admit_locked(self, now: float) -> bool:
        """Record an admission at *now* if the window has room; caller holds the lock."""
        self._cleanup(now)
        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True
        return False

    def try_acquire(self) -> bool:
        """Admit one request if the window has room; never blocks."""
        with self.lock:
            return self._admit_locked(time.monotonic())

    def wait_and_acquire(self, timeout: Optional[float] = None) -> bool:
        """Block until a slot frees up or *timeout* seconds elapse.

        Args:
            timeout: Maximum number of seconds to wait. ``None`` waits
                indefinitely; ``0`` tries exactly once.

        Returns:
            True once admitted, False on timeout.
        """
        start_time = time.monotonic()
        while True:
            with self.lock:
                now = time.monotonic()
                if self._admit_locked(now):
                    return True
                # Read the oldest entry while holding the lock; outside it,
                # another thread could pop it between the check and the index.
                if self.requests:
                    wait_time = self.requests[0] + self.window_seconds - now
                else:
                    wait_time = 0.0
            # Poll between 10 ms and 100 ms, never sleeping past the deadline.
            sleep_for = min(max(wait_time, 0.01), 0.1)
            if timeout is not None:
                remaining = timeout - (time.monotonic() - start_time)
                if remaining <= 0:
                    return False
                sleep_for = min(sleep_for, remaining)
            time.sleep(sleep_for)
自适应限流
根据系统负载（近期的平均响应时间和错误率）动态调整限流参数。
import time
from collections import deque
class AdaptiveThrottler:
    """Throttler whose admission limit adapts to observed latency and errors.

    ``is_allowed`` returns True while fewer than ``current_rate`` responses
    have been recorded within the last ``window_seconds``. At most once every
    10 seconds, ``record_response`` re-tunes ``current_rate``: it drops by
    about 20% when latency or errors are high and grows by about 20% (at
    least 1) when both are low. It never leaves ``[min_rate, max_rate]``
    through these adjustments.
    """

    def __init__(self, initial_rate: int, min_rate: int, max_rate: int,
                 window_seconds: int = 60):
        self.current_rate = initial_rate
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.window_seconds = window_seconds
        self.response_times = deque()  # (timestamp, response_time), oldest first
        self.error_counts = deque()  # (timestamp, 1 if error else 0), oldest first
        # Monotonic clock so wall-clock jumps do not distort the window.
        self.last_adjustment = time.monotonic()

    def _prune(self, now: float) -> None:
        """Drop samples older than the window from both sample deques."""
        cutoff = now - self.window_seconds
        for samples in (self.response_times, self.error_counts):
            while samples and samples[0][0] < cutoff:
                samples.popleft()

    def record_response(self, response_time: float, is_error: bool):
        """Record one completed request and periodically re-tune the rate.

        Args:
            response_time: Latency of the request. The 2000/500 thresholds
                in ``_adjust_rate`` suggest milliseconds — confirm with
                callers.
            is_error: Whether the request failed.
        """
        now = time.monotonic()
        self.response_times.append((now, response_time))
        self.error_counts.append((now, 1 if is_error else 0))
        self._prune(now)
        # Re-tune at most once every 10 seconds.
        if now - self.last_adjustment >= 10:
            self._adjust_rate()
            self.last_adjustment = now

    def _adjust_rate(self):
        """Lower or raise ``current_rate`` from the samples in the window."""
        if not self.response_times:
            return
        avg_response_time = sum(
            rt for _, rt in self.response_times
        ) / len(self.response_times)
        error_rate = sum(
            err for _, err in self.error_counts
        ) / len(self.error_counts) if self.error_counts else 0
        if avg_response_time > 2000 or error_rate > 0.1:
            # System under pressure: back off by ~20%.
            self.current_rate = max(
                self.min_rate,
                int(self.current_rate * 0.8)
            )
        elif avg_response_time < 500 and error_rate < 0.01:
            # System healthy: grow by ~20%. int(rate * 1.2) == rate for
            # rate <= 4 (and stays 0 at 0), so always grow by at least 1.
            self.current_rate = min(
                self.max_rate,
                max(self.current_rate + 1, int(self.current_rate * 1.2))
            )

    def is_allowed(self) -> bool:
        """Return True if another request may be admitted right now."""
        # Prune first. Otherwise, once the window is full and nothing new is
        # admitted (so nothing is recorded), stale samples would never expire
        # and this would return False forever.
        self._prune(time.monotonic())
        return len(self.response_times) < self.current_rate
降级策略
当系统过载时，按请求优先级自动降级：负载越高，能被接受的请求优先级门槛就越高。
class DegradationManager:
    """Decide which requests to accept based on CPU and memory load.

    Degradation levels: 0 = normal, 1 = light, 2 = moderate, 3 = severe.
    The level is derived from the larger of the CPU and memory readings using
    the thresholds > 50, > 70 and > 90 (presumably percentages — confirm with
    the metrics source).
    """

    def __init__(self):
        self.degradation_level = 0  # 0: normal, 1: light, 2: moderate, 3: severe
        self.metrics = {"cpu": 0, "memory": 0, "queue_size": 0}

    def update_metrics(self, cpu: float, memory: float, queue_size: int):
        """Store the latest load readings and re-assess the degradation level.

        ``queue_size`` is stored in ``metrics`` but does not currently affect
        the level.
        """
        self.metrics = {
            "cpu": cpu,
            "memory": memory,
            "queue_size": queue_size
        }
        self._assess_degradation_level()

    def _assess_degradation_level(self):
        """Map CPU/memory readings to a level; either metric can trigger it."""
        if self.metrics["cpu"] > 90 or self.metrics["memory"] > 90:
            self.degradation_level = 3
        elif self.metrics["cpu"] > 70 or self.metrics["memory"] > 70:
            self.degradation_level = 2
        elif self.metrics["cpu"] > 50 or self.metrics["memory"] > 50:
            self.degradation_level = 1
        else:
            self.degradation_level = 0

    def should_accept_request(self, request: "LLMRequest") -> bool:
        """Return whether *request* may proceed at the current level.

        Level 0 accepts everything, level 1 only HIGH and CRITICAL, level 2
        only CRITICAL, and level 3 rejects everything.
        """
        if self.degradation_level == 0:
            return True
        elif self.degradation_level == 1:
            return request.priority <= RequestPriority.HIGH
        elif self.degradation_level == 2:
            return request.priority <= RequestPriority.CRITICAL
        else:
            return False

    def get_degraded_response(self, request: "LLMRequest") -> Optional[dict]:
        """Return a "service degraded" error payload at level 2 or above.

        Returns None at levels 0 and 1. *request* is currently unused.
        """
        if self.degradation_level >= 2:
            return {
                "error": "service_degraded",
                "message": "系统当前负载较高,请稍后重试",
                "retry_after": 30
            }
        return None
监控和告警
class ThrottleMonitor:
    """Accumulate throttling statistics and decide when to raise an alert.

    ``metrics`` holds the total request count, how many of those were
    throttled, and the mean wait time over the retained throttled samples.
    ``queue_size`` is initialised here but never updated by this class.
    """

    def __init__(self):
        self.metrics = dict(
            total_requests=0,
            throttled_requests=0,
            avg_wait_time=0,
            queue_size=0,
        )
        self.wait_times = []

    def record_request(self, throttled: bool, wait_time: float = 0):
        """Count one request; for a throttled one, also keep its wait time."""
        stats = self.metrics
        stats["total_requests"] += 1
        if throttled:
            stats["throttled_requests"] += 1
            self.wait_times.append(wait_time)
        # Bound memory: beyond 1000 samples keep only the newest 500.
        if len(self.wait_times) > 1000:
            self.wait_times = self.wait_times[-500:]
        samples = self.wait_times
        if samples:
            stats["avg_wait_time"] = sum(samples) / len(samples)

    def get_throttle_rate(self) -> float:
        """Fraction of recorded requests that were throttled (0 when none)."""
        total = self.metrics["total_requests"]
        return 0 if total == 0 else self.metrics["throttled_requests"] / total

    def should_alert(self, threshold: float = 0.3) -> bool:
        """True when the throttle rate strictly exceeds *threshold*."""
        rate = self.get_throttle_rate()
        return rate > threshold
限流是保护LLM服务的重要手段。通过合理的限流策略，可以保障系统稳定运行，同时为用户提供可预期的服务体验。