LLM网关
---
title: "LLM网关"
description: "全面介绍LLM网关的设计与实现,包括请求聚合、认证授权、限流和监控等功能"
tags: ["LLM网关", "API网关", "统一入口", "安全控制"]
category: "llm"
icon: "🧠"
---
LLM网关
LLM网关的作用
LLM网关是所有LLM服务请求的统一入口,承担着请求路由、认证授权、限流熔断、协议转换等核心功能。它是构建企业级LLM应用的关键基础设施组件。
核心功能架构
请求聚合
将多个LLM服务接口统一暴露为标准化API:
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import httpx
# FastAPI application instance for the gateway service; the title is the
# human-readable name passed to FastAPI.
app = FastAPI(title="LLM Gateway")
class ChatRequest(BaseModel):
    """Provider-agnostic chat request accepted by the gateway.

    ``model`` and ``messages`` are required; the two sampling settings fall
    back to the defaults declared on the fields.
    """

    # Name of the model the caller wants to use.
    model: str

    # Conversation turns as plain dicts; their inner schema is not
    # validated by this model.
    messages: List[dict]

    # Sampling temperature; defaults to 0.7 when the caller omits it.
    temperature: float = 0.7

    # Token limit for the request (presumably the cap on generated tokens);
    # defaults to 1000 when the caller omits it.
    max_tokens: int = 1000
class LLMGateway:
    """Unified entry point that authenticates, rate-limits and routes requests.

    Attributes:
        backends: Maps a provider name to the base URL of that provider's API.
        auth_handler: Validates the caller before any other work is done.
        rate_limiter: Request limiter consulted once per routed request.
    """

    def __init__(self):
        self.backends = {
            "openai": "https://api.openai.com/v1",
            "anthropic": "https://api.anthropic.com/v1",
            "deepseek": "https://api.deepseek.com/v1",
        }
        self.auth_handler = AuthHandler()
        self.rate_limiter = RateLimiter()

    async def route_request(self, request: ChatRequest, provider: str):
        """Authenticate, rate-limit and forward ``request`` to ``provider``.

        Args:
            request: The chat request to route; ``request.model`` is used as
                the rate-limit key.
            provider: Key into ``self.backends`` selecting the upstream API.

        Returns:
            Whatever ``self.forward_request`` returns for the resolved backend.

        Raises:
            HTTPException: 400 when ``provider`` is not a configured backend,
                429 when the rate limiter rejects the request, plus anything
                raised by ``self.auth_handler.validate``.
        """
        # Authentication comes first so unauthenticated callers learn nothing
        # about which providers are configured.
        await self.auth_handler.validate(request)

        # Resolve the backend before consulting the rate limiter: a request
        # that cannot be routed should not consume quota, and a missing
        # provider must never be forwarded as a ``None`` URL.
        backend_url = self.backends.get(provider)
        if backend_url is None:
            raise HTTPException(400, f"Unknown provider: {provider}")

        if not self.rate_limiter.allow(request.model):
            raise HTTPException(429, "Rate limit exceeded")

        # NOTE(review): ``forward_request`` is not defined on this class as
        # shown here; it has to be provided (e.g. by a subclass) before this
        # call can succeed.
        return await self.forward_request(backend_url, request)
认证授权
class AuthHandler:
    """API-key authentication plus role-based model authorization.

    Attributes:
        api_keys: Maps an API key string to that caller's user-info mapping.
        permissions: Maps a role name to the list of model names it may use;
            the entry ``"*"`` grants access to every model.
    """

    def __init__(self):
        self.api_keys = {}
        self.permissions = {}

    async def validate(self, request):
        """Authenticate the caller and check access to ``request.model``.

        Args:
            request: Object exposing ``headers`` (a mapping with ``.get``) and
                ``model``.
                NOTE(review): ``LLMGateway.route_request`` in this file passes
                a ``ChatRequest``, which declares no ``headers`` field --
                confirm what callers are expected to pass.

        Returns:
            The user-info mapping stored for the presented API key.

        Raises:
            HTTPException: 401 when the key is missing or unknown, 403 when
                the caller's role may not use the requested model.
        """
        api_key = self._resolve_api_key(request.headers.get("Authorization"))
        if api_key is None:
            raise HTTPException(401, "Invalid API key")

        user_info = self.api_keys[api_key]
        if not self.check_permission(user_info, request.model):
            raise HTTPException(403, "Insufficient permissions")
        return user_info

    def _resolve_api_key(self, header_value):
        """Return the registered key carried by ``header_value``, or ``None``."""
        if not header_value:
            return None
        # The raw header value is tried first so keys registered verbatim
        # keep working exactly as before.
        if header_value in self.api_keys:
            return header_value
        # Also accept the conventional "Bearer <key>" form.
        scheme, _, credential = header_value.partition(" ")
        credential = credential.strip()
        if scheme.lower() == "bearer" and credential and credential in self.api_keys:
            return credential
        return None

    def check_permission(self, user_info, model):
        """Return True when the role in ``user_info`` may use ``model``.

        A user record without a ``"role"`` entry is denied rather than
        raising ``KeyError``, so a malformed record produces a 403 from
        ``validate`` instead of an unhandled error.
        """
        try:
            role = user_info["role"]
        except KeyError:
            return False
        allowed_models = self.permissions.get(role, [])
        return model in allowed_models or "*" in allowed_models
限流与熔断
import time
from collections import defaultdict
class RateLimiter:
    """In-memory per-user, per-model request limiter over a rolling window.

    Attributes:
        request_counts: Maps ``"<user_id>:<model>"`` to the timestamps
            (``time.monotonic()`` readings) of the requests admitted within
            the current window.
        limits: Maps a model name to its limit settings. Only ``"rpm"`` is
            enforced by this class; ``"tpm"`` is stored but not read here.
        default_rpm: Limit applied when a model has no ``"rpm"`` setting.
        window_seconds: Length of the rolling window in seconds.
    """

    def __init__(self, limits=None, default_rpm=60, window_seconds=60):
        self.request_counts = defaultdict(list)
        if limits is None:
            limits = {
                "gpt-4o": {"rpm": 60, "tpm": 100000},
                "gpt-4o-mini": {"rpm": 200, "tpm": 400000},
            }
        self.limits = limits
        self.default_rpm = default_rpm
        self.window_seconds = window_seconds

    def allow(self, model: str, user_id: str = "default"):
        """Record a request and report whether it fits within the limit.

        Args:
            model: Model name used to look up the limit.
            user_id: Caller identity; all callers share ``"default"`` when it
                is omitted.

        Returns:
            True when the request is admitted (and recorded), False when the
            caller has already used up the limit for the current window.
        """
        # A monotonic clock keeps the window correct even if the system
        # wall clock is adjusted while the process is running.
        now = time.monotonic()
        key = f"{user_id}:{model}"

        # Drop timestamps that have fallen out of the window.
        recent = [
            stamp for stamp in self.request_counts[key]
            if now - stamp < self.window_seconds
        ]
        self.request_counts[key] = recent

        # Fall back to the default for unknown models and for entries that
        # configure other limits but no "rpm".
        limit = self.limits.get(model, {}).get("rpm", self.default_rpm)
        if len(recent) >= limit:
            return False

        recent.append(now)
        return True
class CircuitBreaker:
    """Three-state circuit breaker: ``"closed"``, ``"open"``, ``"half-open"``.

    Attributes:
        failure_count: Failures recorded since the last recorded success.
        failure_threshold: Failure count at which the breaker opens.
        recovery_timeout: Seconds the breaker stays open before a probe
            request is allowed.
        last_failure_time: ``time.monotonic()`` reading of the most recent
            failure, or None when no failure has been recorded.
        state: Current state name.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.state = "closed"

    def record_failure(self):
        """Count a failed call and open the breaker once the threshold is hit.

        A failure while half-open re-opens the breaker immediately, because
        the count is still at or above the threshold at that point.
        """
        self.failure_count += 1
        # Monotonic time: the recovery timeout must not be affected by
        # wall-clock adjustments.
        self.last_failure_time = time.monotonic()
        if self.failure_count >= self.failure_threshold:
            self.state = "open"

    def record_success(self):
        """Count a successful call: reset the failure count and close.

        Without this there is no path back to ``"closed"`` after a
        successful probe in the half-open state.
        """
        self.failure_count = 0
        self.state = "closed"

    def should_allow(self):
        """Return True when a request may be attempted right now.

        While open, requests are refused until ``recovery_timeout`` seconds
        have passed since the last failure; the breaker then moves to
        half-open and lets requests through as probes.
        """
        if self.state == "closed":
            return True
        if self.state == "open":
            elapsed = time.monotonic() - self.last_failure_time
            if elapsed > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        # Half-open: allow probe requests; the caller reports the outcome
        # through record_success / record_failure.
        return True
高级功能
请求/响应转换
class ProtocolTransformer:
    """Translates the gateway's generic request into provider payloads."""

    @staticmethod
    def transform_request(generic_request, target_provider):
        """Build the request payload for ``target_provider``.

        Args:
            generic_request: Object exposing ``model``, ``messages``,
                ``temperature`` and ``max_tokens`` attributes.
            target_provider: ``"openai"``, ``"deepseek"`` or ``"anthropic"``.

        Returns:
            A dict payload. The OpenAI-style payload carries ``temperature``;
            the Anthropic payload carries ``max_tokens``.

        Raises:
            ValueError: If ``target_provider`` is not supported. This replaces
                an implicit ``None`` return that would only fail later, far
                from the cause.
        """
        # DeepSeek is listed as a gateway backend in this file and accepts
        # the OpenAI request format, so it shares the OpenAI payload shape.
        if target_provider in ("openai", "deepseek"):
            return {
                "model": generic_request.model,
                "messages": generic_request.messages,
                "temperature": generic_request.temperature,
            }
        if target_provider == "anthropic":
            return {
                "model": generic_request.model,
                "messages": generic_request.messages,
                "max_tokens": generic_request.max_tokens,
            }
        raise ValueError(f"Unsupported provider: {target_provider!r}")
缓存层
class ResponseCache:
    """In-memory response cache with a fixed time-to-live per entry.

    Attributes:
        cache: Maps a request hash to ``{"response": ..., "time": ...}``,
            where ``"time"`` is the ``time.monotonic()`` reading at write.
            Entries are kept in write order so the oldest is always first.
        ttl: Lifetime of an entry in seconds.
    """

    def __init__(self, ttl=300):
        self.cache = {}
        self.ttl = ttl

    def get(self, request_hash):
        """Return the cached response for ``request_hash``, or None.

        An entry that has outlived ``ttl`` is removed and treated as a miss.
        """
        entry = self.cache.get(request_hash)
        if entry is None:
            return None
        if time.monotonic() - entry["time"] < self.ttl:
            return entry["response"]
        del self.cache[request_hash]
        return None

    def set(self, request_hash, response):
        """Store ``response`` under ``request_hash`` and purge expired entries.

        Purging on write keeps the cache from growing without bound when
        expired keys are never read again.
        """
        # Monotonic time: the TTL must not be affected by wall-clock changes.
        now = time.monotonic()
        # Remove before re-inserting so dict order keeps tracking write time.
        self.cache.pop(request_hash, None)
        self.cache[request_hash] = {
            "response": response,
            "time": now,
        }
        self._evict_expired(now)

    def _evict_expired(self, now):
        """Drop entries from the old end of the cache until one is still fresh."""
        while self.cache:
            oldest_key = next(iter(self.cache))
            if now - self.cache[oldest_key]["time"] < self.ttl:
                break
            del self.cache[oldest_key]
监控与日志
关键监控指标
- 请求吞吐量和延迟
- 各后端服务健康状态
- 错误率和错误分布
- API密钥使用情况
- 成本消耗统计
审计日志
记录所有LLM请求的详细信息,支持合规审计和问题排查。包括请求内容摘要、用户身份、路由决策和响应结果。
部署建议
- 高可用部署:多副本部署网关服务,避免单点故障
- 水平扩展:根据流量自动扩展网关实例数量
- 安全加固:启用TLS加密,配置WAF防护
- 渐进式上线:先在测试环境验证,再逐步切换生产流量