← 返回首页
🧠

LLM网关

📂 llm ⏱ 2 min 393 words

--- title: "LLM网关" description: "全面介绍LLM网关的设计与实现,包括请求聚合、认证授权、限流和监控等功能" tags: ["LLM网关", "API网关", "统一入口", "安全控制"] category: "llm" icon: "🧠"

LLM网关

LLM网关的作用

LLM网关是所有LLM服务请求的统一入口,承担着请求路由、认证授权、限流熔断、协议转换等核心功能。它是构建企业级LLM应用的关键基础设施组件。

核心功能架构

请求聚合

将多个LLM服务接口统一暴露为标准化API:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import httpx

app = FastAPI(title="LLM Gateway")

class ChatRequest(BaseModel):
    model: str
    messages: List[dict]
    temperature: float = 0.7
    max_tokens: int = 1000

class LLMGateway:
    def __init__(self):
        self.backends = {
            "openai": "https://api.openai.com/v1",
            "anthropic": "https://api.anthropic.com/v1",
            "deepseek": "https://api.deepseek.com/v1"
        }
        self.auth_handler = AuthHandler()
        self.rate_limiter = RateLimiter()
    
    async def route_request(self, request: ChatRequest, provider: str):
        # 认证验证
        await self.auth_handler.validate(request)
        
        # 限流检查
        if not self.rate_limiter.allow(request.model):
            raise HTTPException(429, "Rate limit exceeded")
        
        # 路由到后端
        backend_url = self.backends.get(provider)
        return await self.forward_request(backend_url, request)

认证授权

class AuthHandler:
    def __init__(self):
        self.api_keys = {}
        self.permissions = {}
    
    async def validate(self, request):
        api_key = request.headers.get("Authorization")
        
        if not api_key or api_key not in self.api_keys:
            raise HTTPException(401, "Invalid API key")
        
        user_info = self.api_keys[api_key]
        
        # 检查权限
        if not self.check_permission(user_info, request.model):
            raise HTTPException(403, "Insufficient permissions")
        
        return user_info
    
    def check_permission(self, user_info, model):
        allowed_models = self.permissions.get(user_info["role"], [])
        return model in allowed_models or "*" in allowed_models

限流与熔断

import time
from collections import defaultdict

class RateLimiter:
    def __init__(self):
        self.request_counts = defaultdict(list)
        self.limits = {
            "gpt-4o": {"rpm": 60, "tpm": 100000},
            "gpt-4o-mini": {"rpm": 200, "tpm": 400000}
        }
    
    def allow(self, model: str, user_id: str = "default"):
        now = time.time()
        key = f"{user_id}:{model}"
        
        # 清理过期记录
        self.request_counts[key] = [
            t for t in self.request_counts[key] if now - t < 60
        ]
        
        limit = self.limits.get(model, {"rpm": 60})["rpm"]
        
        if len(self.request_counts[key]) >= limit:
            return False
        
        self.request_counts[key].append(now)
        return True

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.last_failure_time = None
        self.state = "closed"
    
    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = "open"
    
    def should_allow(self):
        if self.state == "closed":
            return True
        
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = "half-open"
                return True
            return False
        
        return True  # half-open状态允许试探性请求

高级功能

请求/响应转换

class ProtocolTransformer:
    @staticmethod
    def transform_request(generic_request, target_provider):
        if target_provider == "openai":
            return {
                "model": generic_request.model,
                "messages": generic_request.messages,
                "temperature": generic_request.temperature
            }
        elif target_provider == "anthropic":
            return {
                "model": generic_request.model,
                "messages": generic_request.messages,
                "max_tokens": generic_request.max_tokens
            }

缓存层

class ResponseCache:
    def __init__(self, ttl=300):
        self.cache = {}
        self.ttl = ttl
    
    def get(self, request_hash):
        if request_hash in self.cache:
            entry = self.cache[request_hash]
            if time.time() - entry["time"] < self.ttl:
                return entry["response"]
            del self.cache[request_hash]
        return None
    
    def set(self, request_hash, response):
        self.cache[request_hash] = {
            "response": response,
            "time": time.time()
        }

监控与日志

关键监控指标

审计日志

记录所有LLM请求的详细信息,支持合规审计和问题排查。包括请求内容摘要、用户身份、路由决策和响应结果。

部署建议

  1. 高可用部署:多副本部署网关服务,避免单点故障
  2. 水平扩展:根据流量自动扩展网关实例数量
  3. 安全加固:启用TLS加密,配置WAF防护
  4. 渐进式上线:先在测试环境验证,再逐步切换生产流量