← 返回首页
🧠

LLM金丝雀发布

📂 llm ⏱ 3 min 500 words

---
title: "LLM金丝雀发布"
description: "LLM模型的金丝雀发布策略,实现模型的渐进式上线和实时监控,确保生产环境稳定性"
tags: ["金丝雀发布", "渐进部署", "生产监控"]
category: "llm"
icon: "🧠"
---

LLM金丝雀发布

概述

金丝雀发布(Canary Deployment)是一种渐进式部署策略,先将新模型部署到小部分流量上,观察关键指标后再逐步扩大部署范围。这种策略对LLM项目尤为重要,因为模型行为的不确定性可能导致意想不到的生产问题。

部署架构

金丝雀发布的核心是流量路由。通过负载均衡器或API网关,将一部分用户请求路由到新模型,其余请求继续由旧模型处理。

用户请求 → 负载均衡器 → 旧模型 (90%流量)
                        → 新模型 (10%流量)
                              ↓
                        监控系统 → 自动决策

实现方案

流量路由

# canary/router.py
import hashlib
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class DeploymentTarget:
    """One model deployment that the router can send requests to."""

    # Name identifying the deployed model.
    model_name: str
    # Where the model is served (presumably a URL -- confirm with callers).
    endpoint: str
    # Share of traffic for this target. CanaryRouter._update_weights
    # overwrites this value on the first two registered targets.
    weight: float
    # NOTE(review): not consulted by CanaryRouter.route in the code shown.
    active: bool = True

class CanaryRouter:
    """Split requests between a stable target and a canary target.

    The first registered target is treated as the stable version and the
    second as the canary. Further targets are stored but never selected by
    ``route``.
    """

    # Number of hash buckets used by ``route``; weights resolve to 0.1%.
    _BUCKETS = 1000

    def __init__(self):
        self.targets: list["DeploymentTarget"] = []
        self.canary_weight: float = 0.1  # initial canary share

    def add_target(self, target: "DeploymentTarget"):
        """Register a target.

        The target keeps the ``weight`` it was created with until
        ``set_canary_weight`` is called.
        """
        self.targets.append(target)

    def set_canary_weight(self, weight: float):
        """Set the canary share, clamped to ``[0.0, 1.0]``, and apply it."""
        self.canary_weight = max(0.0, min(1.0, weight))
        self._update_weights()

    def _update_weights(self):
        """Copy ``canary_weight`` onto the stable/canary pair, if both exist."""
        if len(self.targets) < 2:
            return

        # First target is the stable version, second is the canary.
        self.targets[0].weight = 1.0 - self.canary_weight
        self.targets[1].weight = self.canary_weight

    def route(self, request_id: str) -> "DeploymentTarget":
        """Pick the target for ``request_id``.

        The choice is a pure function of ``request_id`` and the stable
        target's weight, so the same id keeps hitting the same target while
        the weights are unchanged.

        Raises:
            ValueError: if no target has been registered.
        """
        if not self.targets:
            raise ValueError("No deployment targets configured")

        if len(self.targets) == 1:
            return self.targets[0]

        stable, canary = self.targets[0], self.targets[1]

        digest = hashlib.md5(request_id.encode()).hexdigest()
        bucket = int(digest[:8], 16) % self._BUCKETS

        # round(), not int(): 1.0 - 0.9 is 0.09999999999999998 in binary
        # floating point, and truncating 99.99999999999997 would yield 99
        # buckets instead of the intended 100.
        threshold = round(stable.weight * self._BUCKETS)

        return stable if bucket < threshold else canary

监控与自动决策

# canary/monitor.py
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional

@dataclass
class CanaryMetrics:
    """Running counters for one deployment, all starting at zero/empty.

    CanaryMonitor.check_canary_health reads ``request_count``,
    ``error_count``, ``total_latency_ms`` and ``latency_count``; the
    remaining fields are not read by the code shown here.
    """

    request_count: int = 0
    error_count: int = 0
    # Sum of latencies; divided by latency_count to obtain the average.
    total_latency_ms: float = 0
    latency_count: int = 0
    
    # Quality metrics
    ratings: list = field(default_factory=list)
    hallucination_count: int = 0
    
    # Cost metrics
    total_tokens: int = 0

class CanaryMonitor:
    """Compare canary metrics against stable metrics and recommend an action."""

    def __init__(self, stable_metrics: "CanaryMetrics",
                 canary_metrics: "CanaryMetrics"):
        self.stable = stable_metrics
        self.canary = canary_metrics
        self.start_time = datetime.now()

    @staticmethod
    def _safe_ratio(total, count):
        """Return ``total / count``, or 0 when ``count`` is not positive."""
        return total / count if count > 0 else 0

    def check_canary_health(self,
                            max_error_rate: float = 0.01,
                            max_latency_increase: float = 1.5,
                            max_latency_ms: float = 2000,
                            max_error_rate_increase: float = 1.5) -> dict:
        """Evaluate the canary's error rate and average latency.

        Each metric must pass an absolute limit ("healthy") and a limit
        relative to the stable version ("within_tolerance"). The relative
        limit is waived when the stable value is 0, because a multiple of
        zero would reject any non-zero canary value.

        Args:
            max_error_rate: Absolute ceiling for the canary error rate.
            max_latency_increase: Allowed canary/stable average-latency ratio.
            max_latency_ms: Absolute ceiling for the canary average latency.
            max_error_rate_increase: Allowed canary/stable error-rate ratio.
                Kept separate from ``max_latency_increase`` so that tuning
                latency tolerance does not loosen error-rate tolerance.

        Returns:
            A dict with ``"healthy"`` (bool), ``"checks"`` (per-metric
            details) and ``"recommendation"`` (``"continue"`` or
            ``"rollback"``).
        """
        stable_error_rate = self._safe_ratio(
            self.stable.error_count, self.stable.request_count
        )
        canary_error_rate = self._safe_ratio(
            self.canary.error_count, self.canary.request_count
        )
        stable_avg_latency = self._safe_ratio(
            self.stable.total_latency_ms, self.stable.latency_count
        )
        canary_avg_latency = self._safe_ratio(
            self.canary.total_latency_ms, self.canary.latency_count
        )

        checks = {
            "error_rate": {
                "stable": stable_error_rate,
                "canary": canary_error_rate,
                "healthy": canary_error_rate <= max_error_rate,
                "within_tolerance": (
                    canary_error_rate
                    <= stable_error_rate * max_error_rate_increase
                    or stable_error_rate == 0
                ),
            },
            "latency": {
                "stable_ms": stable_avg_latency,
                "canary_ms": canary_avg_latency,
                "healthy": canary_avg_latency <= max_latency_ms,
                "within_tolerance": (
                    canary_avg_latency
                    <= stable_avg_latency * max_latency_increase
                    or stable_avg_latency == 0
                ),
            },
        }

        all_healthy = all(
            check["healthy"] and check["within_tolerance"]
            for check in checks.values()
        )

        return {
            "healthy": all_healthy,
            "checks": checks,
            "recommendation": "continue" if all_healthy else "rollback",
        }

自动化金丝雀发布

# canary/automated_release.py
import time
from enum import Enum

class CanaryPhase(Enum):
    """Rollout stages, declared in the order the release advances through."""

    INIT = "init"          # 5% of traffic
    EARLY = "early"        # 20% of traffic
    PROGRESS = "progress"  # 50% of traffic
    FULL = "full"          # 100% of traffic


class AutomatedCanaryRelease:
    """Drive a canary rollout phase by phase, rolling back on bad health.

    ``router`` must provide ``set_canary_weight(weight)`` and ``monitor``
    must provide ``check_canary_health()`` returning a dict with a
    ``"recommendation"`` key. The annotations are quoted so this class does
    not require those names to be defined before it.
    """

    def __init__(self, router: "CanaryRouter", monitor: "CanaryMonitor"):
        self.router = router
        self.monitor = monitor
        self.phase = CanaryPhase.INIT
        self.phase_start = time.time()
        # Latched by _rollback(); once set, run_cycle() stops promoting.
        self.rolled_back = False

        self.phase_configs = {
            CanaryPhase.INIT: {"weight": 0.05, "duration_min": 10},
            CanaryPhase.EARLY: {"weight": 0.20, "duration_min": 30},
            CanaryPhase.PROGRESS: {"weight": 0.50, "duration_min": 60},
            CanaryPhase.FULL: {"weight": 1.0, "duration_min": 0},
        }

        # Apply the INIT share immediately; otherwise the first phase runs
        # with whatever weights the router's targets already had.
        self.router.set_canary_weight(self.phase_configs[self.phase]["weight"])

    def run_cycle(self):
        """Run one health check and advance the phase when it is due.

        Returns:
            ``True`` while the release is proceeding, ``False`` once a
            rollback has been triggered, including on every later call.
        """
        if self.rolled_back:
            # Without this latch the next cycle would move INIT -> EARLY and
            # send traffic back to the canary that was just rolled back.
            return False

        config = self.phase_configs[self.phase]
        health = self.monitor.check_canary_health()

        if health["recommendation"] == "rollback":
            print("⚠️ Canary unhealthy, triggering rollback!")
            self._rollback()
            return False

        if self.phase is not CanaryPhase.FULL:
            elapsed_min = (time.time() - self.phase_start) / 60
            if elapsed_min >= config["duration_min"]:
                self._advance_phase()

        return True

    def _advance_phase(self):
        """Move to the next phase and push its weight to the router."""
        phases = list(CanaryPhase)
        current_idx = phases.index(self.phase)

        if current_idx < len(phases) - 1:
            self.phase = phases[current_idx + 1]
            self.phase_start = time.time()

            new_weight = self.phase_configs[self.phase]["weight"]
            self.router.set_canary_weight(new_weight)

            print(f"📈 Advanced to {self.phase.value} phase, "
                  f"weight: {new_weight*100:.0f}%")

    def _rollback(self):
        """Send all traffic to the stable version and halt the rollout."""
        self.router.set_canary_weight(0.0)
        self.phase = CanaryPhase.INIT
        self.phase_start = time.time()
        self.rolled_back = True
        print("🔄 Rollback complete, all traffic to stable version")

最佳实践

  1. 渐进放大:从5%开始,逐步增加到20%、50%、100%
  2. 充分观察:每个阶段至少运行10-60分钟(本文示例中各阶段依次为10、30、60分钟)
  3. 多维监控:同时关注错误率、延迟、质量指标和用户反馈
  4. 自动回滚:设置明确的健康阈值,自动触发回滚
  5. 人工确认:关键阶段(如50%到100%)需人工审批