队列管理
--- title: "队列管理" description: "介绍LLM服务中的请求队列管理策略,包括队列设计、优先级调度和背压机制" tags: ["队列管理", "消息队列", "请求调度", "LLM服务"] category: "llm" icon: "🧠"
队列管理
队列在LLM服务中的作用
LLM推理请求通常需要较长的处理时间,直接同步处理会导致资源争用和请求超时。引入请求队列可以解耦请求接收与处理,提供缓冲能力,实现更精细的资源调度。
队列架构设计
基础队列结构
from queue import Queue
from dataclasses import dataclass
from typing import Optional
import asyncio
@dataclass
class LLMRequest:
request_id: str
prompt: str
priority: int # 0-越高越优先
timeout: int # 秒
callback: Optional[callable] = None
class LLMRequestQueue:
def __init__(self, max_size=1000):
self.queue = asyncio.PriorityQueue(maxsize=max_size)
self.active_requests = {}
async def enqueue(self, request: LLMRequest):
try:
await self.queue.put((request.priority, request))
return {"status": "queued", "position": self.queue.qsize()}
except asyncio.QueueFull:
return {"status": "rejected", "reason": "queue_full"}
async def dequeue(self):
priority, request = await self.queue.get()
self.active_requests[request.request_id] = request
return request
分级队列系统
根据业务重要性设置多个队列:
- 实时队列:用于在线对话等低延迟场景
- 异步队列:用于批量处理和离线任务
- 后台队列:用于模型微调等长时间任务
优先级调度策略
优先级定义
class Priority(Enum):
CRITICAL = 0 # 关键业务请求
HIGH = 1 # 用户交互请求
NORMAL = 2 # 普通API调用
LOW = 3 # 批量处理任务
BACKGROUND = 4 # 后台维护任务
# 基于SLA的优先级计算
def calculate_priority(request):
base_priority = request.user_tier_value
# 时效性调整
if request.deadline_urgent:
base_priority -= 1
# 负载调整
if current_load > 80:
base_priority += 1 # 高负载时降低非关键请求优先级
return max(0, base_priority)
公平性保障
防止低优先级请求饿死:
class FairScheduler:
def __init__(self):
self.priority_counts = {p: 0 for p in Priority}
self.max_per_priority = 100
def admission_control(self, request):
priority = request.priority
if self.priority_counts[priority] >= self.max_per_priority:
# 该优先级队列已满,尝试降级处理
return self.try_alternative_queue(request)
return True
背压机制
速率限制
当系统负载过高时,主动限制请求入队速率:
import time
class RateLimiter:
def __init__(self, max_rate=100):
self.max_rate = max_rate
self.tokens = max_rate
self.last_update = time.time()
def allow_request(self):
now = time.time()
elapsed = now - self.last_update
self.tokens = min(self.max_rate, self.tokens + elapsed * self.max_rate / 10)
self.last_update = now
if self.tokens >= 1:
self.tokens -= 1
return True
return False
优雅降级
负载超过阈值时的处理策略:
- 返回排队位置:告知用户预计等待时间
- 建议重试时间:引导用户稍后重试
- 降级服务:使用更小更快的模型处理简单请求
监控与告警
关键监控指标
- 队列深度:反映积压程度
- 平均等待时间:用户感知的延迟
- 队列处理速率:系统处理能力
- 被拒绝请求比例:服务可用性
告警规则
当队列深度超过阈值或平均等待时间过长时,触发告警通知运维团队。设置多级告警,从预警到严重逐步升级处理措施。