LLM影子部署
---
title: "LLM影子部署"
description: "LLM模型的影子部署策略,在不影响生产环境的前提下验证新模型的真实表现"
tags: ["影子部署", "流量镜像", "无风险验证"]
category: "llm"
icon: "🧠"
---
LLM影子部署
概述
影子部署(Shadow Deployment)也称为流量镜像(Traffic Mirroring),是一种将生产流量复制到新模型进行并行处理的部署策略。新模型的响应不会返回给用户,仅用于对比分析。这种方式可以在完全零风险的情况下验证新模型在真实流量下的表现。
工作原理
用户请求 → 主模型 → 正常响应给用户
    ↓ (复制)
影子模型 → 记录响应(不返回给用户)
    ↓
对比分析系统
实现方案
流量镜像
# shadow/mirror.py
import asyncio
import time
from dataclasses import dataclass
from typing import Optional
import aiohttp
@dataclass
class ShadowRequest:
    """One request that is sent to both the primary and the shadow model."""

    # Identifier copied onto the resulting ShadowResponse.
    request_id: str
    # Text sent as the single user message of the chat completion call.
    prompt: str
    # Optional generation settings; "max_tokens" and "temperature" are read from it.
    model_params: dict
    # Caller-supplied time stamp; not interpreted by the mirroring code.
    timestamp: float
@dataclass
class ShadowResponse:
    """Paired outcome of one mirrored request: primary result next to shadow result."""

    # Identifier of the request this record belongs to.
    request_id: str
    # Text returned by the primary model.
    primary_response: str
    # Text returned by the shadow model; the type allows None when there is none.
    shadow_response: Optional[str]
    # Round-trip time of each call, in milliseconds.
    primary_latency_ms: float
    shadow_latency_ms: float
    # Token usage reported for each call.
    primary_tokens: int
    shadow_tokens: int
    # Description of the shadow-side failure, or None when no error was recorded.
    error: Optional[str] = None
class TrafficMirror:
    """Send each request to a primary and a shadow endpoint and record both results.

    Every call to ``mirror_request`` appends one ``ShadowResponse`` to
    ``self.responses``. The list is never trimmed here, so long-running
    callers are responsible for draining or persisting it.
    """

    def __init__(self, primary_endpoint: str, shadow_endpoint: str):
        """Store the base URLs of both endpoints and start with an empty record list."""
        self.primary_endpoint = primary_endpoint
        self.shadow_endpoint = shadow_endpoint
        self.responses: list[ShadowResponse] = []

    async def mirror_request(self, request: ShadowRequest) -> ShadowResponse:
        """Call both models concurrently and return the combined record.

        Both calls are awaited together, so this coroutine takes as long as
        the slower of the two.

        Raises:
            Whatever exception the primary call raised. A shadow failure is
            never raised; it is recorded in ``ShadowResponse.error`` and
            ``shadow_response`` is set to ``None``.
        """
        # NOTE: a fresh session per request keeps the example self-contained
        # but gives up connection reuse.
        async with aiohttp.ClientSession() as session:
            primary_result, shadow_result = await asyncio.gather(
                self._call_primary(session, request),
                self._call_shadow(session, request),
                return_exceptions=True,
            )

        # With return_exceptions=True a failed call comes back as the exception
        # object itself, so it must be checked before being used like a dict.
        if isinstance(primary_result, BaseException):
            raise primary_result
        if isinstance(shadow_result, BaseException):
            shadow_result = self._shadow_failure(shadow_result)

        shadow_error = shadow_result.get("error")
        shadow_failed = shadow_error is not None

        response = ShadowResponse(
            request_id=request.request_id,
            primary_response=primary_result.get("response", ""),
            # None (not "") marks "no shadow answer", so consumers filtering on
            # `shadow_response is not None` do not count failures as successes.
            shadow_response=None if shadow_failed else shadow_result.get("response", ""),
            primary_latency_ms=primary_result.get("latency_ms", 0),
            shadow_latency_ms=shadow_result.get("latency_ms", 0),
            primary_tokens=primary_result.get("tokens", 0),
            shadow_tokens=shadow_result.get("tokens", 0),
            error=shadow_error,
        )
        self.responses.append(response)
        return response

    @staticmethod
    def _shadow_failure(exc: BaseException) -> dict:
        """Build the result dict that stands in for a failed shadow call."""
        # Include the exception type: str() of e.g. asyncio.TimeoutError is
        # empty, which would make the error falsy and hide timeouts.
        return {
            "error": f"{type(exc).__name__}: {exc}",
            "response": "",
            "latency_ms": 0,
            "tokens": 0,
        }

    async def _call_primary(self, session, request: ShadowRequest) -> dict:
        """Call the primary endpoint; exceptions propagate to the caller."""
        return await self._call_model(session, self.primary_endpoint, request)

    async def _call_shadow(self, session, request: ShadowRequest) -> dict:
        """Call the shadow endpoint, converting any failure into an error dict."""
        try:
            return await self._call_model(session, self.shadow_endpoint, request)
        except Exception as exc:
            # Deliberately broad: the shadow path is best-effort and must not
            # break the request that serves the user.
            return self._shadow_failure(exc)

    async def _call_model(self, session, endpoint: str,
                          request: ShadowRequest) -> dict:
        """POST the prompt to ``{endpoint}/v1/chat/completions`` and time the call.

        Returns:
            A dict with "response" (content of the first choice),
            "latency_ms" (time until the JSON body was read) and "tokens"
            (``usage.total_tokens``, or 0 when absent).
        """
        payload = {
            "messages": [{"role": "user", "content": request.prompt}],
            "max_tokens": request.model_params.get("max_tokens", 256),
            "temperature": request.model_params.get("temperature", 0.7),
        }
        # perf_counter is monotonic; time.time() can jump when the wall clock
        # is adjusted, which would corrupt the latency measurement.
        started = time.perf_counter()
        async with session.post(f"{endpoint}/v1/chat/completions", json=payload) as resp:
            data = await resp.json()
            latency = (time.perf_counter() - started) * 1000
        return {
            "response": data["choices"][0]["message"]["content"],
            "latency_ms": latency,
            "tokens": data.get("usage", {}).get("total_tokens", 0),
        }
对比分析
# shadow/analysis.py
from dataclasses import dataclass
from typing import Optional
@dataclass
class ComparisonResult:
    """Aggregated statistics comparing shadow responses against primary responses."""

    # Number of recorded request pairs.
    total_requests: int
    # Fraction of requests with a usable shadow response (0.0 - 1.0).
    shadow_success_rate: float
    # Mean of (shadow latency - primary latency), in milliseconds.
    avg_latency_diff_ms: float
    # Mean of (shadow latency / primary latency).
    avg_latency_ratio: float
    # Mean of (shadow tokens - primary tokens); a mean, hence float rather than int.
    avg_token_diff: float
    # Quality comparison
    preference_counts: dict  # {"primary": n, "shadow": n, "tie": n}
    # Anomaly detection
    error_count: int
    timeout_count: int
    hallucination_diff: float
class ShadowAnalyzer:
    """Turn recorded primary/shadow response pairs into comparison statistics."""

    def __init__(self, responses: "list[ShadowResponse]"):
        """Keep a reference to ``responses``.

        The list is stored by reference, so records appended to it later are
        included in subsequent ``analyze()`` calls.
        """
        # The annotation is quoted so the class can be defined even where
        # ShadowResponse has not been imported.
        self.responses = responses

    @staticmethod
    def _is_success(record) -> bool:
        """Return True when the shadow call produced a response and no error.

        Checking ``shadow_response`` alone is not enough: a failed call may
        still carry an empty-string response. ``error is None`` is used rather
        than truthiness because an error message can itself be empty.
        """
        return record.error is None and record.shadow_response is not None

    @staticmethod
    def _mean(values: list, default: float) -> float:
        """Return the arithmetic mean of ``values``, or ``default`` when empty."""
        return sum(values) / len(values) if values else default

    def analyze(self) -> "ComparisonResult":
        """Compute success rate, latency/token deltas, preferences and error counts.

        Latency statistics only use records whose primary latency is positive
        (the ratio is undefined otherwise). With no records every average
        falls back to a neutral value (0, or 1 for the ratio).
        """
        total = len(self.responses)
        valid = [r for r in self.responses if self._is_success(r)]

        timed = [r for r in valid if r.primary_latency_ms > 0]
        latency_diffs = [r.shadow_latency_ms - r.primary_latency_ms for r in timed]
        latency_ratios = [r.shadow_latency_ms / r.primary_latency_ms for r in timed]
        token_diffs = [r.shadow_tokens - r.primary_tokens for r in valid]

        errors = [r.error for r in self.responses if r.error is not None]
        timeout_count = sum(1 for err in errors if "timeout" in str(err).lower())

        return ComparisonResult(
            total_requests=total,
            shadow_success_rate=len(valid) / total if total > 0 else 0.0,
            avg_latency_diff_ms=self._mean(latency_diffs, 0),
            avg_latency_ratio=self._mean(latency_ratios, 1),
            avg_token_diff=self._mean(token_diffs, 0),
            preference_counts=self._count_preferences(valid),
            error_count=len(errors),
            timeout_count=timeout_count,
            # Placeholder: no hallucination metric is computed here.
            hallucination_diff=0,
        )

    def _count_preferences(self, responses: list) -> dict:
        """Count which side "wins" per record using a crude throughput heuristic.

        The score is response length divided by latency (latency clamped to
        at least 1 ms); scores closer than 0.1 count as a tie. Callers must
        pass records whose ``shadow_response`` is not None.
        """
        counts = {"primary": 0, "shadow": 0, "tie": 0}
        for r in responses:
            # Simple preference heuristic based on length and response time.
            shadow_quality = len(r.shadow_response) / max(r.shadow_latency_ms, 1)
            primary_quality = len(r.primary_response) / max(r.primary_latency_ms, 1)
            if abs(shadow_quality - primary_quality) < 0.1:
                winner = "tie"
            elif shadow_quality > primary_quality:
                winner = "shadow"
            else:
                winner = "primary"
            counts[winner] += 1
        return counts

    def generate_report(self) -> str:
        """Render the analysis as a plain-text report ending in a recommendation."""
        result = self.analyze()
        prefs = result.preference_counts
        # The leading and trailing empty entries reproduce the blank first line
        # and the final newline of the report body.
        report = "\n".join([
            "",
            "Shadow Deployment Analysis Report",
            "================================",
            "Overview:",
            f"Total Requests: {result.total_requests}",
            f"Shadow Success Rate: {result.shadow_success_rate:.1%}",
            "Performance:",
            f"Avg Latency Difference: {result.avg_latency_diff_ms:+.1f}ms",
            f"Avg Latency Ratio: {result.avg_latency_ratio:.2f}x",
            f"Avg Token Difference: {result.avg_token_diff:+.1f}",
            "Reliability:",
            f"Errors: {result.error_count}",
            f"Timeouts: {result.timeout_count}",
            "Quality Comparison:",
            f"Primary Preferred: {prefs['primary']}",
            f"Shadow Preferred: {prefs['shadow']}",
            f"Tie: {prefs['tie']}",
            "",
        ])

        # Decision advice. Without any data the 0% success rate is meaningless,
        # so skip straight to "collect more data" instead of reporting failure.
        if result.total_requests == 0:
            report += "\n📊 Recommendation: Continue shadow testing for more data"
        elif result.shadow_success_rate < 0.95:
            report += "\n⚠️ Recommendation: Shadow model has high failure rate, investigate before promotion"
        elif result.avg_latency_ratio > 1.5:
            report += "\n⚠️ Recommendation: Shadow model has significantly higher latency"
        elif prefs['shadow'] > prefs['primary']:
            report += "\n✅ Recommendation: Shadow model shows improvement, consider A/B testing"
        else:
            report += "\n📊 Recommendation: Continue shadow testing for more data"
        return report
自动化影子部署管理
# shadow/manager.py
class ShadowDeploymentManager:
    """Run a time-boxed shadow deployment with periodic checks and a final report."""

    def __init__(self, mirror: "TrafficMirror", analyzer: "ShadowAnalyzer",
                 report_path: str = "shadow_report.txt"):
        """Store collaborators.

        Args:
            mirror: Source of recorded responses; only ``mirror.responses`` is read.
            analyzer: Provides ``analyze()`` and ``generate_report()``.
                NOTE(review): the periodic check assumes the analyzer was built
                over ``mirror.responses``; confirm at the call site.
            report_path: File the final report is written to.
        """
        self.mirror = mirror
        self.analyzer = analyzer
        self.report_path = report_path
        self.is_active = False

    async def start(self, duration_hours: float = 24, check_interval_s: float = 3600):
        """Run until ``duration_hours`` elapse or ``stop()`` is called, then report.

        Args:
            duration_hours: Total run time in hours.
            check_interval_s: Seconds between periodic analyses (default: hourly).

        Raises:
            ValueError: If ``check_interval_s`` is not positive.
        """
        if check_interval_s <= 0:
            raise ValueError("check_interval_s must be positive")

        self.is_active = True
        print(f"Shadow deployment started, will run for {duration_hours} hours")
        # Monotonic clock: the deadline must not move when the wall clock is adjusted.
        deadline = time.monotonic() + duration_hours * 3600
        try:
            while self.is_active:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    break
                # Never sleep past the deadline, otherwise a run shorter than
                # one interval would still take a full interval.
                await asyncio.sleep(min(check_interval_s, remaining))
                self._periodic_analysis()
        finally:
            # Reflect that the run is over even when it ended by timeout or error.
            self.is_active = False
        self._final_report()

    def stop(self):
        """Request the run loop to end after its current sleep."""
        self.is_active = False

    def _periodic_analysis(self):
        """Print a short status line and stop the run if the success rate drops below 90%."""
        if not self.mirror.responses:
            return
        result = self.analyzer.analyze()
        print(f"\n[Periodic Report] Success Rate: {result.shadow_success_rate:.1%}, "
              f"Latency Ratio: {result.avg_latency_ratio:.2f}x")
        if result.shadow_success_rate < 0.9:
            print("⚠️ Low success rate detected, stopping shadow deployment")
            self.stop()

    def _final_report(self):
        """Print the full report and save it to ``self.report_path``."""
        report = self.analyzer.generate_report()
        print(report)
        # Save the report. The text contains emoji, so the encoding is fixed to
        # UTF-8 instead of relying on the platform default.
        with open(self.report_path, "w", encoding="utf-8") as f:
            f.write(report)
使用场景
- 大版本升级前验证:在发布全新模型架构前进行全面验证
- 成本优化测试:测试更小的模型是否能达到相近效果
- 延迟优化验证:验证推理优化是否影响响应质量
- 多供应商对比:同时对比多个模型供应商的表现
注意事项
- 资源开销:影子部署需要双倍推理资源
- 延迟影响:确保影子调用不阻塞主路径;生产环境应先向用户返回主模型响应,再在后台任务中完成影子调用(上文示例中的 `mirror_request` 为便于对比会同时等待两个调用,其耗时取决于较慢的一方)
- 数据安全:影子响应不应用于任何生产决策
- 存储成本:大量响应数据需要妥善存储和管理