← 返回首页
🧠

队列管理

📂 llm ⏱ 2 min 246 words

--- title: "队列管理" description: "介绍LLM服务中的请求队列管理策略,包括队列设计、优先级调度和背压机制" tags: ["队列管理", "消息队列", "请求调度", "LLM服务"] category: "llm" icon: "🧠"

队列管理

队列在LLM服务中的作用

LLM推理请求通常需要较长的处理时间,直接同步处理会导致资源争用和请求超时。引入请求队列可以解耦请求接收与处理,提供缓冲能力,实现更精细的资源调度。

队列架构设计

基础队列结构

from queue import Queue
from dataclasses import dataclass
from typing import Optional
import asyncio

@dataclass
class LLMRequest:
    request_id: str
    prompt: str
    priority: int  # 0-越高越优先
    timeout: int   # 秒
    callback: Optional[callable] = None

class LLMRequestQueue:
    def __init__(self, max_size=1000):
        self.queue = asyncio.PriorityQueue(maxsize=max_size)
        self.active_requests = {}
    
    async def enqueue(self, request: LLMRequest):
        try:
            await self.queue.put((request.priority, request))
            return {"status": "queued", "position": self.queue.qsize()}
        except asyncio.QueueFull:
            return {"status": "rejected", "reason": "queue_full"}
    
    async def dequeue(self):
        priority, request = await self.queue.get()
        self.active_requests[request.request_id] = request
        return request

分级队列系统

根据业务重要性设置多个队列:

优先级调度策略

优先级定义

class Priority(Enum):
    CRITICAL = 0    # 关键业务请求
    HIGH = 1        # 用户交互请求
    NORMAL = 2      # 普通API调用
    LOW = 3         # 批量处理任务
    BACKGROUND = 4  # 后台维护任务

# 基于SLA的优先级计算
def calculate_priority(request):
    base_priority = request.user_tier_value
    
    # 时效性调整
    if request.deadline_urgent:
        base_priority -= 1
    
    # 负载调整
    if current_load > 80:
        base_priority += 1  # 高负载时降低非关键请求优先级
    
    return max(0, base_priority)

公平性保障

防止低优先级请求饿死:

class FairScheduler:
    def __init__(self):
        self.priority_counts = {p: 0 for p in Priority}
        self.max_per_priority = 100
    
    def admission_control(self, request):
        priority = request.priority
        if self.priority_counts[priority] >= self.max_per_priority:
            # 该优先级队列已满,尝试降级处理
            return self.try_alternative_queue(request)
        return True

背压机制

速率限制

当系统负载过高时,主动限制请求入队速率:

import time

class RateLimiter:
    def __init__(self, max_rate=100):
        self.max_rate = max_rate
        self.tokens = max_rate
        self.last_update = time.time()
    
    def allow_request(self):
        now = time.time()
        elapsed = now - self.last_update
        self.tokens = min(self.max_rate, self.tokens + elapsed * self.max_rate / 10)
        self.last_update = now
        
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

优雅降级

负载超过阈值时的处理策略:

监控与告警

关键监控指标

告警规则

当队列深度超过阈值或平均等待时间过长时,触发告警通知运维团队。设置多级告警,从预警到严重逐步升级处理措施。