LLM金丝雀发布
---
title: "LLM金丝雀发布"
description: "LLM模型的金丝雀发布策略,实现模型的渐进式上线和实时监控,确保生产环境稳定性"
tags: ["金丝雀发布", "渐进部署", "生产监控"]
category: "llm"
icon: "🧠"
---
LLM金丝雀发布
概述
金丝雀发布(Canary Deployment)是一种渐进式部署策略,先将新模型部署到小部分流量上,观察关键指标后再逐步扩大部署范围。这种策略对LLM项目尤为重要,因为模型行为的不确定性可能导致意想不到的生产问题。
部署架构
金丝雀发布的核心是流量路由。通过负载均衡器或API网关,将一部分用户请求路由到新模型,其余请求继续由旧模型处理。
用户请求 → 负载均衡器 → 旧模型 (90%流量)
                      → 新模型 (10%流量)
                            ↓
                      监控系统 → 自动决策
实现方案
流量路由
# canary/router.py
import hashlib
import time
from dataclasses import dataclass
from typing import Optional
@dataclass
class DeploymentTarget:
    """One model deployment that the canary router can send requests to."""

    # Identifier of the model behind this target.
    model_name: str
    # Where requests for this target are sent.
    endpoint: str
    # Fraction of traffic for this target. CanaryRouter overwrites it on the
    # first two registered targets whenever the canary weight is changed.
    weight: float
    # Enabled flag; targets are enabled unless stated otherwise.
    active: bool = True
class CanaryRouter:
    """Split requests between a stable target and a canary target.

    The first registered target is treated as the stable version and the
    second as the canary; any further targets are never routed to.
    Routing is deterministic: for a given weight, the same ``request_id``
    always resolves to the same target.
    """

    # Number of hash buckets; weights are resolved to 1/_BUCKETS precision.
    _BUCKETS = 1000

    def __init__(self):
        self.targets: list["DeploymentTarget"] = []
        self.canary_weight: float = 0.1  # initial canary weight

    def add_target(self, target: "DeploymentTarget"):
        """Register a target.

        NOTE: this does not touch the target's ``weight``; ``canary_weight``
        is only pushed onto the targets by ``set_canary_weight``.
        """
        self.targets.append(target)

    def set_canary_weight(self, weight: float):
        """Set the canary traffic share, clamped to the range [0.0, 1.0]."""
        self.canary_weight = max(0.0, min(1.0, weight))
        self._update_weights()

    def _update_weights(self):
        """Copy the canary weight onto the stable/canary pair.

        Does nothing until at least two targets are registered.
        """
        if len(self.targets) < 2:
            return
        # First target is the stable version, second is the canary version.
        self.targets[0].weight = 1.0 - self.canary_weight
        self.targets[1].weight = self.canary_weight

    def route(self, request_id: str) -> "DeploymentTarget":
        """Pick the target that should serve ``request_id``.

        Args:
            request_id: Key hashed to choose a bucket.

        Returns:
            The only target if a single one is registered. Otherwise the
            stable or canary target, chosen by hash bucket; a target whose
            ``active`` flag is false receives no traffic as long as the
            other one is active.

        Raises:
            ValueError: If no targets have been registered.
        """
        if not self.targets:
            raise ValueError("No deployment targets configured")
        if len(self.targets) == 1:
            return self.targets[0]

        stable, canary = self.targets[0], self.targets[1]
        # Never send traffic to a target that has been switched off. If both
        # are off, fall back to the stable target rather than failing.
        if not canary.active:
            return stable
        if not stable.active:
            return canary

        digest = hashlib.md5(request_id.encode()).hexdigest()
        bucket = int(digest[:8], 16) % self._BUCKETS
        # round(), not int(): 1.0 - 0.9 evaluates to 0.09999999999999998, and
        # truncating the product would give the stable side 99 buckets
        # instead of 100.
        threshold = round(stable.weight * self._BUCKETS)
        return stable if bucket < threshold else canary
监控与自动决策
# canary/monitor.py
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
@dataclass
class CanaryMetrics:
    """Running counters collected for one deployment."""

    # Traffic and reliability counters.
    request_count: int = 0
    error_count: int = 0
    # Sum of latencies in milliseconds, and how many samples are in that sum.
    total_latency_ms: float = 0
    latency_count: int = 0

    # Quality metrics
    ratings: list = field(default_factory=list)
    hallucination_count: int = 0

    # Cost metrics
    total_tokens: int = 0
class CanaryMonitor:
    """Compare a canary's metrics against the stable baseline."""

    def __init__(self, stable_metrics: "CanaryMetrics",
                 canary_metrics: "CanaryMetrics"):
        self.stable = stable_metrics
        self.canary = canary_metrics
        self.start_time = datetime.now()

    @staticmethod
    def _safe_ratio(numerator: float, denominator: float) -> float:
        """Return ``numerator / denominator``, or 0 when there are no samples."""
        return numerator / denominator if denominator > 0 else 0

    def check_canary_health(self,
                            max_error_rate: float = 0.01,
                            max_latency_increase: float = 1.5,
                            max_latency_ms: float = 2000,
                            max_error_rate_increase: Optional[float] = None) -> dict:
        """Evaluate the canary's error rate and average latency.

        Each metric must pass an absolute limit ("healthy") and a limit
        relative to the stable version ("within_tolerance"). The relative
        check is skipped (treated as passed) when the stable value is 0.

        NOTE: a canary with no recorded requests reports an error rate and
        latency of 0 and therefore passes every check.

        Args:
            max_error_rate: Absolute ceiling for the canary error rate.
            max_latency_increase: Allowed canary/stable latency ratio.
            max_latency_ms: Absolute ceiling for canary average latency.
            max_error_rate_increase: Allowed canary/stable error-rate ratio.
                When None, ``max_latency_increase`` is used, which is what
                this method did before the parameter existed.

        Returns:
            A dict with "healthy" (bool), "checks" (per-metric details) and
            "recommendation" ("continue" or "rollback").
        """
        if max_error_rate_increase is None:
            max_error_rate_increase = max_latency_increase

        stable, canary = self.stable, self.canary
        stable_error_rate = self._safe_ratio(stable.error_count,
                                             stable.request_count)
        canary_error_rate = self._safe_ratio(canary.error_count,
                                             canary.request_count)
        stable_avg_latency = self._safe_ratio(stable.total_latency_ms,
                                              stable.latency_count)
        canary_avg_latency = self._safe_ratio(canary.total_latency_ms,
                                              canary.latency_count)

        checks = {
            "error_rate": {
                "stable": stable_error_rate,
                "canary": canary_error_rate,
                "healthy": canary_error_rate <= max_error_rate,
                "within_tolerance": (
                    canary_error_rate
                    <= stable_error_rate * max_error_rate_increase
                    or stable_error_rate == 0
                ),
            },
            "latency": {
                "stable_ms": stable_avg_latency,
                "canary_ms": canary_avg_latency,
                "healthy": canary_avg_latency <= max_latency_ms,
                "within_tolerance": (
                    canary_avg_latency
                    <= stable_avg_latency * max_latency_increase
                    or stable_avg_latency == 0
                ),
            },
        }

        all_healthy = all(
            check["healthy"] and check["within_tolerance"]
            for check in checks.values()
        )
        return {
            "healthy": all_healthy,
            "checks": checks,
            "recommendation": "continue" if all_healthy else "rollback",
        }
自动化金丝雀发布
# canary/automated_release.py
import time
from enum import Enum
class CanaryPhase(Enum):
    """Rollout stages, declared from smallest to largest traffic share."""

    INIT = "init"          # 5% of traffic
    EARLY = "early"        # 20% of traffic
    PROGRESS = "progress"  # 50% of traffic
    FULL = "full"          # 100% of traffic
class AutomatedCanaryRelease:
    """Walk a canary rollout through its phases, rolling back on bad health.

    Each call to ``run_cycle`` asks the monitor for a health verdict. An
    unhealthy canary is rolled back; a healthy one is advanced to the next
    phase once it has spent the configured minimum time in the current one.
    """

    # Annotations are quoted because this snippet does not import the router
    # and monitor classes; unquoted names would raise NameError at definition.
    def __init__(self, router: "CanaryRouter", monitor: "CanaryMonitor"):
        self.router = router
        self.monitor = monitor
        self.phase = CanaryPhase.INIT
        self.phase_start = time.time()
        # "weight" is the canary traffic share for the phase; "duration_min"
        # is the minimum number of minutes to stay in it before advancing.
        self.phase_configs = {
            CanaryPhase.INIT: {"weight": 0.05, "duration_min": 10},
            CanaryPhase.EARLY: {"weight": 0.20, "duration_min": 30},
            CanaryPhase.PROGRESS: {"weight": 0.50, "duration_min": 60},
            CanaryPhase.FULL: {"weight": 1.0, "duration_min": 0},
        }
        # Apply the starting phase's weight; otherwise the router keeps
        # whatever weight it had and the INIT phase would not run at 5%.
        self.router.set_canary_weight(self.phase_configs[self.phase]["weight"])

    def run_cycle(self):
        """Run one health check and act on it.

        Returns:
            False if the canary was rolled back, True otherwise.
        """
        health = self.monitor.check_canary_health()
        if health["recommendation"] == "rollback":
            print("⚠️ Canary unhealthy, triggering rollback!")
            self._rollback()
            return False

        min_duration = self.phase_configs[self.phase]["duration_min"]
        elapsed_min = (time.time() - self.phase_start) / 60
        if elapsed_min >= min_duration and self.phase != CanaryPhase.FULL:
            self._advance_phase()
        return True

    def _advance_phase(self):
        """Move to the next phase and apply its canary weight."""
        # Enum iteration follows definition order, which is the rollout order.
        phases = list(CanaryPhase)
        current_idx = phases.index(self.phase)
        if current_idx >= len(phases) - 1:
            return  # already at the last phase
        self.phase = phases[current_idx + 1]
        self.phase_start = time.time()
        new_weight = self.phase_configs[self.phase]["weight"]
        self.router.set_canary_weight(new_weight)
        print(f"📈 Advanced to {self.phase.value} phase, "
              f"weight: {new_weight*100:.0f}%")

    def _rollback(self):
        """Send all traffic to the stable version and restart at INIT."""
        self.router.set_canary_weight(0.0)
        self.phase = CanaryPhase.INIT
        # Restart the phase timer; with a stale start time the next healthy
        # cycle could advance immediately, skipping the INIT observation.
        self.phase_start = time.time()
        print("🔄 Rollback complete, all traffic to stable version")
最佳实践
- 渐进放大:从5%开始,逐步增加到20%、50%、100%
- 充分观察:每个阶段都要留出足够的观察时间,流量越大观察越久(上文示例配置依次为10、30、60分钟)
- 多维监控:同时关注错误率、延迟、质量指标和用户反馈
- 自动回滚:设置明确的健康阈值,自动触发回滚
- 人工确认:关键阶段(如50%到100%)需人工审批(上文示例代码会自动推进到100%,如需审批,应在进入 FULL 阶段前加入人工确认步骤)