延迟优化:降低LLM推理延迟的技术和策略
延迟指标定义
关键延迟指标
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class LatencyMetrics:
    """Latency figures collected while streaming tokens from a generator.

    All durations are in seconds.
    """

    time_to_first_token: float  # TTFT: delay between start and the first token
    inter_token_latency: float  # ITL: mean gap between consecutive tokens
    total_latency: float  # end-to-end latency of the whole stream
    tokens_per_second: float  # generation speed (tokens / total_latency)

    @classmethod
    def measure(cls, generator):
        """Pass tokens through unchanged while timing their arrival.

        This method is itself a generator: it re-yields every token produced
        by ``generator``. The resulting ``LatencyMetrics`` is the generator's
        return value, so it is available as ``StopIteration.value`` or as the
        result of ``metrics = yield from LatencyMetrics.measure(gen)``.

        Because timestamps are taken each time the wrapped generator produces
        a token, any time the consumer spends between two ``next()`` calls is
        included in the measured gaps.

        Args:
            generator: Any iterable of tokens.

        Returns:
            LatencyMetrics for the stream; every field is 0 when the
            corresponding quantity cannot be computed (e.g. empty stream).
        """
        # perf_counter is monotonic, so wall-clock adjustments cannot
        # produce negative or skewed durations.
        start_time = time.perf_counter()
        first_token_time = None
        prev_token_time = None
        token_gaps = []
        token_count = 0

        for token in generator:
            current_time = time.perf_counter()
            token_count += 1
            if first_token_time is None:
                first_token_time = current_time
            else:
                token_gaps.append(current_time - prev_token_time)
            prev_token_time = current_time
            yield token

        end_time = time.perf_counter()

        ttft = first_token_time - start_time if first_token_time is not None else 0.0
        itl = sum(token_gaps) / len(token_gaps) if token_gaps else 0.0
        total = end_time - start_time
        # Count tokens, not gaps: N tokens have only N-1 gaps between them.
        tps = token_count / total if total > 0 else 0.0

        return cls(
            time_to_first_token=ttft,
            inter_token_latency=itl,
            total_latency=total,
            tokens_per_second=tps,
        )
模型层面优化
模型量化
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
class ModelQuantizer:
    """Loads a causal-LM checkpoint in quantized form and reports model size."""

    def __init__(self, model_name: str):
        # Checkpoint name or path handed to the loaders below.
        self.model_name = model_name

    def quantize_int8(self):
        """Load the checkpoint with an 8-bit BitsAndBytes configuration.

        Returns:
            The model returned by ``AutoModelForCausalLM.from_pretrained``.
        """
        int8_config = BitsAndBytesConfig(
            load_in_8bit=True,
            llm_int8_threshold=6.0,
            llm_int8_skip_modules=None,
        )
        return AutoModelForCausalLM.from_pretrained(
            self.model_name,
            quantization_config=int8_config,
            device_map="auto",
        )

    def quantize_int4(self):
        """Load an already-quantized checkpoint through AutoGPTQ.

        NOTE(review): the method name suggests a 4-bit GPTQ checkpoint is
        expected at ``model_name`` - confirm against the checkpoints in use.

        Returns:
            The model returned by ``AutoGPTQForCausalLM.from_quantized``.
        """
        # Imported lazily so auto_gptq is only required when this path is used.
        from auto_gptq import AutoGPTQForCausalLM

        return AutoGPTQForCausalLM.from_quantized(
            self.model_name,
            device_map="auto",
            use_safetensors=True,
        )

    def get_model_size(self, model):
        """Return the combined size of parameters and buffers in MB.

        Args:
            model: Object exposing ``parameters()`` and ``buffers()``, each
                yielding tensors with ``nelement()`` and ``element_size()``.

        Returns:
            Size in mebibytes (bytes / 1024 / 1024) as a float.
        """

        def tensor_bytes(tensors):
            # Bytes occupied by every tensor in the iterable.
            return sum(t.nelement() * t.element_size() for t in tensors)

        parameter_bytes = tensor_bytes(model.parameters())
        buffer_bytes = tensor_bytes(model.buffers())
        return (parameter_bytes + buffer_bytes) / 1024 / 1024  # MB
KV Cache优化
class KVCacheOptimizer:
    """Estimates KV-cache memory use for a transformer configuration.

    The per-token cost is ``2 (K and V) * num_layers * num_heads * head_dim
    * bytes_per_element``.
    """

    # Defaults used when the caller supplies no overrides.
    DEFAULT_CACHE_CONFIG = {
        'max_batch_size': 32,
        'max_sequence_length': 2048,
        'num_layers': 32,
        'num_heads': 32,
        'head_dim': 128,
    }

    def __init__(self, cache_config: Optional[dict] = None, bytes_per_element: int = 2):
        """Build the optimizer.

        Args:
            cache_config: Optional overrides for any key of
                ``DEFAULT_CACHE_CONFIG``; unspecified keys keep their default.
            bytes_per_element: Width of one cached value in bytes
                (2 corresponds to FP16, the previous hard-coded value).

        Raises:
            ValueError: If ``cache_config`` contains an unknown key or
                ``bytes_per_element`` is not positive.
        """
        overrides = dict(cache_config or {})
        unknown = set(overrides) - set(self.DEFAULT_CACHE_CONFIG)
        if unknown:
            raise ValueError(f"unknown cache_config keys: {sorted(unknown)}")
        if bytes_per_element <= 0:
            raise ValueError("bytes_per_element must be positive")
        # Fresh dict per instance so mutations never leak into the defaults.
        self.cache_config = {**self.DEFAULT_CACHE_CONFIG, **overrides}
        self.bytes_per_element = bytes_per_element

    def _bytes_per_token(self) -> int:
        """Return the KV-cache bytes needed for one token of one sequence."""
        cfg = self.cache_config
        return (
            2  # K and V
            * cfg['num_layers']
            * cfg['num_heads']
            * cfg['head_dim']
            * self.bytes_per_element
        )

    def _max_tokens(self) -> int:
        """Return the token capacity: max batch size times max sequence length."""
        return self.cache_config['max_batch_size'] * self.cache_config['max_sequence_length']

    def calculate_cache_size(self) -> int:
        """Return the KV-cache size in bytes at full configured capacity."""
        return self._bytes_per_token() * self._max_tokens()

    def optimize_memory_allocation(self, batch_size: int, seq_length: int):
        """Estimate the cache footprint for a concrete batch shape.

        Args:
            batch_size: Number of sequences in the batch.
            seq_length: Tokens per sequence.

        Returns:
            dict with ``allocated_mb`` (bytes / 1024 / 1024) and
            ``utilization`` (requested tokens / configured capacity). The
            ratio is not clamped, so it exceeds 1.0 when the request is
            larger than the configured maximum.

        Raises:
            ValueError: If ``batch_size`` or ``seq_length`` is negative.
        """
        if batch_size < 0 or seq_length < 0:
            raise ValueError("batch_size and seq_length must be non-negative")
        requested_tokens = batch_size * seq_length
        allocated = self._bytes_per_token() * requested_tokens
        return {
            'allocated_mb': allocated / 1024 / 1024,
            'utilization': requested_tokens / self._max_tokens(),
        }
推理引擎优化
使用vLLM加速
from vllm import LLM, SamplingParams
class VLLMOptimizer:
    """Thin wrapper around a vLLM ``LLM`` engine with a simple benchmark."""

    def __init__(self, model_name: str, **engine_overrides):
        """Create the vLLM engine.

        Args:
            model_name: Model identifier passed to ``LLM(model=...)``.
            **engine_overrides: Optional keyword arguments that replace or
                extend the default engine settings below (for example
                ``tensor_parallel_size=1`` on a single-GPU host).
        """
        engine_args = {
            'tensor_parallel_size': 2,  # tensor parallelism
            'max_num_batched_tokens': 8192,
            'max_num_seqs': 256,
            'gpu_memory_utilization': 0.9,
            'swap_space': 4,  # GB
            'enforce_eager': False,  # leave CUDA Graph enabled
        }
        engine_args.update(engine_overrides)
        self.llm = LLM(model=model_name, **engine_args)

    def generate(self, prompts: list, max_tokens: int = 512):
        """Generate completions for a batch of prompts.

        Sampling uses temperature 0.7 and top_p 0.9.

        Args:
            prompts: Prompts forwarded to ``self.llm.generate``.
            max_tokens: Upper bound on generated tokens per prompt.

        Returns:
            Whatever ``self.llm.generate`` returns for the batch.
        """
        sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.9,
            max_tokens=max_tokens,
        )
        return self.llm.generate(prompts, sampling_params)

    def benchmark_throughput(self, prompts: list, num_runs: int = 10):
        """Time repeated batch generation.

        Args:
            prompts: Batch generated on every run.
            num_runs: Number of timed repetitions; must be at least 1.

        Returns:
            dict with ``avg_latency`` (seconds per batch) and
            ``avg_throughput`` (prompts per second, averaged over runs).

        Raises:
            ValueError: If ``num_runs`` is less than 1.
        """
        import time

        if num_runs < 1:
            raise ValueError("num_runs must be at least 1")

        latencies = []
        for _ in range(num_runs):
            # perf_counter is monotonic and high-resolution, unlike time.time.
            start = time.perf_counter()
            self.generate(prompts)
            latencies.append(time.perf_counter() - start)

        throughputs = [len(prompts) / latency for latency in latencies]
        return {
            'avg_latency': sum(latencies) / len(latencies),
            'avg_throughput': sum(throughputs) / len(throughputs),
        }
连续批处理
class ContinuousBatching:
    """Keeps a batch full by refilling it from a FIFO queue of pending requests.

    Each request is a mapping with an ``'input'`` key. The model must expose
    ``generate_batch(inputs)`` returning one mapping per input with the keys
    ``'finished'`` and ``'text'``.
    """

    # Class-level default so the attribute always exists, while a value
    # assigned by a subclass or by the caller is never overwritten.
    model = None

    def __init__(self, max_batch_size: int = 64, model=None):
        """Create an empty scheduler.

        Args:
            max_batch_size: Maximum number of requests in the active batch.
            model: Optional object providing ``generate_batch``; it may also
                be assigned later through the ``model`` attribute.
        """
        self.max_batch_size = max_batch_size
        self.pending_requests = []
        self.active_batch = []
        if model is not None:
            self.model = model

    def add_request(self, request):
        """Queue a request and admit it immediately if the batch has room."""
        self.pending_requests.append(request)
        self.try_expand_batch()

    def try_expand_batch(self):
        """Move pending requests into the active batch until it is full."""
        free_slots = self.max_batch_size - len(self.active_batch)
        if free_slots <= 0 or not self.pending_requests:
            return
        # One slice move instead of repeated pop(0), which is O(n) per call.
        self.active_batch.extend(self.pending_requests[:free_slots])
        del self.pending_requests[:free_slots]

    def process_batch(self):
        """Run one model step over the active batch.

        Finished requests leave the batch and are returned; unfinished ones
        stay active, and free slots are refilled from the pending queue.

        Returns:
            List of ``{'request': ..., 'output': ...}`` dicts for the requests
            that finished in this step; empty when no request is active.

        Raises:
            RuntimeError: If no model has been configured.
            ValueError: If the model returns a different number of outputs
                than inputs; the active batch is left unchanged in that case.
        """
        if not self.active_batch:
            return []
        if self.model is None:
            raise RuntimeError(
                "no model configured: pass model= to ContinuousBatching "
                "or assign the model attribute before processing"
            )

        # Collect the inputs of all active requests.
        inputs = [req['input'] for req in self.active_batch]
        # Batched inference.
        outputs = list(self.model.generate_batch(inputs))
        if len(outputs) != len(self.active_batch):
            # zip() would silently drop the unmatched requests.
            raise ValueError(
                f"model returned {len(outputs)} outputs "
                f"for {len(self.active_batch)} requests"
            )

        completed = []
        remaining = []
        for req, output in zip(self.active_batch, outputs):
            if output['finished']:
                completed.append({'request': req, 'output': output['text']})
            else:
                remaining.append(req)

        self.active_batch = remaining
        self.try_expand_batch()
        return completed
系统层面优化
GPU显存优化
import torch
import gc
class GPUMemoryOptimizer:
    """Process-wide PyTorch settings aimed at inference memory use and speed."""

    def __init__(self):
        self.torch_cache = {}

    def optimize_inference(self, memory_fraction: float = 0.9):
        """Apply memory-related settings for inference.

        Args:
            memory_fraction: Fraction of GPU memory this process may use,
                in the range (0, 1].

        Raises:
            ValueError: If ``memory_fraction`` is outside (0, 1].
        """
        if not 0.0 < memory_fraction <= 1.0:
            raise ValueError("memory_fraction must be in the range (0, 1]")

        # This does not enable torch.compile; it only sets the TorchDynamo
        # flag that suppresses compilation errors.
        torch._dynamo.config.suppress_errors = True

        # Drop unreachable Python objects first so their memory can be freed.
        gc.collect()

        # The CUDA calls are skipped on hosts without a usable GPU.
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            # Cap this process's share of GPU memory.
            torch.cuda.set_per_process_memory_fraction(memory_fraction)

    def use_flash_attention(self, model):
        """Set ``flash_attention = True`` on every submodule that has the attribute.

        Modules without a ``flash_attention`` attribute are left untouched.
        NOTE(review): whether the flag changes the attention kernel depends
        on the model implementation - confirm for the models in use.

        Args:
            model: Object exposing ``named_modules()``.

        Returns:
            The same ``model`` object, modified in place.
        """
        for _, module in model.named_modules():
            if hasattr(module, 'flash_attention'):
                module.flash_attention = True
        return model

    def enable_tensor_cores(self):
        """Allow TF32 in CUDA matmul and cuDNN, and turn on cuDNN benchmark mode."""
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True
        # Enable cuDNN benchmark mode.
        torch.backends.cudnn.benchmark = True
请求级别的优化
class RequestOptimizer:
    """Request-level helpers: result caching, prompt trimming and grouping.

    The cache is an in-memory dict keyed by the MD5 hex digest of the prompt;
    it has no size limit or expiry.
    """

    def __init__(self):
        self.cache = {}

    def check_cache(self, prompt: str) -> bool:
        """Return True when a result for ``prompt`` is cached."""
        return self.hash_prompt(prompt) in self.cache

    def get_from_cache(self, prompt: str):
        """Return the cached entry for ``prompt``, or None when absent.

        The entry is the dict stored by ``save_to_cache`` with the keys
        ``'result'`` and ``'timestamp'``.
        """
        return self.cache.get(self.hash_prompt(prompt))

    def save_to_cache(self, prompt: str, result):
        """Store ``result`` for ``prompt`` together with the current time."""
        self.cache[self.hash_prompt(prompt)] = {
            'result': result,
            'timestamp': time.time(),
        }

    def trim_prompt(self, prompt: str, max_tokens: int) -> str:
        """Cut the prompt to at most ``max_tokens`` whitespace-separated words.

        Words stand in for tokens here. A prompt within the limit is returned
        unchanged; a longer one is rebuilt from its first ``max_tokens`` words
        joined by single spaces, with ``'...'`` appended.
        """
        words = prompt.split()
        if len(words) > max_tokens:
            return ' '.join(words[:max_tokens]) + '...'
        return prompt

    def get_prefix(self, prompt: str, n: int = 50) -> str:
        """Return the first ``n`` characters of ``prompt``.

        NOTE(review): ``batch_similar_prompts`` called this helper but it was
        never defined, so that method always raised AttributeError. A
        character prefix is the assumed intent - confirm.
        """
        return prompt[:n]

    def batch_similar_prompts(self, prompts: list) -> dict:
        """Group prompts that share the same 50-character prefix.

        Returns:
            dict mapping each prefix to the prompts starting with it, in
            their original order.
        """
        groups = {}
        for prompt in prompts:
            groups.setdefault(self.get_prefix(prompt, n=50), []).append(prompt)
        return groups

    def hash_prompt(self, prompt: str) -> str:
        """Return the MD5 hex digest of the prompt, used as the cache key."""
        import hashlib

        return hashlib.md5(prompt.encode()).hexdigest()
延迟优化策略表
# Reference table of latency optimization strategies, grouped by the layer
# they apply to. Every value is a descriptive string, not a measurement
# taken by this module.
LATENCY_OPTIMIZATION_STRATEGIES = {
    # Changes to the model itself.
    'model_level': [
        {
            'name': 'INT4量化',
            'ttft_reduction': '30-50%',
            'quality_impact': 'low',
        },
        {
            'name': '模型蒸馏',
            'ttft_reduction': '50-70%',
            'quality_impact': 'medium',
        },
        {
            'name': '剪枝',
            'ttft_reduction': '20-40%',
            'quality_impact': 'low',
        },
    ],
    # Choice and configuration of the inference engine.
    'engine_level': [
        {
            'name': 'vLLM',
            'throughput_increase': '2-4x',
            'complexity': 'medium',
        },
        {
            'name': 'TensorRT-LLM',
            'throughput_increase': '3-5x',
            'complexity': 'high',
        },
        {
            'name': '连续批处理',
            'throughput_increase': '2-3x',
            'complexity': 'medium',
        },
    ],
    # System-side techniques.
    'system_level': [
        {
            'name': 'Flash Attention',
            'memory_reduction': '20-40%',
            'speedup': '1.5-2x',
        },
        {
            'name': 'KV Cache优化',
            'memory_reduction': '30-50%',
            'speedup': '1.2-1.5x',
        },
        {
            'name': '请求缓存',
            'ttft_reduction': '90%+',
            'scope': '重复查询',
        },
    ],
}
最佳实践
- 基准测试:优化前先建立性能基线
- 渐进优化:从简单的优化开始
- 质量验证:确保优化不降低质量
- 监控告警:优化后持续监控性能
- 成本效益:权衡优化成本和收益
- 文档记录:记录所有优化决策