降级模型:LLM服务的模型降级与容错方案
---
title: "降级模型:LLM服务的模型降级与容错方案"
description: "探讨LLM服务中的模型降级策略,包括多模型切换、降级链设计、自动故障转移等高可用架构方案"
tags: ["降级模型", "容错方案", "高可用", "故障转移"]
category: "llm"
icon: "🧠"
---
# 降级模型:LLM服务的模型降级与容错方案
## 为什么需要模型降级
单点LLM服务存在固有风险:API故障、配额耗尽、模型下线等。模型降级机制确保在主模型不可用时,自动切换到备用方案,保障服务连续性。
## 降级链设计
### 基础降级链
from dataclasses import dataclass
from typing import Optional
@dataclass
class ModelConfig:
    """Describe one candidate model that a fallback chain may try.

    The two trailing fields have defaults, so a config can be built from
    just a provider, a model name and a priority.
    """

    # Backend that serves the model (the chains use values such as
    # "openai", "anthropic", "local" and "rule_based").
    provider: str
    # Model identifier; sent as ``model`` in the completion request.
    model: str
    # Rank inside a chain; chains are ordered ascending on this value.
    priority: int
    # Sent as ``max_tokens`` when the model is called.
    max_tokens: int = 4096
    # Price per 1,000 tokens, used for per-request cost estimates.
    cost_per_1k: float = 0.01
class FallbackChain:
    """Priority-ordered list of models to try, most preferred first.

    Args:
        models: Optional custom candidates. Each item needs ``model`` and
            ``priority`` attributes. When omitted, the built-in four-model
            chain is used.

    Attributes:
        models: The candidates sorted ascending by ``priority``.
    """

    def __init__(self, models: Optional[list] = None):
        if models is None:
            models = [
                ModelConfig("openai", "gpt-4o", priority=1, cost_per_1k=0.005),
                ModelConfig("anthropic", "claude-sonnet-4-20250514", priority=2, cost_per_1k=0.003),
                ModelConfig("openai", "gpt-4o-mini", priority=3, cost_per_1k=0.00015),
                ModelConfig("local", "qwen-7b", priority=4, cost_per_1k=0),
            ]
        # sorted() returns a new list, so a caller-supplied list is never
        # reordered in place.
        self.models = sorted(models, key=lambda cfg: cfg.priority)

    def get_available_model(self, exclude: Optional[set] = None) -> Optional["ModelConfig"]:
        """Return the highest-priority model whose name is not excluded.

        Args:
            exclude: Model names to skip. ``None`` or an empty set skips
                nothing.

        Returns:
            The first matching config in priority order, or ``None`` when
            every model is excluded (or the chain is empty).
        """
        skipped = exclude or ()
        return next((cfg for cfg in self.models if cfg.model not in skipped), None)
### 分场景降级链
不同业务场景可以使用不同的降级策略:
# Scenario name -> ordered (provider, model) candidates, most preferred first.
_SCENARIO_CANDIDATES = {
    "realtime_chat": (
        ("openai", "gpt-4o-mini"),
        ("local", "qwen-7b"),
        ("rule_based", "template"),
    ),
    "content_generation": (
        ("openai", "gpt-4o"),
        ("anthropic", "claude-sonnet-4-20250514"),
        ("openai", "gpt-4o-mini"),
    ),
    "analysis": (
        ("openai", "gpt-4o"),
        ("anthropic", "claude-sonnet-4-20250514"),
    ),
}

# Per-scenario fallback chains. Each entry's priority is its 1-based position
# in the candidate tuple, so the first candidate gets priority 1.
SCENARIO_CHAINS = {
    scenario: [
        ModelConfig(provider, model, priority=rank)
        for rank, (provider, model) in enumerate(candidates, start=1)
    ]
    for scenario, candidates in _SCENARIO_CANDIDATES.items()
}
## 自动故障转移
### 健康检查机制
import asyncio
import time
class ModelHealthChecker:
    """Probe models with a tiny completion request and remember the outcome.

    Attributes:
        health_status: Model name -> dict holding ``"healthy"``,
            ``"last_check"`` and either ``"latency"`` (seconds, on success)
            or ``"error"`` (exception text, on failure).
        last_check: Model name -> wall-clock time of the most recent probe.
    """

    def __init__(self):
        self.health_status = {}
        self.last_check = {}

    async def check_health(self, model_config: "ModelConfig") -> bool:
        """Probe one model and record whether it answered.

        Any ``Exception`` raised by the probe is treated as "unhealthy" and
        stored as text instead of being propagated.

        Args:
            model_config: Config whose ``provider`` and ``model`` are probed.

        Returns:
            ``True`` if the probe request completed, ``False`` otherwise.
        """
        try:
            response = await self._quick_test(model_config)
        except Exception as e:
            status = {"healthy": False, "error": str(e)}
        else:
            status = {"healthy": True, "latency": response.get("latency", 0)}

        checked_at = time.time()
        status["last_check"] = checked_at
        self.health_status[model_config.model] = status
        # Keep the per-model timestamp index in step with health_status.
        self.last_check[model_config.model] = checked_at
        return status["healthy"]

    async def _quick_test(self, model_config: "ModelConfig") -> dict:
        """Send a 5-token "Hi" request and return ``{"latency": seconds}``."""
        # perf_counter() is monotonic, so the measured duration cannot be
        # skewed by system clock adjustments the way time.time() can.
        start = time.perf_counter()
        client = self._get_client(model_config.provider)
        await client.chat.completions.create(
            model=model_config.model,
            messages=[{"role": "user", "content": "Hi"}],
            max_tokens=5,
        )
        return {"latency": time.perf_counter() - start}

    def _get_client(self, provider: str):
        """Return the async API client for *provider*.

        Extension point: this class does not know how clients are built, so
        it must be overridden (or assigned) before probes can succeed. The
        returned object must expose ``chat.completions.create(...)`` as an
        awaitable.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(
            f"No client configured for provider {provider!r}; override _get_client()"
        )
### 故障转移执行器
import logging

logger = logging.getLogger(__name__)


class AllModelsFailedError(RuntimeError):
    """Raised when every model in the selected chain was skipped or failed."""


class FallbackExecutor:
    """Run a request against a fallback chain until one model answers.

    Args:
        chain: Default chain, used when the scenario has no dedicated chain.
        health_checker: Probes each candidate before it is called.
        exclude_cooldown: Seconds a model stays excluded after a failed
            call. Once the cooldown has passed the model is tried again, so
            a transient outage does not remove it for good. Pass
            ``float("inf")`` to exclude failed models permanently.

    Attributes:
        excluded_models: Names of models currently skipped. Names added
            here directly by other code never expire.
    """

    def __init__(
        self,
        chain: FallbackChain,
        health_checker: ModelHealthChecker,
        exclude_cooldown: float = 60.0,
    ):
        self.chain = chain
        self.health_checker = health_checker
        self.exclude_cooldown = exclude_cooldown
        self.excluded_models = set()
        # Model name -> monotonic time at which execute() excluded it.
        self._excluded_at = {}

    async def execute(self, messages: list[dict], scenario: str = "default") -> dict:
        """Return the first successful completion from the scenario's chain.

        Args:
            messages: Chat messages forwarded unchanged to the model.
            scenario: Key into ``SCENARIO_CHAINS``; unknown scenarios use
                the default chain.

        Returns:
            Dict with ``"content"``, ``"model_used"`` and
            ``"fallback_level"`` (the chosen model's priority minus one).

        Raises:
            AllModelsFailedError: No model was usable. When at least one
                call failed, the last failure is attached as ``__cause__``.
        """
        chain = SCENARIO_CHAINS.get(scenario, self.chain.models)
        self._release_expired_exclusions()

        last_error = None
        for model_config in chain:
            if model_config.model in self.excluded_models:
                continue

            if not await self.health_checker.check_health(model_config):
                logger.warning("模型 %s 健康检查失败,跳过", model_config.model)
                continue

            try:
                result = await self._call_model(model_config, messages)
            except Exception as e:
                logger.error("模型 %s 调用失败: %s", model_config.model, e)
                self._exclude(model_config.model)
                last_error = e
                continue

            return {
                "content": result,
                "model_used": model_config.model,
                "fallback_level": model_config.priority - 1,
            }

        raise AllModelsFailedError("所有可用模型均不可用") from last_error

    async def _call_model(self, config: ModelConfig, messages: list[dict]) -> str:
        """Send *messages* to one model and return the first choice's text."""
        client = self._get_client(config.provider)
        response = await client.chat.completions.create(
            model=config.model,
            messages=messages,
            max_tokens=config.max_tokens,
        )
        return response.choices[0].message.content

    def _exclude(self, model_name: str) -> None:
        """Mark *model_name* as excluded and remember when it happened."""
        self.excluded_models.add(model_name)
        self._excluded_at[model_name] = time.monotonic()

    def _release_expired_exclusions(self) -> None:
        """Re-admit models whose cooldown has elapsed."""
        now = time.monotonic()
        expired = [
            name
            for name, since in self._excluded_at.items()
            if now - since >= self.exclude_cooldown
        ]
        for name in expired:
            self.excluded_models.discard(name)
            del self._excluded_at[name]

    def _get_client(self, provider: str):
        """Return the async API client for *provider*.

        Extension point: must be overridden (or assigned) before models can
        be called. The returned object must expose
        ``chat.completions.create(...)`` as an awaitable.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError(
            f"No client configured for provider {provider!r}; override _get_client()"
        )
## 降级策略配置
### 成本感知降级
class CostAwareFallback:
    """Pick models whose estimated cost fits a per-request budget.

    Args:
        budget_per_request: Maximum acceptable estimated cost of one
            request, in the same unit as ``cost_per_1k``.
        models: Candidate models; each needs a ``cost_per_1k`` attribute.
            When omitted, an ``all_models`` attribute already provided by a
            subclass is used, otherwise the candidate list is empty.

    Attributes:
        all_models: The candidates as supplied.
        models_by_cost: The candidates sorted cheapest first.
    """

    def __init__(self, budget_per_request: float = 0.05, models: Optional[list] = None):
        self.budget = budget_per_request
        if models is not None:
            self.all_models = list(models)
        elif not hasattr(self, "all_models"):
            # Nothing supplied and no subclass-level default: start empty
            # rather than failing on a missing attribute.
            self.all_models = []
        self.models_by_cost = sorted(
            self.all_models,
            key=lambda m: m.cost_per_1k,
        )

    def select_within_budget(self, estimated_tokens: int) -> list["ModelConfig"]:
        """Return the models affordable for a request of the given size.

        Args:
            estimated_tokens: Expected token count of the request.

        Returns:
            Models whose ``estimated_tokens / 1000 * cost_per_1k`` does not
            exceed the budget, cheapest first. Empty if none qualify.
        """
        thousands = estimated_tokens / 1000
        return [
            model
            for model in self.models_by_cost
            if thousands * model.cost_per_1k <= self.budget
        ]
### 延迟感知降级
class LatencyAwareFallback:
    """Pick the model with the lowest recorded latency.

    Args:
        max_latency_ms: Latency limit in milliseconds; stored in seconds.
        models: Candidate models; each needs a ``model`` attribute. When
            omitted, an ``all_models`` attribute already provided by a
            subclass is used, otherwise the candidate list is empty.

    Attributes:
        max_latency: The latency limit in seconds.
        latency_history: Model name -> last known latency. Values are
            compared against ``max_latency``, so they are expected to be in
            seconds. Nothing in this class fills it; callers do.
    """

    def __init__(self, max_latency_ms: int = 5000, models: Optional[list] = None):
        self.max_latency = max_latency_ms / 1000
        self.latency_history = {}
        if models is not None:
            self.all_models = list(models)
        elif not hasattr(self, "all_models"):
            # Nothing supplied and no subclass-level default: start empty
            # rather than failing on a missing attribute.
            self.all_models = []

    def get_fastest_model(
        self, exclude: Optional[set] = None, *, strict: bool = False
    ) -> Optional["ModelConfig"]:
        """Return the non-excluded model with the lowest known latency.

        Models without a recorded latency rank last; ties keep the order of
        ``all_models``.

        Args:
            exclude: Model names to skip.
            strict: When ``False`` (default) the fastest candidate is
                returned even if it is slower than ``max_latency`` or has no
                recorded latency. When ``True``, ``None`` is returned unless
                the fastest candidate's recorded latency is below
                ``max_latency``.

        Returns:
            The chosen model, or ``None`` if no candidate qualifies.
        """
        skipped = exclude or ()
        candidates = [m for m in self.all_models if m.model not in skipped]
        if not candidates:
            return None

        fastest = min(candidates, key=self._known_latency)
        if strict and not self._known_latency(fastest) < self.max_latency:
            return None
        return fastest

    def _known_latency(self, model) -> float:
        """Return the recorded latency of *model*, or infinity if unknown."""
        return self.latency_history.get(model.model, float("inf"))
## 降级结果处理
class FallbackResultHandler:
    """Annotate an execution result when a fallback model produced it."""

    def handle(self, result: dict) -> dict:
        """Add user-facing notices to *result* and return the same dict.

        A missing or zero ``"fallback_level"`` means the primary model
        answered, and the dict is returned untouched. Any other level adds
        a ``"warning"`` naming ``result["model_used"]``; level 2 or higher
        also adds a ``"limitation"`` notice. The dict is modified in place.
        """
        level = result.get("fallback_level", 0)

        if level != 0:
            # A fallback model answered: tell the user which one.
            used = result['model_used']
            result["warning"] = f"当前使用了备用模型({used}),输出质量可能有所差异"

            if level >= 2:
                result["limitation"] = "由于主服务不可用,部分高级功能(如长文本生成)可能受限"

        return result
## 降级监控与告警
class FallbackMonitor:
    """Record fallback events and raise an alert when they pile up.

    Args:
        alert_threshold: Number of fallbacks within the alert window at
            which an alert is sent.

    Attributes:
        fallback_events: Every recorded event, oldest first. Each is a dict
            with ``"timestamp"``, ``"primary"``, ``"fallback"``, ``"reason"``.
    """

    # Length of the look-back window used for alerting, in seconds (1 hour).
    ALERT_WINDOW_SECONDS = 3600

    def __init__(self, alert_threshold: int = 10):
        self.fallback_events = []
        self.alert_threshold = alert_threshold

    def record_fallback(self, primary_model: str, fallback_model: str, reason: str):
        """Store one fallback event and alert if the threshold is reached.

        Args:
            primary_model: Model that should have served the request.
            fallback_model: Model that served it instead.
            reason: Why the primary model was not used.
        """
        # Read the clock once so the new event and the window check agree.
        now = time.time()
        self.fallback_events.append(
            {
                "timestamp": now,
                "primary": primary_model,
                "fallback": fallback_model,
                "reason": reason,
            }
        )

        # Alert once the last hour holds at least `alert_threshold` events;
        # every further event inside the window alerts again.
        recent_count = sum(
            1
            for event in self.fallback_events
            if now - event["timestamp"] < self.ALERT_WINDOW_SECONDS
        )
        if recent_count >= self.alert_threshold:
            self._send_alert(f"过去1小时降级{recent_count}次,主模型: {primary_model}")

    def get_fallback_rate(self, hours: int = 24, total_requests: Optional[int] = None) -> float:
        """Return the share of requests that fell back in the last *hours*.

        Args:
            hours: Size of the look-back window.
            total_requests: Number of requests handled in that window. When
                given, the rate is ``fallbacks / total_requests``. When
                omitted, no traffic count is available and a rough estimate
                is returned that assumes 100 requests did not fall back.

        Returns:
            The fallback rate; ``0.0`` when *total_requests* is zero or
            negative.
        """
        cutoff = time.time() - hours * 3600
        fallbacks = sum(1 for event in self.fallback_events if event["timestamp"] > cutoff)

        if total_requests is None:
            return fallbacks / (fallbacks + 100)  # rough estimate
        if total_requests <= 0:
            return 0.0
        return fallbacks / total_requests

    def _send_alert(self, message: str) -> None:
        """Deliver an alert; the default sink is a WARNING log record.

        Override or reassign this to route alerts elsewhere.
        """
        import logging

        logging.getLogger(__name__).warning(message)
## 总结
模型降级是LLM应用高可用架构的关键组件。通过设计合理的降级链、实现健康检查与自动故障转移、结合成本和延迟感知策略,可以确保服务在各种故障场景下仍能持续运行。