← 返回首页
🧠

无服务器LLM

📂 llm ⏱ 3 min 470 words

--- title: "无服务器LLM" description: "探讨Serverless LLM架构,包括函数计算、冷启动优化和按需扩缩策略,实现LLM的弹性部署。" tags: ["Serverless", "函数计算", "冷启动", "按需扩缩"] category: "llm" icon: "🧠"

无服务器LLM

什么是Serverless LLM

Serverless LLM将大模型封装为函数服务,用户按调用次数付费,无需管理GPU服务器。云平台自动处理资源分配、扩缩容和运维,开发者只需关注模型逻辑。

架构设计

典型架构

class ServerlessLLM架构:
    """
    请求流:
    1. 用户请求 → API网关
    2. API网关 → 函数计算平台
    3. 函数计算 → GPU实例池
    4. GPU实例 → 推理结果返回
    """
    
    components = {
        'api_gateway': '请求路由、认证、限流',
        'function_runtime': '函数执行环境',
        'gpu_pool': 'GPU实例池,按需分配',
        'model_cache': '模型缓存,加速冷启动',
        'queue': '异步请求队列',
    }

函数封装LLM

import json
from typing import Dict, Any

class ServerlessLLMHandler:
    """Serverless LLM推理处理器"""
    
    def __init__(self):
        self.model = None
        self.tokenizer = None
    
    def init_model(self, model_name: str):
        """初始化模型(可能在冷启动时执行)"""
        if self.model is None:
            from transformers import AutoModelForCausalLM, AutoTokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype="auto",
                device_map="auto",
            )
    
    def handle_request(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """处理推理请求"""
        # 解析请求
        body = json.loads(event.get('body', '{}'))
        prompt = body.get('prompt', '')
        max_tokens = body.get('max_tokens', 256)
        model_name = body.get('model', 'gpt2')
        
        # 初始化模型
        self.init_model(model_name)
        
        # 执行推理
        import torch
        inputs = self.tokenizer(prompt, return_tensors="pt").to(self.model.device)
        
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=max_tokens,
                do_sample=True,
                temperature=0.7,
            )
        
        result = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        
        return {
            'statusCode': 200,
            'body': json.dumps({
                'text': result,
                'model': model_name,
                'usage': {
                    'prompt_tokens': len(inputs['input_ids'][0]),
                    'completion_tokens': len(outputs[0]) - len(inputs['input_ids'][0]),
                }
            })
        }

冷启动优化

模型预加载

class ModelPreloader:
    """模型预加载策略"""
    
    def __init__(self):
        self.loaded_models = {}
        self.warm_models = set()
    
    def preload(self, model_name: str):
        """预加载模型到内存"""
        if model_name not in self.loaded_models:
            from transformers import AutoModelForCausalLM
            self.loaded_models[model_name] = AutoModelForCausalLM.from_pretrained(
                model_name,
                torch_dtype="auto",
                device_map="auto",
            )
            self.warm_models.add(model_name)
            print(f"预加载模型: {model_name}")
    
    def warm_up(self, model_name: str):
        """模型热身,预热CUDA kernel"""
        if model_name in self.loaded_models:
            import torch
            model = self.loaded_models[model_name]
            dummy_input = torch.zeros(1, 10, dtype=torch.long, device=model.device)
            with torch.no_grad():
                model(dummy_input)
            print(f"模型热身完成: {model_name}")

快速恢复技术

class CheckpointManager:
    """模型检查点管理"""
    
    def __init__(self, checkpoint_dir: str):
        self.checkpoint_dir = checkpoint_dir
    
    def save_checkpoint(self, model, step: int):
        """保存模型检查点"""
        import torch
        import os
        path = os.path.join(self.checkpoint_dir, f"checkpoint-{step}.pt")
        torch.save({
            'model_state_dict': model.state_dict(),
            'step': step,
        }, path)
        print(f"检查点已保存: {path}")
    
    def load_checkpoint(self, model, step: int):
        """快速加载检查点"""
        import torch
        import os
        path = os.path.join(self.checkpoint_dir, f"checkpoint-{step}.pt")
        checkpoint = torch.load(path, map_location='cpu')
        model.load_state_dict(checkpoint['model_state_dict'])
        return model

按需扩缩

自动扩缩策略

class AutoScaler:
    """Serverless自动扩缩"""
    
    def __init__(self, min_instances=0, max_instances=20):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.current_instances = 0
        self.pending_requests = 0
    
    def evaluate(self, metrics: dict) -> int:
        """根据指标决定扩缩"""
        queue_depth = metrics['queue_depth']
        avg_latency = metrics['avg_latency_ms']
        cold_start_rate = metrics['cold_start_rate']
        
        # 扩容条件
        if (queue_depth > 50 or avg_latency > 200 or cold_start_rate > 0.3):
            target = min(self.current_instances + 2, self.max_instances)
        # 缩容条件
        elif queue_depth < 5 and self.current_instances > self.min_instances:
            target = max(self.current_instances - 1, self.min_instances)
        else:
            target = self.current_instances
        
        self.current_instances = target
        return target

资源分配

class ResourceAllocator:
    """动态GPU资源分配"""
    
    def allocate(self, request) -> dict:
        """根据请求大小分配GPU"""
        model_size = request.get('model_size_gb', 7)
        
        if model_size <= 7:
            return {'gpu_type': 'T4', 'gpu_count': 1}
        elif model_size <= 30:
            return {'gpu_type': 'A10G', 'gpu_count': 1}
        else:
            return {'gpu_type': 'A100', 'gpu_count': 2}

成本模型

class CostCalculator:
    """Serverless LLM成本计算器"""
    
    PRICING = {
        'T4': {'per_hour': 0.5, 'per_request': 0.0001},
        'A10G': {'per_hour': 1.0, 'per_request': 0.0002},
        'A100': {'per_hour': 3.0, 'per_request': 0.0005},
    }
    
    def calculate(self, gpu_type: str, requests_per_day: int, 
                  avg_duration_sec: float) -> dict:
        pricing = self.PRICING[gpu_type]
        
        # 按请求计费
        request_cost = requests_per_day * pricing['per_request']
        
        # 按GPU时间计费(冷启动场景)
        gpu_hours = (requests_per_day * avg_duration_sec) / 3600
        compute_cost = gpu_hours * pricing['per_hour']
        
        return {
            'daily_request_cost': request_cost,
            'daily_compute_cost': compute_cost,
            'total_daily_cost': request_cost + compute_cost,
        }

最佳实践