← 返回首页
🧠

降级模型:LLM服务的模型降级与容错方案

📂 llm ⏱ 3 min 530 words

--- title: "降级模型:LLM服务的模型降级与容错方案" description: "探讨LLM服务中的模型降级策略,包括多模型切换、降级链设计、自动故障转移等高可用架构方案" tags: ["降级模型", "容错方案", "高可用", "故障转移"] category: "llm" icon: "🧠"

降级模型:LLM服务的模型降级与容错方案

为什么需要模型降级

单点LLM服务存在固有风险:API故障、配额耗尽、模型下线等。模型降级机制确保在主模型不可用时,自动切换到备用方案,保障服务连续性。

降级链设计

基础降级链

from dataclasses import dataclass
from typing import Optional

@dataclass
class ModelConfig:
    provider: str
    model: str
    priority: int
    max_tokens: int = 4096
    cost_per_1k: float = 0.01

class FallbackChain:
    def __init__(self):
        self.models = sorted(
            [
                ModelConfig("openai", "gpt-4o", priority=1, cost_per_1k=0.005),
                ModelConfig("anthropic", "claude-sonnet-4-20250514", priority=2, cost_per_1k=0.003),
                ModelConfig("openai", "gpt-4o-mini", priority=3, cost_per_1k=0.00015),
                ModelConfig("local", "qwen-7b", priority=4, cost_per_1k=0),
            ],
            key=lambda x: x.priority,
        )

    def get_available_model(self, exclude: set = None) -> Optional[ModelConfig]:
        exclude = exclude or set()
        for model in self.models:
            if model.model not in exclude:
                return model
        return None

分场景降级链

不同业务场景可以使用不同的降级策略:

SCENARIO_CHAINS = {
    "realtime_chat": [
        ModelConfig("openai", "gpt-4o-mini", priority=1),
        ModelConfig("local", "qwen-7b", priority=2),
        ModelConfig("rule_based", "template", priority=3),
    ],
    "content_generation": [
        ModelConfig("openai", "gpt-4o", priority=1),
        ModelConfig("anthropic", "claude-sonnet-4-20250514", priority=2),
        ModelConfig("openai", "gpt-4o-mini", priority=3),
    ],
    "analysis": [
        ModelConfig("openai", "gpt-4o", priority=1),
        ModelConfig("anthropic", "claude-sonnet-4-20250514", priority=2),
    ],
}

自动故障转移

健康检查机制

import asyncio
import time

class ModelHealthChecker:
    def __init__(self):
        self.health_status = {}
        self.last_check = {}

    async def check_health(self, model_config: ModelConfig) -> bool:
        try:
            response = await self._quick_test(model_config)
            self.health_status[model_config.model] = {
                "healthy": True,
                "latency": response.get("latency", 0),
                "last_check": time.time(),
            }
            return True
        except Exception as e:
            self.health_status[model_config.model] = {
                "healthy": False,
                "error": str(e),
                "last_check": time.time(),
            }
            return False

    async def _quick_test(self, model_config: ModelConfig) -> dict:
        start = time.time()
        client = self._get_client(model_config.provider)
        await client.chat.completions.create(
            model=model_config.model,
            messages=[{"role": "user", "content": "Hi"}],
            max_tokens=5,
        )
        return {"latency": time.time() - start}

故障转移执行器

class FallbackExecutor:
    def __init__(self, chain: FallbackChain, health_checker: ModelHealthChecker):
        self.chain = chain
        self.health_checker = health_checker
        self.excluded_models = set()

    async def execute(self, messages: list[dict], scenario: str = "default") -> dict:
        chain = SCENARIO_CHAINS.get(scenario, self.chain.models)

        for model_config in chain:
            if model_config.model in self.excluded_models:
                continue

            # 健康检查
            if not await self.health_checker.check_health(model_config):
                logger.warning(f"模型 {model_config.model} 健康检查失败,跳过")
                continue

            try:
                result = await self._call_model(model_config, messages)
                return {
                    "content": result,
                    "model_used": model_config.model,
                    "fallback_level": model_config.priority - 1,
                }
            except Exception as e:
                logger.error(f"模型 {model_config.model} 调用失败: {e}")
                self.excluded_models.add(model_config.model)
                continue

        raise AllModelsFailedError("所有可用模型均不可用")

    async def _call_model(self, config: ModelConfig, messages: list[dict]) -> str:
        client = self._get_client(config.provider)
        response = await client.chat.completions.create(
            model=config.model,
            messages=messages,
            max_tokens=config.max_tokens,
        )
        return response.choices[0].message.content

降级策略配置

成本感知降级

class CostAwareFallback:
    def __init__(self, budget_per_request: float = 0.05):
        self.budget = budget_per_request
        self.models_by_cost = sorted(
            self.all_models,
            key=lambda m: m.cost_per_1k,
        )

    def select_within_budget(self, estimated_tokens: int) -> list[ModelConfig]:
        affordable = []
        for model in self.models_by_cost:
            estimated_cost = (estimated_tokens / 1000) * model.cost_per_1k
            if estimated_cost <= self.budget:
                affordable.append(model)
        return affordable

延迟感知降级

class LatencyAwareFallback:
    def __init__(self, max_latency_ms: int = 5000):
        self.max_latency = max_latency_ms / 1000
        self.latency_history = {}

    def get_fastest_model(self, exclude: set = None) -> Optional[ModelConfig]:
        available = [
            m for m in self.all_models
            if m.model not in (exclude or set())
        ]

        # 按历史延迟排序
        available.sort(
            key=lambda m: self.latency_history.get(m.model, float("inf"))
        )

        if available and self.latency_history.get(available[0].model) < self.max_latency:
            return available[0]
        return available[0] if available else None

降级结果处理

class FallbackResultHandler:
    def handle(self, result: dict) -> dict:
        fallback_level = result.get("fallback_level", 0)

        if fallback_level == 0:
            # 使用了主模型,正常返回
            return result

        # 使用了降级模型,添加提示
        result["warning"] = f"当前使用了备用模型({result['model_used']}),输出质量可能有所差异"

        if fallback_level >= 2:
            result["limitation"] = "由于主服务不可用,部分高级功能(如长文本生成)可能受限"

        return result

降级监控与告警

class FallbackMonitor:
    def __init__(self):
        self.fallback_events = []

    def record_fallback(self, primary_model: str, fallback_model: str, reason: str):
        event = {
            "timestamp": time.time(),
            "primary": primary_model,
            "fallback": fallback_model,
            "reason": reason,
        }
        self.fallback_events.append(event)

        # 过去1小时降级超过10次触发告警
        recent = [
            e for e in self.fallback_events
            if time.time() - e["timestamp"] < 3600
        ]
        if len(recent) >= 10:
            self._send_alert(f"过去1小时降级{len(recent)}次,主模型: {primary_model}")

    def get_fallback_rate(self, hours: int = 24) -> float:
        cutoff = time.time() - hours * 3600
        recent = [e for e in self.fallback_events if e["timestamp"] > cutoff]
        return len(recent) / max(len(recent) + 100, 1)  # 粗略估计

总结

模型降级是LLM应用高可用架构的关键组件。通过设计合理的降级链、实现健康检查与自动故障转移、结合成本和延迟感知策略,可以确保服务在各种故障场景下仍能持续运行。