← 返回首页
🧠

延迟优化:降低LLM推理延迟的技术和策略

📂 llm ⏱ 4 min 693 words

延迟优化:降低LLM推理延迟的技术和策略

延迟指标定义

关键延迟指标

from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class LatencyMetrics:
    """Latency figures collected for one streamed generation.

    All durations are in seconds.
    """

    time_to_first_token: float  # TTFT: delay from start until the first token
    inter_token_latency: float  # ITL: mean gap between consecutive tokens
    total_latency: float  # end-to-end duration of the stream
    tokens_per_second: float  # tokens produced divided by total_latency

    @classmethod
    def measure(cls, generator):
        """Re-yield every token of *generator* while timing the stream.

        This method is itself a generator.  The resulting ``LatencyMetrics``
        is its *return value*, so it is delivered through
        ``StopIteration.value`` (or as the value of a ``yield from``
        expression), not by iterating.

        Timestamps are taken each time the wrapped generator hands over a
        token, so time the consumer spends between pulls is part of the
        measured gaps.

        Args:
            generator: Any iterable producing tokens.

        Yields:
            Each token from *generator*, unchanged and in order.

        Returns:
            LatencyMetrics for the fully consumed stream.  Every field is 0
            when the stream produced no tokens.
        """
        first_token_time = None
        prev_token_time = None
        token_times = []
        token_count = 0

        # perf_counter is monotonic; wall-clock time can jump backwards.
        start_time = time.perf_counter()

        for token in generator:
            current_time = time.perf_counter()
            token_count += 1

            if first_token_time is None:
                first_token_time = current_time

            if prev_token_time is not None:
                token_times.append(current_time - prev_token_time)

            prev_token_time = current_time
            yield token

        end_time = time.perf_counter()

        ttft = first_token_time - start_time if first_token_time is not None else 0.0
        itl = sum(token_times) / len(token_times) if token_times else 0.0
        total = end_time - start_time
        # Count tokens, not gaps: there is one fewer gap than there are tokens.
        tps = token_count / total if total > 0 else 0.0

        return cls(
            time_to_first_token=ttft,
            inter_token_latency=itl,
            total_latency=total,
            tokens_per_second=tps,
        )

模型层面优化

模型量化

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

class ModelQuantizer:
    """Load a causal-LM checkpoint in quantized form and report model size."""

    def __init__(self, model_name: str):
        # Checkpoint identifier passed to the loaders below.
        self.model_name = model_name

    def quantize_int8(self):
        """Load the checkpoint with an 8-bit ``BitsAndBytesConfig``.

        Returns:
            The model returned by ``AutoModelForCausalLM.from_pretrained``.
        """
        int8_settings = {
            "load_in_8bit": True,
            "llm_int8_threshold": 6.0,
            "llm_int8_skip_modules": None,
        }
        return AutoModelForCausalLM.from_pretrained(
            self.model_name,
            quantization_config=BitsAndBytesConfig(**int8_settings),
            device_map="auto",
        )

    def quantize_int4(self):
        """Load an already-quantized checkpoint through ``auto_gptq``.

        Returns:
            The model returned by ``AutoGPTQForCausalLM.from_quantized``.
        """
        # Imported lazily so auto_gptq is only required when this path is used.
        from auto_gptq import AutoGPTQForCausalLM

        loader_options = {"device_map": "auto", "use_safetensors": True}
        return AutoGPTQForCausalLM.from_quantized(self.model_name, **loader_options)

    def get_model_size(self, model):
        """Return the combined size of parameters and buffers in MB.

        Args:
            model: Object exposing ``parameters()`` and ``buffers()`` whose
                items provide ``nelement()`` and ``element_size()``.

        Returns:
            Total bytes of all parameters and buffers divided by 1024 twice.
        """

        def byte_total(tensors):
            running = 0
            for tensor in tensors:
                running += tensor.nelement() * tensor.element_size()
            return running

        parameter_bytes = byte_total(model.parameters())
        buffer_bytes = byte_total(model.buffers())
        total_bytes = parameter_bytes + buffer_bytes
        return total_bytes / 1024 / 1024  # MB

KV Cache优化

class KVCacheOptimizer:
    """Estimate KV-cache memory use for a given model shape and batch limits.

    The per-token cost is ``2 (K and V) * num_layers * num_heads * head_dim *
    bytes_per_element``.  ``num_heads`` is used directly as the number of
    heads whose keys/values are cached.
    """

    # Shape and limits used when the caller supplies no overrides.
    DEFAULT_CACHE_CONFIG = {
        'max_batch_size': 32,
        'max_sequence_length': 2048,
        'num_layers': 32,
        'num_heads': 32,
        'head_dim': 128
    }

    def __init__(self, cache_config=None, bytes_per_element: int = 2):
        """Create an estimator.

        Args:
            cache_config: Optional dict overriding any subset of
                ``DEFAULT_CACHE_CONFIG``.  Omitted keys keep their defaults.
            bytes_per_element: Size of one cached value in bytes
                (2 corresponds to FP16, the original hard-coded choice).
        """
        # Copy so that instances never share or mutate the class defaults.
        self.cache_config = {**self.DEFAULT_CACHE_CONFIG, **(cache_config or {})}
        self.bytes_per_element = bytes_per_element

    def _bytes_per_token(self) -> int:
        """Return the KV-cache bytes needed for a single token."""
        cfg = self.cache_config
        return (
            2 *  # K and V
            cfg['num_layers'] *
            cfg['num_heads'] *
            cfg['head_dim'] *
            self.bytes_per_element
        )

    def calculate_cache_size(self) -> int:
        """Return the KV-cache size in bytes at the configured maximums.

        Returns:
            Bytes per token times ``max_batch_size`` times
            ``max_sequence_length``.
        """
        cfg = self.cache_config
        return self._bytes_per_token() * cfg['max_batch_size'] * cfg['max_sequence_length']

    def optimize_memory_allocation(self, batch_size: int, seq_length: int):
        """Report the cache footprint for a concrete batch shape.

        Args:
            batch_size: Number of sequences in the batch.
            seq_length: Number of tokens per sequence.

        Returns:
            Dict with ``allocated_mb`` (cache size for this shape in MB) and
            ``utilization`` (requested tokens divided by the configured
            maximum token capacity; 0.0 when that capacity is zero).
        """
        cfg = self.cache_config
        requested_tokens = batch_size * seq_length
        allocated = self._bytes_per_token() * requested_tokens
        capacity_tokens = cfg['max_batch_size'] * cfg['max_sequence_length']

        return {
            'allocated_mb': allocated / 1024 / 1024,
            # Guard against a zero-capacity configuration.
            'utilization': requested_tokens / capacity_tokens if capacity_tokens else 0.0
        }

推理引擎优化

使用vLLM加速

from vllm import LLM, SamplingParams

class VLLMOptimizer:
    """Thin wrapper around a vLLM ``LLM`` engine with a simple benchmark."""

    def __init__(self, model_name: str, **engine_kwargs):
        """Create the engine.

        Args:
            model_name: Model identifier passed to ``LLM(model=...)``.
            **engine_kwargs: Optional overrides for the default engine
                options below (for example ``tensor_parallel_size=1`` on a
                single-GPU host).  Extra keys are forwarded to ``LLM``.
        """
        engine_options = {
            'tensor_parallel_size': 2,  # tensor parallelism
            'max_num_batched_tokens': 8192,
            'max_num_seqs': 256,
            'gpu_memory_utilization': 0.9,
            'swap_space': 4,  # GB
            'enforce_eager': False,  # keep CUDA Graph enabled
        }
        engine_options.update(engine_kwargs)
        self.llm = LLM(model=model_name, **engine_options)

    def generate(self, prompts: list, max_tokens: int = 512):
        """Generate completions for a batch of prompts.

        Args:
            prompts: Prompts passed to ``self.llm.generate``.
            max_tokens: Per-prompt generation limit.

        Returns:
            Whatever ``self.llm.generate`` returns for the batch.
        """
        sampling_params = SamplingParams(
            temperature=0.7,
            top_p=0.9,
            max_tokens=max_tokens
        )

        return self.llm.generate(prompts, sampling_params)

    def benchmark_throughput(self, prompts: list, num_runs: int = 10):
        """Time repeated batch generations.

        Args:
            prompts: Prompts generated on every run.
            num_runs: Number of timed runs; must be at least 1.

        Returns:
            Dict with ``avg_latency`` (mean seconds per run) and
            ``avg_throughput`` (mean of per-run prompts-per-second).

        Raises:
            ValueError: If ``num_runs`` is less than 1.
        """
        import time

        if num_runs < 1:
            raise ValueError(f"num_runs must be >= 1, got {num_runs}")

        latencies = []
        throughputs = []

        for _ in range(num_runs):
            # perf_counter is monotonic and high resolution.
            start = time.perf_counter()
            self.generate(prompts)
            latency = time.perf_counter() - start

            latencies.append(latency)
            # A zero interval would otherwise divide by zero.
            throughputs.append(len(prompts) / latency if latency > 0 else 0.0)

        return {
            'avg_latency': sum(latencies) / len(latencies),
            'avg_throughput': sum(throughputs) / len(throughputs)
        }

连续批处理

class ContinuousBatching:
    """Small continuous-batching scheduler.

    Requests wait in ``pending_requests`` and are moved into ``active_batch``
    whenever a slot is free.  Each call to ``process_batch`` runs the model
    once over the active batch, returns the finished requests and refills
    the freed slots from the queue.
    """

    def __init__(self, max_batch_size: int = 64, model=None):
        """Create a scheduler.

        Args:
            max_batch_size: Upper bound on the size of ``active_batch``.
            model: Object providing ``generate_batch(inputs)``.  It may also
                be assigned later through the ``model`` attribute, but must
                be set before a non-empty batch is processed.
        """
        self.max_batch_size = max_batch_size
        self.model = model
        self.pending_requests = []
        self.active_batch = []

    def add_request(self, request):
        """Queue *request* and admit it immediately if a slot is free."""
        self.pending_requests.append(request)
        self.try_expand_batch()

    def try_expand_batch(self):
        """Move queued requests into the active batch, oldest first."""
        free_slots = self.max_batch_size - len(self.active_batch)
        # A non-positive count must not reach the slices below: a negative
        # stop index would select from the wrong end of the queue.
        if free_slots <= 0 or not self.pending_requests:
            return

        # One slice moves everything at once instead of repeated pop(0).
        self.active_batch.extend(self.pending_requests[:free_slots])
        del self.pending_requests[:free_slots]

    def process_batch(self):
        """Run one model step over the active batch.

        Returns:
            List of ``{'request': ..., 'output': ...}`` dicts for requests
            whose output is marked finished; empty when nothing is active.

        Raises:
            RuntimeError: If requests are active but no model is configured.
            ValueError: If the model returns a different number of outputs
                than there are active requests.
        """
        if not self.active_batch:
            return []

        model = getattr(self, 'model', None)
        if model is None:
            raise RuntimeError(
                "ContinuousBatching has no model; pass model=... to the "
                "constructor or assign the 'model' attribute"
            )

        # Collect the input of every active request.
        inputs = [req['input'] for req in self.active_batch]

        # Batched inference.
        outputs = list(model.generate_batch(inputs))
        if len(outputs) != len(self.active_batch):
            # zip() would silently drop the unmatched requests.
            raise ValueError(
                f"model returned {len(outputs)} outputs for "
                f"{len(self.active_batch)} requests"
            )

        # Split finished requests from those that stay in the batch.
        completed = []
        remaining = []

        for req, output in zip(self.active_batch, outputs):
            if output['finished']:
                completed.append({
                    'request': req,
                    'output': output['text']
                })
            else:
                remaining.append(req)

        self.active_batch = remaining
        self.try_expand_batch()

        return completed

系统层面优化

GPU显存优化

import torch
import gc

class GPUMemoryOptimizer:
    """Collection of process-wide PyTorch settings for inference."""

    def __init__(self):
        # Not read by any method of this class.
        self.torch_cache = {}

    def optimize_inference(self, memory_fraction: float = 0.9):
        """Free cached memory and cap this process's GPU memory share.

        Args:
            memory_fraction: Fraction passed to
                ``torch.cuda.set_per_process_memory_fraction``.

        The CUDA calls are skipped when no CUDA device is available, so the
        method is safe to call on CPU-only hosts.
        """
        # Explicit import so the submodule is loaded before it is configured.
        import torch._dynamo

        # This does not enable torch.compile; it only changes how Dynamo
        # reacts to compilation errors.
        torch._dynamo.config.suppress_errors = True

        # Drop unreachable Python objects first so their memory can be freed.
        gc.collect()

        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            # Cap the share of device memory this process may allocate.
            torch.cuda.set_per_process_memory_fraction(memory_fraction)

    def use_flash_attention(self, model):
        """Set ``flash_attention = True`` on modules that have that attribute.

        Modules without a ``flash_attention`` attribute are left untouched.
        NOTE(review): whether the flag has any effect depends on the module
        implementation; confirm for the model family in use.

        Args:
            model: Object exposing ``named_modules()``.

        Returns:
            The same *model* object, modified in place.
        """
        for _, module in model.named_modules():
            if hasattr(module, 'flash_attention'):
                module.flash_attention = True

        return model

    def enable_tensor_cores(self):
        """Allow TF32 for CUDA matmul and cuDNN, and turn on cuDNN benchmark."""
        torch.backends.cuda.matmul.allow_tf32 = True
        torch.backends.cudnn.allow_tf32 = True

        # Enable the cuDNN benchmark setting.
        torch.backends.cudnn.benchmark = True

请求级别的优化

class RequestOptimizer:
    """Request-level helpers: result cache, prompt trimming, prefix grouping."""

    def __init__(self):
        # Maps prompt hash -> {'result': ..., 'timestamp': ...}.
        self.cache = {}

    def check_cache(self, prompt: str) -> bool:
        """Return True when a result for *prompt* is cached."""
        return self.hash_prompt(prompt) in self.cache

    def get_from_cache(self, prompt: str):
        """Return the cache entry for *prompt*, or None when absent.

        The entry is the stored dict with ``result`` and ``timestamp`` keys,
        not the bare result.
        """
        return self.cache.get(self.hash_prompt(prompt))

    def save_to_cache(self, prompt: str, result):
        """Store *result* for *prompt* together with the current time."""
        # Local import keeps this class usable without a module-level import.
        import time

        self.cache[self.hash_prompt(prompt)] = {
            'result': result,
            'timestamp': time.time()
        }

    def trim_prompt(self, prompt: str, max_tokens: int) -> str:
        """Cut *prompt* to at most *max_tokens* whitespace-separated words.

        A trimmed prompt is re-joined with single spaces and gets ``'...'``
        appended; a prompt within the limit is returned unchanged.
        """
        tokens = prompt.split()
        if len(tokens) > max_tokens:
            return ' '.join(tokens[:max_tokens]) + '...'
        return prompt

    def get_prefix(self, prompt: str, n: int = 50) -> str:
        """Return the first *n* characters of *prompt* as its grouping key."""
        return prompt[:n]

    def batch_similar_prompts(self, prompts: list) -> dict:
        """Group prompts that share the same leading 50 characters.

        Returns:
            Dict mapping each prefix to the prompts starting with it, in
            input order.
        """
        groups = {}
        for prompt in prompts:
            groups.setdefault(self.get_prefix(prompt, n=50), []).append(prompt)

        return groups

    def hash_prompt(self, prompt: str) -> str:
        """Return the MD5 hex digest of *prompt*, used only as a cache key."""
        import hashlib
        return hashlib.md5(prompt.encode()).hexdigest()

延迟优化策略表

# Reference table of optimization strategies, grouped by the layer they act
# on.  Every entry has a 'name'; the remaining keys vary by entry and hold
# the article's rough estimates as display strings.
LATENCY_OPTIMIZATION_STRATEGIES = dict(
    model_level=[
        dict(name='INT4量化', ttft_reduction='30-50%', quality_impact='low'),
        dict(name='模型蒸馏', ttft_reduction='50-70%', quality_impact='medium'),
        dict(name='剪枝', ttft_reduction='20-40%', quality_impact='low'),
    ],
    engine_level=[
        dict(name='vLLM', throughput_increase='2-4x', complexity='medium'),
        dict(name='TensorRT-LLM', throughput_increase='3-5x', complexity='high'),
        dict(name='连续批处理', throughput_increase='2-3x', complexity='medium'),
    ],
    system_level=[
        dict(name='Flash Attention', memory_reduction='20-40%', speedup='1.5-2x'),
        dict(name='KV Cache优化', memory_reduction='30-50%', speedup='1.2-1.5x'),
        dict(name='请求缓存', ttft_reduction='90%+', scope='重复查询'),
    ],
)

最佳实践

  1. 基准测试:优化前先建立性能基线
  2. 渐进优化:从简单的优化开始
  3. 质量验证:确保优化不降低质量
  4. 监控告警:优化后持续监控性能
  5. 成本效益:权衡优化成本和收益
  6. 文档记录:记录所有优化决策