# 吞吐量优化:提升LLM系统处理能力的方法
## 吞吐量指标定义
### 核心指标
from dataclasses import dataclass
from typing import List
import time
@dataclass
class ThroughputMetrics:
    """Snapshot of throughput measurements for a request handler."""

    requests_per_second: float  # RPS: completed requests per second
    tokens_per_second: float  # TPS: generated tokens per second
    batch_efficiency: float  # batching efficiency
    gpu_utilization: float  # GPU utilization
    concurrent_users: float  # number of concurrent users

    @classmethod
    def measure(cls, handler, requests: List, duration: float = 60.0) -> 'ThroughputMetrics':
        """Replay ``requests`` through ``handler`` for about ``duration`` seconds.

        The request list is replayed in order, over and over, until the time
        budget is used up. Requests are issued one at a time.

        Args:
            handler: Object exposing ``process(request)``; each result must
                support ``result['tokens_generated']``.
            requests: Requests to replay. An empty list yields zero rates
                immediately instead of spinning for ``duration`` seconds.
            duration: Time budget in seconds.

        Returns:
            A ``ThroughputMetrics`` whose ``batch_efficiency`` and
            ``gpu_utilization`` are 0.0 placeholders (they need separate
            measurement / system monitoring) and whose ``concurrent_users``
            is ``len(requests)``.
        """
        completed = 0
        total_tokens = 0
        # perf_counter is monotonic, so wall-clock adjustments cannot skew
        # the measured interval.
        start_time = time.perf_counter()
        deadline = start_time + duration

        if requests:
            while time.perf_counter() < deadline:
                for req in requests:
                    # Re-check inside the pass so a long request list cannot
                    # overrun the time budget by a whole extra pass.
                    if time.perf_counter() >= deadline:
                        break
                    result = handler.process(req)
                    completed += 1
                    total_tokens += result['tokens_generated']

        elapsed = time.perf_counter() - start_time
        # Guard against a zero-length interval (e.g. duration <= 0).
        rps = completed / elapsed if elapsed > 0 else 0.0
        tps = total_tokens / elapsed if elapsed > 0 else 0.0

        return cls(
            requests_per_second=rps,
            tokens_per_second=tps,
            batch_efficiency=0.0,  # needs separate measurement
            gpu_utilization=0.0,  # needs system monitoring
            concurrent_users=float(len(requests)),
        )
## 批处理优化
### 动态批处理
import asyncio
from collections import deque
from typing import Optional
class DynamicBatcher:
    """Group individual requests into batches by size or by waiting time.

    A batch is formed when ``max_batch_size`` requests are queued, or when
    the oldest queued request has waited ``max_wait_time`` seconds. The class
    does not define ``process_batch``; a subclass (or the instance) must
    provide ``async process_batch(inputs)`` returning one output per input,
    in order.
    """

    def __init__(self, max_batch_size: int = 32, max_wait_time: float = 0.1):
        """Store the batching limits and create the empty queue.

        Args:
            max_batch_size: Maximum number of requests per batch.
            max_wait_time: Maximum seconds the oldest request may wait.
        """
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.pending_queue = deque()
        self.batch = []  # not used by this class; kept for existing readers
        self.lock = asyncio.Lock()
        # Background task that flushes the queue once the oldest request has
        # waited long enough; None while no flush is scheduled.
        self._flush_task = None

    async def add_request(self, request) -> asyncio.Future:
        """Queue ``request`` and return the future that will carry its output.

        Args:
            request: Mapping with an ``'input'`` entry.

        Returns:
            A future resolved with this request's output, or holding the
            exception raised while its batch was processed.
        """
        loop = asyncio.get_running_loop()
        future = loop.create_future()

        async with self.lock:
            self.pending_queue.append({
                'request': request,
                'future': future,
                'timestamp': time.time()
            })

            # Flush immediately when the size or age threshold is reached.
            if self.should_form_batch():
                await self.form_batch()

            # Otherwise make sure a timer exists; without it a lone request
            # would wait until some later request happened to arrive.
            if self.pending_queue and self._flush_task is None:
                self._flush_task = loop.create_task(self._flush_when_due())

        return future

    def should_form_batch(self) -> bool:
        """Return True when the queue is full or its oldest entry is overdue."""
        if len(self.pending_queue) >= self.max_batch_size:
            return True

        if self.pending_queue:
            oldest = self.pending_queue[0]
            if time.time() - oldest['timestamp'] >= self.max_wait_time:
                return True

        return False

    async def form_batch(self):
        """Pop up to ``max_batch_size`` requests, process them, settle futures.

        Failures while building or processing the batch are stored on every
        future of that batch instead of being raised, so no waiter is left
        pending forever.
        """
        batch_items = []
        while self.pending_queue and len(batch_items) < self.max_batch_size:
            batch_items.append(self.pending_queue.popleft())

        if not batch_items:
            return

        try:
            inputs = [item['request']['input'] for item in batch_items]
            outputs = list(await self.process_batch(inputs))
            if len(outputs) != len(batch_items):
                # zip() would silently drop the unmatched futures.
                raise RuntimeError(
                    f"process_batch returned {len(outputs)} outputs "
                    f"for {len(batch_items)} inputs"
                )
        except asyncio.CancelledError:
            # Processing was cancelled: release the waiters, then propagate.
            for item in batch_items:
                if not item['future'].done():
                    item['future'].cancel()
            raise
        except Exception as exc:
            for item in batch_items:
                if not item['future'].done():
                    item['future'].set_exception(exc)
            return

        for item, output in zip(batch_items, outputs):
            # A caller may have cancelled its future while waiting.
            if not item['future'].done():
                item['future'].set_result(output)

    async def _flush_when_due(self):
        """Sleep until the oldest queued request is overdue, then flush."""
        try:
            while self.pending_queue:
                waited = time.time() - self.pending_queue[0]['timestamp']
                remaining = self.max_wait_time - waited
                if remaining > 0:
                    await asyncio.sleep(remaining)
                    # The queue head may have changed while sleeping.
                    continue
                async with self.lock:
                    if self.should_form_batch():
                        await self.form_batch()
        finally:
            self._flush_task = None
## 推测解码
class SpeculativeDecoder:
    """Generate text by drafting tokens with one model and verifying with another.

    The class does not define ``tokenize``, ``detokenize``, ``is_finished``,
    ``sample`` or ``correct_distribution``; they must be supplied by a
    subclass or elsewhere.
    """

    def __init__(self, draft_model, target_model, gamma: int = 5):
        """Store the two models and the draft length.

        Args:
            draft_model: Small model; must expose ``get_probs(tokens)``.
            target_model: Large model; must expose
                ``get_probs_batch(tokens, draft_tokens)``.
            gamma: Number of tokens drafted per verification round.

        Raises:
            ValueError: If ``gamma`` is less than 1 (no token would ever be
                produced, so ``generate`` could not terminate).
        """
        if gamma < 1:
            raise ValueError("gamma must be at least 1")
        self.draft_model = draft_model  # small model
        self.target_model = target_model  # large model
        self.gamma = gamma  # number of speculated tokens per round

    async def generate(self, prompt: str) -> str:
        """Generate a completion for ``prompt`` using speculative decoding.

        Each round drafts ``gamma`` tokens, scores them with the target
        model, then accepts drafted tokens one by one with probability
        ``min(1, target_prob / draft_prob)``. The first rejected token is
        replaced by a sample from ``correct_distribution`` and ends the round.

        Returns:
            ``self.detokenize`` applied to the final token list.
        """
        # Imported locally because the surrounding file never imports it.
        import random

        tokens = self.tokenize(prompt)

        while not self.is_finished(tokens):
            # 1. Draft gamma tokens with the small model.
            draft_tokens = []
            draft_probs = []

            current = tokens
            for _ in range(self.gamma):
                prob = self.draft_model.get_probs(current)
                next_token = self.sample(prob)
                draft_tokens.append(next_token)
                draft_probs.append(prob[next_token])
                current = current + [next_token]

            # 2. Score the drafted tokens with the large model.
            target_probs = self.target_model.get_probs_batch(
                tokens, draft_tokens
            )

            # 3. Accept / reject sampling.
            for i in range(self.gamma):
                draft_prob = draft_probs[i]
                target_prob = target_probs[i][draft_tokens[i]]

                # A zero draft probability would divide by zero; treat the
                # unbounded ratio as certain acceptance.
                if draft_prob <= 0:
                    accept_prob = 1.0
                else:
                    accept_prob = min(1.0, target_prob / draft_prob)

                if random.random() < accept_prob:
                    tokens.append(draft_tokens[i])
                    # Stop as soon as generation is complete so no drafted
                    # tokens are appended past the end.
                    if self.is_finished(tokens):
                        break
                else:
                    # Sample the replacement from the corrected distribution.
                    # NOTE(review): the second argument is the scalar
                    # probability of the drafted token, not the full draft
                    # distribution - confirm that is what
                    # correct_distribution expects.
                    corrected = self.correct_distribution(
                        target_probs[i], draft_probs[i]
                    )
                    tokens.append(self.sample(corrected))
                    break

        return self.detokenize(tokens)
## 并行化策略
### 张量并行
import torch
import torch.distributed as dist
class TensorParallel:
    """Run a model on per-rank slices of the input and gather the results."""

    def __init__(self, model, world_size: int):
        """Join the NCCL process group and record this process's rank.

        Args:
            model: Callable applied to the local input slice.
            world_size: Number of participating processes.
        """
        self.model = model
        self.world_size = world_size

        # Initialise the process group.
        dist.init_process_group(
            backend='nccl',
            world_size=world_size
        )

        self.rank = dist.get_rank()
        # Map the global rank onto one of the GPUs visible to this process.
        self.local_rank = self.rank % torch.cuda.device_count()

    def parallel_forward(self, input_ids):
        """Split ``input_ids`` along dim 1, run the local slice, gather all.

        Args:
            input_ids: Tensor with at least two dimensions whose dim-1 size
                is a multiple of ``world_size``.

        Returns:
            The per-rank outputs concatenated along dim 1.

        Raises:
            ValueError: If dim 1 of ``input_ids`` is not divisible by
                ``world_size``. ``torch.chunk`` would then return uneven or
                too few chunks, and the equally sized ``all_gather`` buffers
                below would not match what other ranks send.
        """
        split_size = input_ids.size(1)
        if split_size % self.world_size != 0:
            raise ValueError(
                f"dim 1 of input_ids ({split_size}) must be divisible by "
                f"world_size ({self.world_size})"
            )

        # Split the input; each rank takes its own slice.
        chunks = torch.chunk(input_ids, self.world_size, dim=1)
        local_input = chunks[self.rank].to(self.local_rank)

        # Local computation.
        local_output = self.model(local_input)

        # Gather results: one receive buffer per rank, shaped like the local
        # output.
        outputs = [torch.zeros_like(local_output) for _ in range(self.world_size)]
        dist.all_gather(outputs, local_output)

        return torch.cat(outputs, dim=1)
### 流水线并行
class PipelineParallel:
    """Partition model layers into stages and run micro-batches through them.

    The class does not define ``split_into_micro_batches``,
    ``create_schedule``, ``process_micro_batch`` or ``merge_outputs``; they
    must be supplied by a subclass or elsewhere.
    """

    def __init__(self, model_layers, num_stages: int):
        """Record the stage count and partition ``model_layers``.

        Args:
            model_layers: Sliceable sequence of layers.
            num_stages: Number of pipeline stages (must be positive).
        """
        self.num_stages = num_stages
        self.stages = self.distribute_layers(model_layers)

    def distribute_layers(self, layers):
        """Split ``layers`` into ``num_stages`` contiguous, balanced slices.

        Stage sizes differ by at most one layer; the earlier stages receive
        the extra layers, so no single stage absorbs the whole remainder.
        When there are fewer layers than stages, the trailing stages are
        empty.

        Raises:
            ValueError: If ``num_stages`` is not positive.
        """
        if self.num_stages <= 0:
            raise ValueError("num_stages must be positive")

        base, extra = divmod(len(layers), self.num_stages)
        stages = []
        start = 0
        for stage_idx in range(self.num_stages):
            # The first `extra` stages each take one additional layer.
            end = start + base + (1 if stage_idx < extra else 0)
            stages.append(layers[start:end])
            start = end

        return stages

    async def pipeline_forward(self, input_batches):
        """Run ``input_batches`` through the pipeline schedule.

        Each schedule step is a sequence in which position ``stage_idx``
        holds the micro-batch index that stage handles in that step, or
        ``None`` when the stage is idle. All active stages of a step are
        awaited concurrently; results keep stage order within a step and
        step order overall.

        Returns:
            ``self.merge_outputs`` applied to every collected stage output.
        """
        # Micro-batch the input.
        micro_batches = self.split_into_micro_batches(input_batches)

        # Pipeline schedule.
        schedule = self.create_schedule(len(micro_batches))

        # NOTE(review): every stage is handed the original micro-batch, not
        # the previous stage's output, and outputs of all stages are merged -
        # confirm process_micro_batch / merge_outputs account for this.
        outputs = []
        for step in schedule:
            stage_calls = [
                self.process_micro_batch(stage_idx, micro_batches[micro_batch_idx])
                for stage_idx, micro_batch_idx in enumerate(step)
                if micro_batch_idx is not None
            ]
            # Stages in the same step work on different micro-batches, so
            # they can overlap; gather returns results in call order.
            outputs.extend(await asyncio.gather(*stage_calls))

        return self.merge_outputs(outputs)
## 资源调度
### 智能负载均衡
class LLMRouter:
    """Route each request to the healthy instance with the highest score.

    ``send_to_instance`` is not defined in this class and must be supplied by
    a subclass or elsewhere.
    """

    def __init__(self, instances: List):
        """Store the candidate instances and create the helper objects.

        Args:
            instances: Instance records; each must support ``instance['id']``.
        """
        self.instances = instances
        self.health_checker = HealthChecker()
        self.metrics_collector = MetricsCollector()

    async def route_request(self, request) -> dict:
        """Send ``request`` to the best-scoring healthy instance.

        Returns:
            Whatever ``send_to_instance`` returns for the chosen instance.

        Raises:
            NoAvailableInstanceError: If no instance passes the health check.
        """
        available_instances = await self.get_healthy_instances()

        if not available_instances:
            raise NoAvailableInstanceError()

        # Score all candidates concurrently; gather keeps input order, so
        # ties are still resolved in favour of the earliest instance.
        score_values = await asyncio.gather(*(
            self.calculate_instance_score(instance, request)
            for instance in available_instances
        ))
        scores = {
            instance['id']: score
            for instance, score in zip(available_instances, score_values)
        }

        # Pick the best instance.
        best_instance_id = max(scores, key=scores.get)

        return await self.send_to_instance(best_instance_id, request)

    async def calculate_instance_score(self, instance, request) -> float:
        """Return a weighted score for ``instance``; higher is better.

        The score combines idle GPU share (weight 0.3), inverse queue length
        (0.3), inverse average latency (0.2) and availability (0.2), read
        from the metrics collector. ``request`` is currently not used.
        """
        metrics = await self.metrics_collector.get_metrics(instance['id'])

        # NOTE(review): the latency term is not bounded to [0, 1] like the
        # queue term (it reaches 10 at zero latency), so it can outweigh the
        # other factors - confirm this weighting is intended.
        score = (
            (1 - metrics['gpu_utilization']) * 0.3 +  # idle GPU share
            (1 / (metrics['queue_length'] + 1)) * 0.3 +  # queue length
            (1 / (metrics['avg_latency'] + 0.1)) * 0.2 +  # latency
            metrics['availability'] * 0.2  # availability
        )

        return score

    async def get_healthy_instances(self) -> List:
        """Return the instances that pass the health check, in original order."""
        candidates = list(self.instances)
        # Probe all instances concurrently instead of one after another, so
        # routing latency does not grow with the number of instances.
        health_flags = await asyncio.gather(*(
            self.health_checker.is_healthy(instance) for instance in candidates
        ))
        return [
            instance
            for instance, is_healthy in zip(candidates, health_flags)
            if is_healthy
        ]
### 自动扩缩容
class AutoScaler:
    """Adjust the instance count to the observed request rate.

    ``create_instance``, ``warm_up_instance``, ``select_instances_to_remove``,
    ``drain_instance`` and ``terminate_instance`` are not defined in this
    class and must be supplied by a subclass or elsewhere.
    """

    def __init__(self, min_instances: int = 2, max_instances: int = 10):
        """Start at ``min_instances`` with an empty scaling history."""
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = min_instances
        self.scaling_history = []

    async def evaluate_scaling(self, metrics: dict):
        """Scale when the required instance count differs from the current one.

        Args:
            metrics: Mapping containing ``'requests_per_second'``.
        """
        current_load = metrics['requests_per_second']
        target_utilization = 0.7  # target utilization

        required_instances = self.calculate_required_instances(
            current_load, target_utilization
        )

        # Clamp to the configured range.
        required_instances = max(
            self.min_instances,
            min(self.max_instances, required_instances)
        )

        if required_instances != self.current_instances:
            await self.scale(required_instances)

    def calculate_required_instances(self, load, target_utilization,
                                     capacity_per_instance=100) -> int:
        """Return the instance count needed to serve ``load``.

        Args:
            load: Current load in requests per second.
            target_utilization: Fraction of each instance's capacity to use.
            capacity_per_instance: Requests per second one instance handles.

        Returns:
            ``load / (capacity_per_instance * target_utilization)`` rounded
            up, and never less than 1.

        Raises:
            ValueError: If the effective per-instance capacity is not
                positive.
        """
        # Imported locally because the surrounding file never imports it.
        import math

        effective_capacity = capacity_per_instance * target_utilization
        if effective_capacity <= 0:
            raise ValueError(
                "capacity_per_instance * target_utilization must be positive"
            )

        required = load / effective_capacity
        # A true ceiling: int(x) + 1 over-provisions by one instance whenever
        # the quotient is already a whole number.
        return max(1, math.ceil(required))

    async def scale(self, target_instances: int):
        """Scale to ``target_instances`` and append a history entry."""
        # Capture the old count before it is overwritten so the history
        # entry records the real previous value.
        previous_instances = self.current_instances

        if target_instances > previous_instances:
            await self.scale_up(target_instances - previous_instances)
        else:
            await self.scale_down(previous_instances - target_instances)

        self.current_instances = target_instances
        self.scaling_history.append({
            'timestamp': time.time(),
            'target': target_instances,
            'previous': previous_instances
        })

    async def scale_up(self, count: int):
        """Create and warm up ``count`` new instances, one after another."""
        for _ in range(count):
            new_instance = await self.create_instance()
            await self.warm_up_instance(new_instance)

    async def scale_down(self, count: int):
        """Drain and then terminate ``count`` selected instances."""
        instances_to_remove = self.select_instances_to_remove(count)
        for instance in instances_to_remove:
            await self.drain_instance(instance)
            await self.terminate_instance(instance)
## 性能监控与调优
class ThroughputMonitor:
    """Keep a history of throughput metrics and derive tuning hints from it."""

    def __init__(self, target_rps: Optional[float] = None):
        """Create an empty history.

        Args:
            target_rps: Requests-per-second target used by
                ``analyze_bottlenecks``. ``None`` disables the RPS check.
        """
        self.metrics_history = []
        self.target_rps = target_rps

    def record_metrics(self, metrics: 'ThroughputMetrics'):
        """Append ``metrics`` to the history with the current timestamp."""
        self.metrics_history.append({
            'timestamp': time.time(),
            'metrics': metrics
        })

    def average_metrics(self, records):
        """Return the field-wise mean of the metrics in ``records``.

        Args:
            records: Non-empty list of history entries as stored by
                ``record_metrics``; the metrics must be dataclass instances
                with numeric fields.

        Returns:
            A new instance of the same metrics class holding the means.

        Raises:
            ValueError: If ``records`` is empty.
        """
        # Imported locally because the file only imports `dataclass`.
        from dataclasses import fields

        if not records:
            raise ValueError("records must not be empty")

        samples = [record['metrics'] for record in records]
        count = len(samples)
        first = samples[0]
        means = {
            field.name: sum(getattr(sample, field.name) for sample in samples) / count
            for field in fields(first)
        }
        return type(first)(**means)

    def analyze_bottlenecks(self) -> List[str]:
        """Describe bottlenecks found in the 10 most recent records.

        Returns:
            One message per detected problem; empty when the history is
            empty or nothing is flagged.
        """
        bottlenecks = []

        recent = self.metrics_history[-10:]  # 10 most recent records
        if not recent:
            # Nothing recorded yet, so there is nothing to average.
            return bottlenecks

        avg_metrics = self.average_metrics(recent)

        if avg_metrics.gpu_utilization < 0.5:
            bottlenecks.append('GPU利用率低,可能是CPU瓶颈或IO瓶颈')

        if avg_metrics.batch_efficiency < 0.7:
            bottlenecks.append('批处理效率低,可能是批大小不合适')

        if self.target_rps is not None and avg_metrics.requests_per_second < self.target_rps:
            bottlenecks.append('RPS低于目标,需要扩容或优化')

        return bottlenecks

    def get_optimization_recommendations(self) -> List[dict]:
        """Map the detected bottlenecks to recommended actions.

        Only the GPU-utilization and batch-efficiency bottlenecks have a
        recommendation; other messages are skipped.
        """
        recommendations = []

        bottlenecks = self.analyze_bottlenecks()

        for bottleneck in bottlenecks:
            if 'GPU利用率低' in bottleneck:
                recommendations.append({
                    'action': '增大批大小',
                    'expected_improvement': '20-50%'
                })
            elif '批处理效率低' in bottleneck:
                recommendations.append({
                    'action': '调整批处理参数',
                    'expected_improvement': '10-30%'
                })

        return recommendations
## 最佳实践
- 合理批大小:找到最优批大小平衡延迟和吞吐
- 异步处理:使用异步IO提升并发能力
- 资源隔离:不同优先级请求使用不同资源池
- 弹性伸缩:根据负载自动扩缩容
- 缓存复用:缓存频繁查询的结果
- 监控告警:实时监控吞吐量指标