LLM代理服务
---
title: "LLM代理服务"
description: "构建LLM代理服务的完整指南,实现请求转发、负载均衡、API网关与多模型路由"
tags: ["LLM代理", "请求转发", "负载均衡", "API网关"]
category: "llm"
icon: "🧠"
---
LLM代理服务
为什么需要LLM代理
在生产环境中,直接暴露LLM推理服务存在安全隐患和管理困难。LLM代理服务作为中间层,提供了统一的API入口、请求鉴权、负载均衡、限流熔断、日志审计等关键能力。它类似于传统微服务架构中的API网关,专门针对LLM的特性进行了优化。
代理服务架构
一个典型的LLM代理架构包含以下组件:
客户端 → API网关 → 代理服务 → 模型集群
↓
鉴权/限流 → 日志/监控 → 缓存层
核心功能包括:请求路由和转发、负载均衡和故障转移、API密钥管理、请求缓存、流量控制、请求/响应日志。
基于Nginx的简单代理
Nginx是构建LLM代理的轻量级选择:
# Pool of LLM inference workers that the proxy locations forward to.
upstream llm_backend {
    # Prefer the server with the fewest active connections (weights are
    # taken into account).
    least_conn;

    server llm-worker-1:8000 weight=3;
    server llm-worker-2:8000 weight=3;
    # Marked "backup": only used when the primary servers are unavailable.
    server llm-worker-3:8000 weight=2 backup;

    # Idle upstream connections kept open for reuse.
    keepalive 32;
}
# TLS entry point for the LLM API; forwards /v1/ and /health to llm_backend.
server {
    listen 443 ssl http2;
    server_name llm-api.example.com;

    ssl_certificate /etc/ssl/certs/llm.crt;
    ssl_certificate_key /etc/ssl/private/llm.key;

    location /v1/ {
        proxy_pass http://llm_backend;

        # Upstream keepalive needs HTTP/1.1 and a cleared Connection header.
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Request-ID $request_id;

        # Generation can take minutes, so read/send timeouts are long.
        proxy_connect_timeout 10s;
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;

        # Pass streamed tokens through immediately instead of buffering them.
        proxy_buffering off;
        proxy_cache off;
    }

    location /health {
        proxy_pass http://llm_backend/health;

        # Same keepalive requirements as /v1/; without these two lines health
        # probes would not reuse the upstream keepalive connections.
        proxy_http_version 1.1;
        proxy_set_header Connection "";

        access_log off;
    }
}
Python代理服务实现
使用FastAPI构建灵活的LLM代理:
import hashlib
import json
import random
import time
from collections import defaultdict

import httpx
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
# ASGI application; the route, startup and middleware decorators attach to it.
app = FastAPI(title="LLM Proxy")

# Static pool of inference workers. "weight" biases selection and "healthy"
# is flipped at runtime by the health-check loop.
BACKENDS = [
    {"url": url, "weight": weight, "healthy": True}
    for url, weight in (
        ("http://worker-1:8000", 3),
        ("http://worker-2:8000", 3),
        ("http://worker-3:8000", 2),
    )
]

# NOTE(review): neither container below is referenced in the visible part of
# the file; kept because other code may rely on them.
request_counts = defaultdict(int)
rate_limits = dict()
class RateLimiter:
    """Sliding-window rate limiter keyed by API key.

    Each key may make at most ``max_requests`` calls within any
    ``window_seconds`` interval. Timestamps come from ``time.monotonic()`` so
    the window is unaffected by wall-clock adjustments.
    """

    def __init__(self, max_requests=100, window_seconds=60):
        self.max_requests = max_requests
        self.window = window_seconds
        # api_key -> list of monotonic timestamps, oldest first.
        self.requests = defaultdict(list)
        self._last_purge = time.monotonic()

    def is_allowed(self, api_key: str) -> bool:
        """Record a request for *api_key* and report whether it is within the limit.

        Returns:
            True if the request fits in the current window (and was recorded),
            False if the key has already used its quota.
        """
        now = time.monotonic()
        self._purge_idle(now)

        cutoff = now - self.window
        # .get() avoids creating an empty bucket for keys that end up denied.
        recent = [t for t in self.requests.get(api_key, ()) if t > cutoff]

        allowed = len(recent) < self.max_requests
        if allowed:
            recent.append(now)

        if recent:
            self.requests[api_key] = recent
        else:
            self.requests.pop(api_key, None)
        return allowed

    def _purge_idle(self, now):
        """Drop buckets whose newest timestamp has left the window.

        Keys come from client-supplied headers, so without eviction the map
        would grow with every distinct key ever seen. The sweep runs at most
        once per window to keep the per-request cost low.
        """
        if now - self._last_purge < self.window:
            return
        self._last_purge = now
        cutoff = now - self.window
        idle_keys = [
            key
            for key, stamps in self.requests.items()
            if not stamps or stamps[-1] <= cutoff
        ]
        for key in idle_keys:
            del self.requests[key]


# Shared limiter instance used by the request handlers.
limiter = RateLimiter()
def select_backend(backends=None):
    """Pick a healthy backend URL at random, proportionally to its weight.

    Args:
        backends: Optional list of backend dicts with "url", "weight" and
            "healthy" keys. Defaults to the module-level ``BACKENDS``.

    Returns:
        The "url" value of the chosen backend.

    Raises:
        HTTPException: 503 when no backend is marked healthy.
    """
    candidates = BACKENDS if backends is None else backends
    healthy = [b for b in candidates if b["healthy"]]
    if not healthy:
        raise HTTPException(503, "No healthy backends")

    weights = [b["weight"] for b in healthy]
    # random.choices rejects an all-zero weight vector; fall back to a
    # uniform pick so a misconfigured pool still serves traffic.
    if sum(weights) <= 0:
        return random.choice(healthy)["url"]
    return random.choices(healthy, weights=weights, k=1)[0]["url"]
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Rate-limit a chat completion request and forward it to a backend.

    Non-streaming requests are relayed and the backend's JSON body is returned
    with the backend's status code. Requests with a truthy "stream" field are
    relayed as a server-sent-event stream.

    Raises:
        HTTPException: 429 when the caller exceeds its rate limit, 400 when
            the body is not a JSON object, and whatever ``select_backend``
            raises when no backend is available.
    """
    # The bearer token is the rate-limit bucket key; requests without an
    # Authorization header all share the empty-string bucket.
    api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not limiter.is_allowed(api_key):
        raise HTTPException(429, "Rate limit exceeded")

    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(400, "Request body must be valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(400, "Request body must be a JSON object")

    backend = select_backend()

    if body.get("stream"):
        # The response body is produced after this handler returns, so the
        # client must outlive the handler; the stream generator closes it.
        client = httpx.AsyncClient(timeout=300)
        return await proxy_stream(client, backend, body, close_client=True)

    async with httpx.AsyncClient(timeout=300) as client:
        response = await client.post(
            f"{backend}/v1/chat/completions",
            json=body,
        )
    # Propagate the backend's status instead of flattening errors to 200.
    return JSONResponse(content=response.json(), status_code=response.status_code)


async def proxy_stream(client, backend, body, *, close_client=False):
    """Relay a streaming chat completion from *backend* as server-sent events.

    Args:
        client: An open ``httpx.AsyncClient`` used for the upstream request.
        backend: Base URL of the backend to call.
        body: JSON-serialisable request payload.
        close_client: When True, *client* is closed once the stream generator
            finishes or is closed, so the caller does not have to keep it
            alive itself.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """

    async def stream_generator():
        try:
            async with client.stream(
                "POST",
                f"{backend}/v1/chat/completions",
                json=body,
            ) as response:
                async for chunk in response.aiter_lines():
                    # Blank separator lines are dropped and each non-empty
                    # line is re-terminated with a blank line.
                    if chunk:
                        yield f"{chunk}\n\n"
        finally:
            if close_client:
                await client.aclose()

    return StreamingResponse(stream_generator(), media_type="text/event-stream")
负载均衡策略
代理服务应支持多种负载均衡策略:
class LoadBalancer:
    """Choose a backend dict from a list according to a named strategy.

    Supported strategies: "round_robin", "least_connections", "weighted" and
    "latency_based". Callers maintain ``connections`` (active connection count
    per URL) and ``latencies`` (most recent or averaged latency in seconds per
    URL); URLs with no entry count as 0.
    """

    def __init__(self, strategy="least_connections"):
        self.strategy = strategy
        self.connections = defaultdict(int)
        self.latencies = defaultdict(float)
        self._next_index = 0

    def select(self, backends):
        """Return one backend dict from *backends* using the configured strategy.

        Raises:
            ValueError: if *backends* is empty or the strategy name is unknown.
        """
        backends = list(backends)
        if not backends:
            raise ValueError("No backends to select from")

        strategies = {
            "round_robin": self._round_robin,
            "least_connections": self._least_connections,
            "weighted": self._weighted,
            "latency_based": self._latency_based,
        }
        try:
            choose = strategies[self.strategy]
        except KeyError:
            raise ValueError(
                f"Unknown load balancing strategy: {self.strategy!r}"
            ) from None
        return choose(backends)

    def _round_robin(self, backends):
        """Cycle through the backends in list order."""
        backend = backends[self._next_index % len(backends)]
        self._next_index += 1
        return backend

    def _least_connections(self, backends):
        """Pick the backend with the fewest recorded active connections."""
        return min(backends, key=lambda b: self.connections.get(b["url"], 0))

    def _weighted(self, backends):
        """Pick at random, proportionally to each backend's "weight" (default 1)."""
        weights = [b.get("weight", 1) for b in backends]
        # random.choices rejects an all-zero weight vector.
        if sum(weights) <= 0:
            return random.choice(backends)
        return random.choices(backends, weights=weights, k=1)[0]

    def _latency_based(self, backends):
        """Pick the backend with the lowest recorded latency."""
        return min(backends, key=lambda b: self.latencies.get(b["url"], 0.0))
健康检查与故障转移
import asyncio
async def health_check(interval=10, timeout=5):
    """Poll every backend's /health endpoint forever and update its "healthy" flag.

    Args:
        interval: Seconds to sleep between polling rounds.
        timeout: Per-request timeout in seconds for each probe.

    A backend is healthy only when its probe returns HTTP 200; any other
    status, or any error while probing, marks it unhealthy until the next
    round.
    """
    while True:
        async with httpx.AsyncClient(timeout=timeout) as client:
            for backend in BACKENDS:
                try:
                    resp = await client.get(f"{backend['url']}/health")
                except Exception:
                    # Deliberately broad: whatever went wrong, the backend
                    # could not be confirmed healthy.
                    backend["healthy"] = False
                else:
                    backend["healthy"] = resp.status_code == 200
        await asyncio.sleep(interval)
@app.on_event("startup")
async def startup():
    """Start the background health-check loop when the application starts."""
    # The event loop keeps only a weak reference to tasks, so hold a strong
    # one on the app to stop the loop being garbage-collected mid-run.
    app.state.health_check_task = asyncio.create_task(health_check())
监控与日志
from prometheus_client import Counter, Histogram, generate_latest
# Request counter, labelled by "model" and "status". The middleware in this
# file fills the "model" label with the request path.
proxy_requests = Counter(
    "proxy_requests_total",
    "Total requests",
    labelnames=["model", "status"],
)

# Request latency in seconds, labelled by "model" (also the request path).
proxy_latency = Histogram(
    "proxy_latency_seconds",
    "Request latency",
    labelnames=["model"],
)
@app.middleware("http")
async def monitor_middleware(request: Request, call_next):
    """Record latency and a request count for every HTTP request.

    Both metrics use the request path as the "model" label. Metrics are
    recorded even when the downstream handler raises; such requests are
    counted under status 500.
    """
    # perf_counter is monotonic, so durations survive wall-clock adjustments.
    start = time.perf_counter()
    # Assume failure until a response exists, so the finally block can report
    # a status for requests that raise.
    status = 500
    try:
        response = await call_next(request)
        status = response.status_code
        return response
    finally:
        duration = time.perf_counter() - start
        proxy_latency.labels(model=request.url.path).observe(duration)
        proxy_requests.labels(
            model=request.url.path,
            status=status,
        ).inc()
LLM代理服务是构建生产级LLM平台的核心组件,通过合理的架构设计,可以实现高可用、高性能、易管理的LLM服务基础设施。