← 返回首页
🧠

LLM影子部署

📂 llm ⏱ 4 min 668 words

---
title: "LLM影子部署"
description: "LLM模型的影子部署策略,在不影响生产环境的前提下验证新模型的真实表现"
tags: ["影子部署", "流量镜像", "无风险验证"]
category: "llm"
icon: "🧠"
---

LLM影子部署

概述

影子部署(Shadow Deployment)也称为流量镜像(Traffic Mirroring),是一种将生产流量复制到新模型进行并行处理的部署策略。新模型的响应不会返回给用户,仅用于对比分析。由于用户始终只收到主模型的响应,这种方式可以在不影响用户体验的前提下,验证新模型在真实流量下的表现。

工作原理

用户请求 → 主模型 → 正常响应给用户
          ↓ (复制)
         影子模型 → 记录响应(不返回给用户)
                      ↓
                  对比分析系统

实现方案

流量镜像

# shadow/mirror.py
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
import aiohttp

@dataclass
class ShadowRequest:
    """A single production request that is mirrored to the shadow model."""

    # Identifier copied onto the resulting ShadowResponse so the two can be
    # correlated.
    request_id: str
    # Text sent as the content of the single "user" chat message.
    prompt: str
    # Generation settings; the mirror reads the "max_tokens" and
    # "temperature" keys and falls back to defaults when they are absent.
    model_params: dict
    # Time associated with the request — presumably epoch seconds; it is not
    # read by the mirroring code (TODO confirm the unit with the caller).
    timestamp: float

@dataclass
class ShadowResponse:
    """Paired outcome of one request handled by the primary and shadow models."""

    # Identifier of the originating ShadowRequest.
    request_id: str
    # Text returned by the primary (production) model.
    primary_response: str
    # Text returned by the shadow model; the analyzer skips records where
    # this is None.
    shadow_response: Optional[str]
    # Elapsed time of each model call, in milliseconds.
    primary_latency_ms: float
    shadow_latency_ms: float
    # Token counts taken from the "usage.total_tokens" field of each reply.
    primary_tokens: int
    shadow_tokens: int
    # Description of a shadow-side failure; None when nothing was reported.
    error: Optional[str] = None

class TrafficMirror:
    """Send each request to both the primary and the shadow model and record the pair.

    The two calls run concurrently. The shadow call is best-effort: any
    failure is captured in ``ShadowResponse.error`` (and ``shadow_response``
    is set to ``None``) instead of being raised. A failure of the primary
    call is re-raised to the caller unchanged.

    ``mirror_request`` awaits *both* calls before returning, so callers that
    must not delay the user-facing response should run it as a background
    task rather than awaiting it on the request path.

    Attributes:
        primary_endpoint: Base URL of the production model service.
        shadow_endpoint: Base URL of the shadow model service.
        responses: Every ShadowResponse produced so far, in completion order.
            The list is never trimmed by this class.
    """

    def __init__(self, primary_endpoint: str, shadow_endpoint: str):
        self.primary_endpoint = primary_endpoint
        self.shadow_endpoint = shadow_endpoint
        self.responses: list[ShadowResponse] = []

    async def mirror_request(self, request: ShadowRequest) -> ShadowResponse:
        """Call both models with ``request`` and record the combined result.

        Args:
            request: The request to replay against both endpoints.

        Returns:
            The recorded ShadowResponse. When the shadow call failed,
            ``shadow_response`` is ``None`` and ``error`` describes the failure.

        Raises:
            Exception: Whatever the primary call raised; nothing is recorded
                in that case.
        """
        async with aiohttp.ClientSession() as session:
            primary_result, shadow_result = await asyncio.gather(
                self._call_primary(session, request),
                self._call_shadow(session, request),
                return_exceptions=True,
            )

        # With return_exceptions=True a failed call is returned as an
        # exception object. The primary call is the production path, so its
        # failure is surfaced as-is instead of being turned into an
        # AttributeError by a later ``.get``.
        if isinstance(primary_result, BaseException):
            raise primary_result

        # _call_shadow already swallows Exception; this covers what it does
        # not catch (e.g. cancellation of the shadow coroutine).
        if isinstance(shadow_result, BaseException):
            shadow_result = self._failed_shadow_result(shadow_result)

        shadow_error = shadow_result.get("error")
        response = ShadowResponse(
            request_id=request.request_id,
            primary_response=primary_result.get("response", ""),
            # None (not "") marks a missing shadow reply so that consumers
            # filtering on ``is not None`` can exclude failed calls.
            shadow_response=(
                None if shadow_error is not None
                else shadow_result.get("response", "")
            ),
            primary_latency_ms=primary_result.get("latency_ms", 0),
            shadow_latency_ms=shadow_result.get("latency_ms", 0),
            primary_tokens=primary_result.get("tokens", 0),
            shadow_tokens=shadow_result.get("tokens", 0),
            error=shadow_error,
        )

        self.responses.append(response)
        return response

    @staticmethod
    def _failed_shadow_result(exc: BaseException) -> dict:
        """Build the placeholder result dict for a failed shadow call."""
        # Include the exception type: str() of timeout errors is often empty,
        # which would make the failure look like "no error" downstream.
        return {
            "error": f"{type(exc).__name__}: {exc}",
            "response": "",
            "latency_ms": 0,
            "tokens": 0,
        }

    async def _call_primary(self, session, request: ShadowRequest) -> dict:
        """Call the primary endpoint; exceptions propagate to the caller."""
        return await self._call_model(session, self.primary_endpoint, request)

    async def _call_shadow(self, session, request: ShadowRequest) -> dict:
        """Call the shadow endpoint, converting any failure into an error dict."""
        try:
            return await self._call_model(session, self.shadow_endpoint, request)
        except Exception as e:
            # Deliberately broad: a shadow failure must never affect the
            # handling of the primary result.
            return self._failed_shadow_result(e)

    async def _call_model(self, session, endpoint: str,
                          request: ShadowRequest) -> dict:
        """POST a chat-completion request and return text, latency and tokens.

        Args:
            session: The aiohttp client session used for the call.
            endpoint: Base URL; ``/v1/chat/completions`` is appended.
            request: Supplies the prompt and the optional ``max_tokens`` /
                ``temperature`` generation settings.

        Returns:
            A dict with keys ``response`` (first choice's message content),
            ``latency_ms`` and ``tokens`` (``usage.total_tokens``, 0 if absent).

        Raises:
            aiohttp.ClientResponseError: On an HTTP status of 400 or higher.
        """
        # perf_counter is monotonic, so the measurement cannot be skewed by
        # wall-clock adjustments.
        start = time.perf_counter()

        async with session.post(
            f"{endpoint}/v1/chat/completions",
            json={
                "messages": [{"role": "user", "content": request.prompt}],
                "max_tokens": request.model_params.get("max_tokens", 256),
                "temperature": request.model_params.get("temperature", 0.7),
            },
        ) as resp:
            # Fail with the real HTTP error rather than a KeyError on an
            # error payload that has no "choices".
            resp.raise_for_status()
            data = await resp.json()
            latency = (time.perf_counter() - start) * 1000

            return {
                "response": data["choices"][0]["message"]["content"],
                "latency_ms": latency,
                # "usage" may be missing or null in the payload.
                "tokens": (data.get("usage") or {}).get("total_tokens", 0),
            }

对比分析

# shadow/analysis.py
from dataclasses import dataclass
from typing import Optional

@dataclass
class ComparisonResult:
    """Aggregate statistics comparing the shadow model with the primary model."""

    # Number of recorded request pairs.
    total_requests: int
    # Fraction (0..1) of requests for which a shadow response is available.
    shadow_success_rate: float
    # Mean of (shadow latency - primary latency), in milliseconds.
    avg_latency_diff_ms: float
    # Mean of (shadow latency / primary latency).
    avg_latency_ratio: float
    # Mean of (shadow tokens - primary tokens). This is an average, so it is
    # a float rather than an int.
    avg_token_diff: float

    # Quality comparison
    preference_counts: dict  # {"primary": n, "shadow": n, "tie": n}

    # Anomaly detection
    error_count: int
    timeout_count: int
    # The analyzer in this module always sets this to 0 (not yet computed).
    hallucination_diff: float

class ShadowAnalyzer:
    """Compute comparison statistics and a text report from recorded responses."""

    def __init__(self, responses: list[ShadowResponse]):
        # The list is stored by reference, so responses appended to it later
        # are included in subsequent analyses.
        self.responses = responses

    @staticmethod
    def _mean(values: list, default: float) -> float:
        """Return the arithmetic mean of ``values``, or ``default`` when empty."""
        return sum(values) / len(values) if values else default

    def analyze(self) -> ComparisonResult:
        """Aggregate the recorded responses into a ComparisonResult.

        A shadow call counts as successful only when it produced a response
        and reported no error; failed calls are excluded from the latency,
        token and preference statistics so that their zeroed placeholder
        values do not skew the averages.

        Returns:
            The aggregated statistics. With no recorded responses the success
            rate is 0, the latency ratio is 1 and all other averages are 0.
        """
        total = len(self.responses)

        # Checking ``error`` as well as ``shadow_response`` matters: a failed
        # call may still carry an empty-string response.
        valid_responses = [
            r for r in self.responses
            if r.shadow_response is not None and r.error is None
        ]
        success = len(valid_responses)

        # Latency comparisons need a non-zero primary latency as denominator.
        timed = [r for r in valid_responses if r.primary_latency_ms > 0]
        latency_diffs = [r.shadow_latency_ms - r.primary_latency_ms for r in timed]
        latency_ratios = [r.shadow_latency_ms / r.primary_latency_ms for r in timed]
        token_diffs = [r.shadow_tokens - r.primary_tokens for r in valid_responses]

        # ``is not None`` so that an error with an empty message still counts.
        errors = [r.error for r in self.responses if r.error is not None]
        timeout_count = sum(1 for e in errors if "timeout" in str(e).lower())

        return ComparisonResult(
            total_requests=total,
            shadow_success_rate=success / total if total > 0 else 0.0,
            avg_latency_diff_ms=self._mean(latency_diffs, 0.0),
            avg_latency_ratio=self._mean(latency_ratios, 1.0),
            avg_token_diff=self._mean(token_diffs, 0.0),
            preference_counts=self._count_preferences(valid_responses),
            error_count=len(errors),
            timeout_count=timeout_count,
            hallucination_diff=0,
        )

    def _count_preferences(self, responses: list) -> dict:
        """Tally which model "wins" each request.

        The score is a crude heuristic: response length divided by latency
        (characters per millisecond). Scores closer than 0.1 count as a tie.

        Args:
            responses: Records whose ``shadow_response`` is not None.

        Returns:
            A dict with the keys "primary", "shadow" and "tie".
        """
        counts = {"primary": 0, "shadow": 0, "tie": 0}

        for r in responses:
            # Simple preference based on length and response time; latency is
            # clamped to 1 ms to avoid division by zero.
            shadow_quality = len(r.shadow_response) / max(r.shadow_latency_ms, 1)
            primary_quality = len(r.primary_response) / max(r.primary_latency_ms, 1)

            if abs(shadow_quality - primary_quality) < 0.1:
                counts["tie"] += 1
            elif shadow_quality > primary_quality:
                counts["shadow"] += 1
            else:
                counts["primary"] += 1

        return counts

    def generate_report(self) -> str:
        """Render the analysis as a plain-text report ending in a recommendation."""
        result = self.analyze()

        report = f"""
Shadow Deployment Analysis Report
================================

Overview:
  Total Requests: {result.total_requests}
  Shadow Success Rate: {result.shadow_success_rate:.1%}

Performance:
  Avg Latency Difference: {result.avg_latency_diff_ms:+.1f}ms
  Avg Latency Ratio: {result.avg_latency_ratio:.2f}x
  Avg Token Difference: {result.avg_token_diff:+.1f}

Reliability:
  Errors: {result.error_count}
  Timeouts: {result.timeout_count}

Quality Comparison:
  Primary Preferred: {result.preference_counts['primary']}
  Shadow Preferred: {result.preference_counts['shadow']}
  Tie: {result.preference_counts['tie']}
"""

        # Recommendation
        if result.total_requests == 0:
            # A success rate of 0 with no traffic is "no data", not a failure.
            report += "\n📊 Recommendation: No requests recorded yet, continue shadow testing for more data"
        elif result.shadow_success_rate < 0.95:
            report += "\n⚠️ Recommendation: Shadow model has high failure rate, investigate before promotion"
        elif result.avg_latency_ratio > 1.5:
            report += "\n⚠️ Recommendation: Shadow model has significantly higher latency"
        elif result.preference_counts['shadow'] > result.preference_counts['primary']:
            report += "\n✅ Recommendation: Shadow model shows improvement, consider A/B testing"
        else:
            report += "\n📊 Recommendation: Continue shadow testing for more data"

        return report

自动化影子部署管理

# shadow/manager.py
class ShadowDeploymentManager:
    """Run a time-boxed shadow deployment with hourly checks and a final report.

    NOTE(review): the periodic check looks at ``mirror.responses`` while the
    statistics come from ``analyzer`` — presumably the analyzer was built
    over the same list; verify at the call site.
    """

    # Seconds between two periodic analyses.
    CHECK_INTERVAL_S = 3600

    def __init__(self, mirror: TrafficMirror, analyzer: ShadowAnalyzer):
        self.mirror = mirror
        self.analyzer = analyzer
        self.is_active = False

    async def start(self, duration_hours: float = 24):
        """Run periodic analyses until the duration elapses or ``stop()`` is called.

        Args:
            duration_hours: How long to keep the deployment running. Fractional
                values are honoured; the last sleep is shortened so the run
                does not overshoot the requested duration.

        The final report is printed and written to ``shadow_report.txt`` when
        the loop ends for either reason.
        """
        self.is_active = True
        print(f"Shadow deployment started, will run for {duration_hours} hours")

        # A monotonic deadline is immune to wall-clock adjustments.
        deadline = time.monotonic() + duration_hours * 3600
        while self.is_active:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Check once per interval, but never sleep past the deadline.
            await asyncio.sleep(min(self.CHECK_INTERVAL_S, remaining))
            if not self.is_active:
                # stop() was called while sleeping; go straight to the report.
                break
            self._periodic_analysis()

        self.is_active = False
        self._final_report()

    def stop(self):
        """Request termination; takes effect at the next wake-up of ``start``."""
        self.is_active = False

    def _periodic_analysis(self):
        """Print a short status line and stop the run on a low success rate."""
        if not self.mirror.responses:
            return

        result = self.analyzer.analyze()
        print(f"\n[Periodic Report] Success Rate: {result.shadow_success_rate:.1%}, "
              f"Latency Ratio: {result.avg_latency_ratio:.2f}x")

        if result.shadow_success_rate < 0.9:
            print("⚠️ Low success rate detected, stopping shadow deployment")
            self.stop()

    def _final_report(self):
        """Print the full report and save it to ``shadow_report.txt``."""
        report = self.analyzer.generate_report()
        print(report)

        # Persist the report. The text contains emoji, so the encoding is set
        # explicitly instead of relying on the platform default.
        with open("shadow_report.txt", "w", encoding="utf-8") as f:
            f.write(report)

使用场景

  1. 大版本升级前验证:在发布全新模型架构前进行全面验证
  2. 成本优化测试:测试更小的模型是否能达到相近效果
  3. 延迟优化验证:验证推理优化是否影响响应质量
  4. 多供应商对比:同时对比多个模型供应商的表现

注意事项

  1. 资源开销:影子部署需要双倍推理资源
  2. 延迟影响:确保影子调用不阻塞主路径(例如先向用户返回主模型响应,再在后台异步等待影子结果,并为影子请求设置超时)
  3. 数据安全:镜像流量包含真实用户数据,影子环境应采用与生产环境同等的访问控制和脱敏措施;影子响应不应用于任何生产决策
  4. 存储成本:大量响应数据需要妥善存储和管理