← 返回首页
🧠

吞吐量优化:提升LLM系统处理能力的方法

📂 llm ⏱ 5 min 835 words

吞吐量优化:提升LLM系统处理能力的方法

吞吐量指标定义

核心指标

from dataclasses import dataclass
from typing import List
import time

@dataclass
class ThroughputMetrics:
    requests_per_second: float  # RPS: 每秒请求数
    tokens_per_second: float  # TPS: 每秒token数
    batch_efficiency: float  # 批处理效率
    gpu_utilization: float  # GPU利用率
    concurrent_users: float  # 并发用户数
    
    @classmethod
    def measure(cls, handler, requests: List, duration: float = 60.0) -> 'ThroughputMetrics':
        """测量吞吐量指标"""
        start_time = time.time()
        completed = 0
        total_tokens = 0
        
        while time.time() - start_time < duration:
            for req in requests:
                result = handler.process(req)
                completed += 1
                total_tokens += result['tokens_generated']
        
        elapsed = time.time() - start_time
        
        return cls(
            requests_per_second=completed / elapsed,
            tokens_per_second=total_tokens / elapsed,
            batch_efficiency=0.0,  # 需要额外测量
            gpu_utilization=0.0,  # 需要系统监控
            concurrent_users=len(requests)
        )

批处理优化

动态批处理

import asyncio
from collections import deque
from typing import Optional

class DynamicBatcher:
    def __init__(self, max_batch_size: int = 32, max_wait_time: float = 0.1):
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.pending_queue = deque()
        self.batch = []
        self.lock = asyncio.Lock()
    
    async def add_request(self, request) -> asyncio.Future:
        """添加请求到批处理器"""
        future = asyncio.get_event_loop().create_future()
        
        async with self.lock:
            self.pending_queue.append({
                'request': request,
                'future': future,
                'timestamp': time.time()
            })
            
            # 检查是否应该形成批次
            if self.should_form_batch():
                await self.form_batch()
        
        return future
    
    def should_form_batch(self) -> bool:
        """判断是否应该形成批次"""
        if len(self.pending_queue) >= self.max_batch_size:
            return True
        
        if self.pending_queue:
            oldest = self.pending_queue[0]
            if time.time() - oldest['timestamp'] >= self.max_wait_time:
                return True
        
        return False
    
    async def form_batch(self):
        """形成批次并处理"""
        batch_items = []
        
        while self.pending_queue and len(batch_items) < self.max_batch_size:
            item = self.pending_queue.popleft()
            batch_items.append(item)
        
        if batch_items:
            # 批量处理
            inputs = [item['request']['input'] for item in batch_items]
            outputs = await self.process_batch(inputs)
            
            # 设置结果
            for item, output in zip(batch_items, outputs):
                item['future'].set_result(output)

推测解码

class SpeculativeDecoder:
    def __init__(self, draft_model, target_model, gamma: int = 5):
        self.draft_model = draft_model  # 小模型
        self.target_model = target_model  # 大模型
        self.gamma = gamma  # 推测token数
    
    async def generate(self, prompt: str) -> str:
        """使用推测解码生成"""
        tokens = self.tokenize(prompt)
        
        while not self.is_finished(tokens):
            # 1. 用小模型快速生成gamma个token
            draft_tokens = []
            draft_probs = []
            
            current = tokens
            for _ in range(self.gamma):
                prob = self.draft_model.get_probs(current)
                next_token = self.sample(prob)
                draft_tokens.append(next_token)
                draft_probs.append(prob[next_token])
                current = current + [next_token]
            
            # 2. 用大模型验证
            target_probs = self.target_model.get_probs_batch(
                tokens, draft_tokens
            )
            
            # 3. 接受/拒绝采样
            accepted = 0
            for i in range(self.gamma):
                draft_prob = draft_probs[i]
                target_prob = target_probs[i][draft_tokens[i]]
                
                # 接受概率
                accept_prob = min(1.0, target_prob / draft_prob)
                
                if random.random() < accept_prob:
                    tokens.append(draft_tokens[i])
                    accepted += 1
                else:
                    # 从修正分布采样
                    corrected = self.correct_distribution(
                        target_probs[i], draft_probs[i]
                    )
                    tokens.append(self.sample(corrected))
                    break
        
        return self.detokenize(tokens)

并行化策略

张量并行

import torch
import torch.distributed as dist

class TensorParallel:
    def __init__(self, model, world_size: int):
        self.model = model
        self.world_size = world_size
        
        # 初始化进程组
        dist.init_process_group(
            backend='nccl',
            world_size=world_size
        )
        
        self.rank = dist.get_rank()
        self.local_rank = self.rank % torch.cuda.device_count()
    
    def parallel_forward(self, input_ids):
        """并行前向传播"""
        # 分割输入
        chunks = torch.chunk(input_ids, self.world_size, dim=1)
        local_input = chunks[self.rank].to(self.local_rank)
        
        # 本地计算
        local_output = self.model(local_input)
        
        # 收集结果
        outputs = [torch.zeros_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(outputs, local_output)
        
        return torch.cat(outputs, dim=1)

流水线并行

class PipelineParallel:
    def __init__(self, model_layers, num_stages: int):
        self.num_stages = num_stages
        self.stages = self.distribute_layers(model_layers)
    
    def distribute_layers(self, layers):
        """将层分配到不同阶段"""
        layers_per_stage = len(layers) // self.num_stages
        stages = []
        
        for i in range(self.num_stages):
            start = i * layers_per_stage
            end = start + layers_per_stage if i < self.num_stages - 1 else len(layers)
            stages.append(layers[start:end])
        
        return stages
    
    async def pipeline_forward(self, input_batches):
        """流水线前向传播"""
        # 微批次处理
        micro_batches = self.split_into_micro_batches(input_batches)
        
        # 流水线调度
        schedule = self.create_schedule(len(micro_batches))
        
        outputs = []
        for step in schedule:
            stage_outputs = []
            for stage_idx, micro_batch_idx in enumerate(step):
                if micro_batch_idx is not None:
                    output = await self.process_micro_batch(
                        stage_idx, 
                        micro_batches[micro_batch_idx]
                    )
                    stage_outputs.append(output)
            
            outputs.extend(stage_outputs)
        
        return self.merge_outputs(outputs)

资源调度

智能负载均衡

class LLMRouter:
    def __init__(self, instances: List):
        self.instances = instances
        self.health_checker = HealthChecker()
        self.metrics_collector = MetricsCollector()
    
    async def route_request(self, request) -> dict:
        """路由请求到最佳实例"""
        # 获取实例状态
        available_instances = await self.get_healthy_instances()
        
        if not available_instances:
            raise NoAvailableInstanceError()
        
        # 计算每个实例的分数
        scores = {}
        for instance in available_instances:
            score = await self.calculate_instance_score(instance, request)
            scores[instance['id']] = score
        
        # 选择最佳实例
        best_instance_id = max(scores, key=scores.get)
        
        return await self.send_to_instance(best_instance_id, request)
    
    async def calculate_instance_score(self, instance, request) -> float:
        """计算实例分数"""
        metrics = await self.metrics_collector.get_metrics(instance['id'])
        
        # 综合考虑多个因素
        score = (
            (1 - metrics['gpu_utilization']) * 0.3 +  # GPU空闲度
            (1 / (metrics['queue_length'] + 1)) * 0.3 +  # 队列长度
            (1 / (metrics['avg_latency'] + 0.1)) * 0.2 +  # 延迟
            metrics['availability'] * 0.2  # 可用性
        )
        
        return score
    
    async def get_healthy_instances(self) -> List:
        """获取健康实例"""
        healthy = []
        
        for instance in self.instances:
            if await self.health_checker.is_healthy(instance):
                healthy.append(instance)
        
        return healthy

自动扩缩容

class AutoScaler:
    def __init__(self, min_instances: int = 2, max_instances: int = 10):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances
        self.scaling_history = []
    
    async def evaluate_scaling(self, metrics: dict):
        """评估是否需要扩缩容"""
        current_load = metrics['requests_per_second']
        target_utilization = 0.7  # 目标利用率
        
        # 计算所需实例数
        required_instances = self.calculate_required_instances(
            current_load, target_utilization
        )
        
        # 限制范围
        required_instances = max(
            self.min_instances,
            min(self.max_instances, required_instances)
        )
        
        if required_instances != self.current_instances:
            await self.scale(required_instances)
    
    def calculate_required_instances(self, load, target_utilization) -> int:
        """计算所需实例数"""
        # 单实例处理能力
        capacity_per_instance = 100  # RPS
        
        required = load / (capacity_per_instance * target_utilization)
        
        return int(required) + 1  # 向上取整
    
    async def scale(self, target_instances: int):
        """执行扩缩容"""
        if target_instances > self.current_instances:
            await self.scale_up(target_instances - self.current_instances)
        else:
            await self.scale_down(self.current_instances - target_instances)
        
        self.current_instances = target_instances
        self.scaling_history.append({
            'timestamp': time.time(),
            'target': target_instances,
            'previous': self.current_instances
        })
    
    async def scale_up(self, count: int):
        """扩容"""
        for _ in range(count):
            new_instance = await self.create_instance()
            await self.warm_up_instance(new_instance)
    
    async def scale_down(self, count: int):
        """缩容"""
        instances_to_remove = self.select_instances_to_remove(count)
        for instance in instances_to_remove:
            await self.drain_instance(instance)
            await self.terminate_instance(instance)

性能监控与调优

class ThroughputMonitor:
    def __init__(self):
        self.metrics_history = []
    
    def record_metrics(self, metrics: ThroughputMetrics):
        """记录指标"""
        self.metrics_history.append({
            'timestamp': time.time(),
            'metrics': metrics
        })
    
    def analyze_bottlenecks(self) -> List[str]:
        """分析性能瓶颈"""
        bottlenecks = []
        
        recent = self.metrics_history[-10:]  # 最近10条记录
        avg_metrics = self.average_metrics(recent)
        
        if avg_metrics.gpu_utilization < 0.5:
            bottlenecks.append('GPU利用率低,可能是CPU瓶颈或IO瓶颈')
        
        if avg_metrics.batch_efficiency < 0.7:
            bottlenecks.append('批处理效率低,可能是批大小不合适')
        
        if avg_metrics.requests_per_second < self.target_rps:
            bottlenecks.append('RPS低于目标,需要扩容或优化')
        
        return bottlenecks
    
    def get_optimization_recommendations(self) -> List[dict]:
        """获取优化建议"""
        recommendations = []
        bottlenecks = self.analyze_bottlenecks()
        
        for bottleneck in bottlenecks:
            if 'GPU利用率低' in bottleneck:
                recommendations.append({
                    'action': '增大批大小',
                    'expected_improvement': '20-50%'
                })
            elif '批处理效率低' in bottleneck:
                recommendations.append({
                    'action': '调整批处理参数',
                    'expected_improvement': '10-30%'
                })
        
        return recommendations

最佳实践

  1. 合理批大小:找到最优批大小平衡延迟和吞吐
  2. 异步处理:使用异步IO提升并发能力
  3. 资源隔离:不同优先级请求使用不同资源池
  4. 弹性伸缩:根据负载自动扩缩容
  5. 缓存复用:缓存频繁查询的结果
  6. 监控告警:实时监控吞吐量指标