← 返回首页
🧠

LLM部署指南:从开发到生产环境的完整部署流程

📂 llm ⏱ 4 min 770 words

LLM部署指南:从开发到生产环境的完整部署流程

部署概述

大语言模型的部署是一个复杂的工程任务,涉及模型优化、服务架构设计、性能调优和监控等多个方面。成功的部署需要在延迟、吞吐量、成本和质量之间找到平衡。

模型优化

模型量化

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

class ModelQuantizer:
    """Quantization and export helpers for a Hugging Face causal language model.

    The constructor loads the model and tokenizer identified by ``model_name``.
    The quantization methods return the quantized model to the caller; they do
    not store it on the instance, so ``self.model`` stays the model that was
    loaded in ``__init__``.
    """

    def __init__(self, model_name: str):
        """Load the model and tokenizer for ``model_name``."""
        self.model = AutoModelForCausalLM.from_pretrained(model_name)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)

    def dynamic_quantization(self, dtype=torch.qint8):
        """Return the model with its ``torch.nn.Linear`` layers dynamically quantized.

        Args:
            dtype: Quantized weight dtype passed to ``quantize_dynamic``.

        Returns:
            The quantized model produced by ``torch.quantization.quantize_dynamic``.
        """
        return torch.quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear},
            dtype=dtype,
        )

    def static_quantization(self, calibration_data, dtype=torch.qint8):
        """Put the model in eval mode and quantize it.

        NOTE: despite the name, no calibration pass is performed. This method
        applies the same dynamic quantization as ``dynamic_quantization``;
        ``calibration_data`` is accepted only to keep the signature stable and
        is never read. A ``RuntimeWarning`` is emitted when calibration data is
        supplied so that the fallback is not silent.

        Args:
            calibration_data: Ignored (see note above).
            dtype: Quantized weight dtype passed to ``quantize_dynamic``.

        Returns:
            The dynamically quantized model.
        """
        import warnings

        self.model.eval()

        if calibration_data is not None:
            warnings.warn(
                "static_quantization() does not run a calibration pass; "
                "calibration_data is ignored and dynamic quantization is "
                "applied instead.",
                RuntimeWarning,
                stacklevel=2,
            )

        return self.dynamic_quantization(dtype=dtype)

    def export_onnx(self, output_path: str, opset_version: int = 14):
        """Export ``self.model`` to an ONNX file at ``output_path``.

        A short tokenized sentence is used as the example input. Both the
        batch and sequence dimensions of ``input_ids`` and ``logits`` are
        declared dynamic.

        Args:
            output_path: Destination path of the ONNX file.
            opset_version: ONNX opset version passed to ``torch.onnx.export``.
        """
        self.model.eval()

        # Example input used by the exporter.
        dummy_input = self.tokenizer(
            "Hello world",
            return_tensors="pt"
        ).input_ids

        torch.onnx.export(
            self.model,
            dummy_input,
            output_path,
            input_names=['input_ids'],
            output_names=['logits'],
            dynamic_axes={
                'input_ids': {0: 'batch_size', 1: 'sequence'},
                'logits': {0: 'batch_size', 1: 'sequence'}
            },
            opset_version=opset_version
        )

        print(f"Model exported to {output_path}")

    def save_optimized(self, output_dir: str):
        """Save ``self.model`` and the tokenizer to ``output_dir``.

        This writes the model held on the instance. Quantized models returned
        by the quantization methods are not stored on the instance, so they
        are not what gets saved here.
        """
        self.model.save_pretrained(output_dir)
        self.tokenizer.save_pretrained(output_dir)

模型剪枝

import torch.nn.utils.prune as prune

class ModelPruner:
    """Magnitude-based pruning helpers for the ``torch.nn.Linear`` layers of a model.

    Both pruning methods modify ``self.model`` in place and make the pruning
    permanent (the reparametrization is removed right after it is applied).
    """

    def __init__(self, model):
        """Keep a reference to the model that will be pruned in place."""
        self.model = model

    def _linear_layers(self):
        """Return every ``torch.nn.Linear`` submodule of the model."""
        return [
            module
            for module in self.model.modules()
            if isinstance(module, torch.nn.Linear)
        ]

    def unstructured_pruning(self, amount: float = 0.3):
        """Zero the smallest-magnitude (L1) weights of each Linear layer.

        Args:
            amount: Value passed to ``prune.l1_unstructured`` as ``amount``.

        Returns:
            The same model object, pruned in place.
        """
        for layer in self._linear_layers():
            prune.l1_unstructured(layer, name='weight', amount=amount)
            # Fold the mask into the weight so the layer is a plain Linear again.
            prune.remove(layer, 'weight')

        return self.model

    def structured_pruning(self, amount: float = 0.3):
        """Zero whole slices of each Linear weight along dim 0, ranked by L2 norm.

        Args:
            amount: Value passed to ``prune.ln_structured`` as ``amount``.

        Returns:
            The same model object, pruned in place.
        """
        for layer in self._linear_layers():
            prune.ln_structured(
                layer,
                name='weight',
                amount=amount,
                n=2,
                dim=0,
            )
            # Fold the mask into the weight so the layer is a plain Linear again.
            prune.remove(layer, 'weight')

        return self.model

    def get_model_sparsity(self):
        """Return the fraction of parameter elements that are exactly zero.

        All parameters of the model are counted, not only Linear weights.
        A model without parameters reports ``0.0`` instead of raising
        ``ZeroDivisionError``.
        """
        total_params = 0
        zero_params = 0

        for param in self.model.parameters():
            total_params += param.numel()
            zero_params += (param == 0).sum().item()

        if total_params == 0:
            return 0.0

        return zero_params / total_params

服务架构设计

FastAPI服务

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
import asyncio
from concurrent.futures import ThreadPoolExecutor

# ASGI application object; the startup hook and the /v1 routes in this file register on it.
app = FastAPI(title="LLM API Service")

class GenerateRequest(BaseModel):
    # Request body for the text generation endpoint. Apart from `prompt`,
    # every field has a default and is forwarded to `model.generate(...)`
    # by LLMService._generate_sync.
    prompt: str  # text that is tokenized and used as the model input
    max_length: int = 100  # forwarded as `max_length`
    temperature: float = 0.7  # forwarded as `temperature`
    top_k: int = 50  # forwarded as `top_k`
    top_p: float = 0.95  # forwarded as `top_p`
    num_return_sequences: int = 1  # forwarded as `num_return_sequences`

class GenerateResponse(BaseModel):
    # Response body for the text generation endpoint.
    generated_text: List[str]  # one decoded string per generated sequence
    usage: dict  # token accounting filled in by the code that builds the response

class LLMService:
    """Serve text generation from a single Hugging Face causal language model.

    The model is loaded once in the constructor and put in eval mode.
    Generation itself is synchronous; ``generate`` offloads it to a small
    thread pool so the asyncio event loop is not blocked while it runs.
    """

    def __init__(self, model_name: str, device: str = "cuda"):
        """Load the model and tokenizer.

        Args:
            model_name: Identifier passed to ``from_pretrained``.
            device: Requested device; used only when CUDA is available,
                otherwise the model is placed on the CPU.
        """
        self.device = torch.device(device if torch.cuda.is_available() else "cpu")
        self.model = AutoModelForCausalLM.from_pretrained(model_name).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model.eval()

        self.executor = ThreadPoolExecutor(max_workers=4)

    async def generate(self, request: "GenerateRequest") -> "GenerateResponse":
        """Generate text for ``request`` without blocking the event loop.

        Runs ``_generate_sync`` in the service's thread pool and returns its
        result. Exceptions raised during generation propagate to the caller.
        """
        # We are inside a coroutine, so a running loop always exists.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self._generate_sync,
            request,
        )

    def _generate_sync(self, request: "GenerateRequest") -> "GenerateResponse":
        """Tokenize the prompt, sample from the model and build the response.

        Each returned text is the full decoded output sequence with special
        tokens skipped.
        """
        inputs = self.tokenizer(
            request.prompt,
            return_tensors="pt"
        ).to(self.device)

        # Some tokenizers define no pad token; fall back to EOS so that
        # ``generate`` always receives an explicit padding id.
        pad_token_id = self.tokenizer.pad_token_id
        if pad_token_id is None:
            pad_token_id = self.tokenizer.eos_token_id

        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_length=request.max_length,
                temperature=request.temperature,
                top_k=request.top_k,
                top_p=request.top_p,
                num_return_sequences=request.num_return_sequences,
                do_sample=True,
                pad_token_id=pad_token_id
            )

        generated_texts = [
            self.tokenizer.decode(output, skip_special_tokens=True)
            for output in outputs
        ]

        usage = self._compute_usage(inputs['input_ids'].shape[1], outputs)

        return GenerateResponse(
            generated_text=generated_texts,
            usage=usage
        )

    @staticmethod
    def _compute_usage(prompt_tokens: int, outputs) -> dict:
        """Return token accounting for one request.

        Every output sequence is treated as containing the prompt followed by
        newly generated tokens, so the prompt length is subtracted from each
        sequence individually. The prompt is counted once in ``total_tokens``
        regardless of how many sequences were returned.

        Args:
            prompt_tokens: Number of tokens in the encoded prompt.
            outputs: Iterable of 1-D output sequences.

        Returns:
            Dict with ``prompt_tokens``, ``completion_tokens`` and
            ``total_tokens``.
        """
        completion_tokens = sum(
            int(sequence.shape[0]) - prompt_tokens for sequence in outputs
        )
        return {
            "prompt_tokens": prompt_tokens,
            "completion_tokens": completion_tokens,
            "total_tokens": prompt_tokens + completion_tokens,
        }

# Shared service handle; stays None until the startup hook has run.
llm_service = None

@app.on_event("startup")
async def startup_event():
    """Create the LLMService and publish it through the module-level handle."""
    global llm_service
    service = LLMService(model_name="your-model-name", device="cuda")
    llm_service = service

@app.post("/v1/generate", response_model=GenerateResponse)
async def generate_text(request: GenerateRequest):
    """Generate text for the posted request.

    Returns:
        The ``GenerateResponse`` produced by the shared LLM service.

    Raises:
        HTTPException: 503 when the service has not been initialized yet;
            500 with the error text when generation fails.
    """
    # Checked outside the try block so this 503 is not rewritten into a 500.
    if llm_service is None:
        raise HTTPException(
            status_code=503,
            detail="LLM service is not initialized"
        )

    try:
        return await llm_service.generate(request)
    except Exception as exc:
        # Keep the original exception as the cause for server-side tracebacks.
        raise HTTPException(status_code=500, detail=str(exc)) from exc

@app.get("/v1/models")
async def list_models():
    """Return the static list of models exposed by this service."""
    model_entry = {
        "id": "your-model",
        "object": "model",
        "created": 1234567890,
        "owned_by": "organization",
    }
    return {"data": [model_entry]}

负载均衡

from fastapi import FastAPI
from pydantic import BaseModel
import asyncio
from typing import List
import random

class LoadBalancer:
    """Pick one service instance out of a registered pool."""

    def __init__(self):
        """Start with an empty pool and the round-robin cursor at zero."""
        self.instances: List["LLMService"] = []
        self.current_index = 0

    def add_instance(self, service: "LLMService"):
        """Register a service instance with the pool."""
        self.instances.append(service)

    def _require_instances(self):
        """Raise a descriptive ``IndexError`` when the pool is empty."""
        if not self.instances:
            raise IndexError("no service instance registered with the load balancer")

    def get_next_instance(self) -> "LLMService":
        """Return the next instance in round-robin order.

        Raises:
            IndexError: If no instance has been registered.
        """
        self._require_instances()
        instance = self.instances[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.instances)
        return instance

    def get_least_loaded_instance(self) -> "LLMService":
        """Return an instance chosen uniformly at random.

        Simplified placeholder: load is not tracked, so this does not
        actually look for the least-loaded instance.

        Raises:
            IndexError: If no instance has been registered.
        """
        self._require_instances()
        return random.choice(self.instances)

class DistributedLLMService:
    """Route generation requests to service instances picked by a LoadBalancer."""

    def __init__(self):
        """Create the load balancer; instances are registered on it separately."""
        self.load_balancer = LoadBalancer()

    async def generate(self, request: GenerateRequest) -> GenerateResponse:
        """Forward ``request`` to the next round-robin instance and return its response."""
        target = self.load_balancer.get_next_instance()
        response = await target.generate(request)
        return response

性能优化

批处理优化

class BatchProcessor:
    """Group concurrent generation requests into batches for a single model call.

    Callers await ``process_request``; a long-running ``batch_loop`` task
    drains the queue, runs one batched ``generate`` call and resolves each
    caller's future. Only ``prompt`` and ``max_length`` are read from the
    requests: the batch uses the largest ``max_length`` among its requests
    and a fixed temperature of 0.7.
    """

    def __init__(self, model, tokenizer, max_batch_size: int = 32):
        """Store the model/tokenizer and create the request queue.

        Args:
            model: Model exposing ``generate`` and ``device``.
            tokenizer: Tokenizer used for batch encoding and decoding.
            max_batch_size: Upper bound on requests per batch.
        """
        self.model = model
        self.tokenizer = tokenizer
        self.max_batch_size = max_batch_size
        self.request_queue = asyncio.Queue()
        self.batch_timeout = 0.1  # seconds (100 ms)

    async def process_request(self, request: "GenerateRequest"):
        """Enqueue ``request`` and wait for its result.

        Returns:
            The response set by the batch loop for this request.

        Raises:
            Exception: Whatever error occurred while processing the batch
                this request was part of.
        """
        future = asyncio.get_running_loop().create_future()
        await self.request_queue.put((request, future))
        return await future

    async def batch_loop(self):
        """Run forever, collecting queued requests into batches and processing them."""
        while True:
            # Block until at least one request is available.
            batch = [await self.request_queue.get()]

            # Keep collecting while further requests arrive quickly; stop at
            # the first gap longer than a tenth of batch_timeout or when the
            # batch is full.
            try:
                while len(batch) < self.max_batch_size:
                    item = await asyncio.wait_for(
                        self.request_queue.get(),
                        timeout=self.batch_timeout / 10
                    )
                    batch.append(item)
            except asyncio.TimeoutError:
                pass

            await self._process_batch(batch)

    def _run_inference(self, requests: list):
        """Tokenize the prompts and run one batched ``generate`` call (blocking)."""
        prompts = [req.prompt for req in requests]
        inputs = self.tokenizer(
            prompts,
            return_tensors="pt",
            padding=True,
            truncation=True
        ).to(self.model.device)

        # no_grad is entered here because this function runs in a worker
        # thread and grad mode is per-thread.
        with torch.no_grad():
            return self.model.generate(
                **inputs,
                max_length=max(req.max_length for req in requests),
                do_sample=True,
                temperature=0.7
            )

    async def _process_batch(self, batch: list):
        """Process one batch of ``(request, future)`` pairs.

        Inference runs in the default executor so the event loop stays
        responsive. On success each future receives its own response; on any
        failure the exception is set on every pending future so that no
        caller waits forever. Futures that are already done (for example
        cancelled by the caller) are left untouched.
        """
        requests = [item[0] for item in batch]
        futures = [item[1] for item in batch]

        loop = asyncio.get_running_loop()
        try:
            outputs = await loop.run_in_executor(
                None, self._run_inference, requests
            )
            responses = [
                GenerateResponse(
                    generated_text=[
                        self.tokenizer.decode(
                            outputs[i],
                            skip_special_tokens=True
                        )
                    ],
                    usage={"tokens": outputs[i].shape[0]}
                )
                for i in range(len(futures))
            ]
        except Exception as exc:
            # Propagate the failure to every waiting caller instead of
            # letting it escape and terminate the batch loop.
            for future in futures:
                if not future.done():
                    future.set_exception(exc)
            return

        for future, response in zip(futures, responses):
            if not future.done():
                future.set_result(response)

监控与日志

import logging
from datetime import datetime
from prometheus_client import Counter, Histogram, start_http_server

# Prometheus metrics
# Request counter, labelled by endpoint and status.
REQUEST_COUNT = Counter('llm_requests_total', 'Total requests', ['endpoint', 'status'])
# Request latency histogram (seconds), labelled by endpoint.
REQUEST_LATENCY = Histogram('llm_request_latency_seconds', 'Request latency', ['endpoint'])

class MonitoringMiddleware:
    """Record request metrics in Prometheus and write request/error logs."""

    def __init__(self, metrics_port: int = 8000):
        """Create the logger and start the Prometheus metrics HTTP server.

        Args:
            metrics_port: Port the metrics endpoint listens on. Choose a port
                that differs from the one the API server itself is bound to.
        """
        self.logger = logging.getLogger("llm-service")
        start_http_server(metrics_port)  # Prometheus metrics port

    def log_request(self, endpoint: str, duration: float, status: str):
        """Count the request, observe its latency and log a summary line.

        Args:
            endpoint: Endpoint label for the metrics and the log line.
            duration: Request duration in seconds.
            status: Status label for the counter and the log line.
        """
        REQUEST_COUNT.labels(endpoint=endpoint, status=status).inc()
        REQUEST_LATENCY.labels(endpoint=endpoint).observe(duration)

        self.logger.info(
            "Request: %s, Duration: %.3fs, Status: %s",
            endpoint,
            duration,
            status,
        )

    def log_error(self, error: Exception, context: dict):
        """Log ``error`` together with ``context`` and the error's traceback.

        The exception instance is passed as ``exc_info`` so the traceback is
        attached even when this method is called outside an ``except`` block.
        """
        self.logger.error(
            "Error: %s, Context: %s",
            error,
            context,
            exc_info=error,
        )

生产环境检查清单

def production_checklist():
    """Print the production-readiness checklist and return it.

    Returns:
        Dict mapping each category name to its list of checklist items, in
        the order they are printed.
    """
    sections = (
        ("模型优化", ["模型已量化", "模型已剪枝", "ONNX导出完成"]),
        ("服务配置", ["负载均衡配置", "自动扩缩容配置", "健康检查端点"]),
        ("监控", ["Prometheus指标", "日志收集", "告警配置"]),
        ("安全", ["API密钥管理", "请求限流", "输入验证"]),
        ("运维", ["备份策略", "回滚计划", "文档完成"]),
    )
    checklist = dict(sections)

    # Assemble the report line by line; each category header is preceded by
    # an empty line and each item is rendered as an unchecked box.
    report = ["=== 生产环境检查清单 ==="]
    for category, items in checklist.items():
        report.append(f"\n{category}:")
        report.extend(f"  [ ] {item}" for item in items)
    print("\n".join(report))

    return checklist

LLM部署是一个系统工程,需要综合考虑性能、可靠性、安全性和可维护性。通过遵循这些最佳实践,可以确保模型在生产环境中稳定高效地运行。