← 返回首页
🧠

LLM代理服务

📂 llm ⏱ 3 min 461 words

---
title: "LLM代理服务"
description: "构建LLM代理服务的完整指南,实现请求转发、负载均衡、API网关与多模型路由"
tags: ["LLM代理", "请求转发", "负载均衡", "API网关"]
category: "llm"
icon: "🧠"
---

LLM代理服务

为什么需要LLM代理

在生产环境中,直接暴露LLM推理服务既存在安全隐患,也会带来管理上的困难。LLM代理服务作为中间层,提供了统一的API入口、请求鉴权、负载均衡、限流熔断、日志审计等关键能力。它类似于传统微服务架构中的API网关,但专门针对LLM的特性进行了优化。

代理服务架构

一个典型的LLM代理架构包含以下组件:

客户端 → API网关 → 代理服务 → 模型集群
              ↓
         鉴权/限流 → 日志/监控 → 缓存层

核心功能包括:请求路由和转发、负载均衡和故障转移、API密钥管理、请求缓存、流量控制、请求/响应日志。

基于Nginx的简单代理

Nginx是构建LLM代理的轻量级选择:

# Pool of LLM inference workers that the proxy balances requests across.
upstream llm_backend {
    # Route each request to the server with the fewest active connections
    # (server weights are taken into account).
    least_conn;
    server llm-worker-1:8000 weight=3;
    server llm-worker-2:8000 weight=3;
    # "backup": this server only receives requests when the primary servers
    # above are unavailable.
    server llm-worker-3:8000 weight=2 backup;

    # Keep up to 32 idle upstream connections open per worker process so
    # requests can reuse them instead of reconnecting.
    keepalive 32;
}

# TLS-terminating virtual host that forwards API traffic to the llm_backend pool.
server {
    listen 443 ssl http2;
    server_name llm-api.example.com;

    ssl_certificate /etc/ssl/certs/llm.crt;
    ssl_certificate_key /etc/ssl/private/llm.key;

    # All /v1/ API calls are proxied to the upstream pool.
    location /v1/ {
        proxy_pass http://llm_backend;
        # HTTP/1.1 plus an empty Connection header are required for the
        # upstream "keepalive" connections to be reused.
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        # Forward the nginx-generated request identifier to the backend.
        proxy_set_header X-Request-ID $request_id;

        # Connecting must be quick, but reads/sends get 300s between
        # successive operations before the connection is closed.
        proxy_connect_timeout 10s;
        proxy_read_timeout 300s;
        proxy_send_timeout 300s;

        # Pass the response to the client as it is received instead of
        # buffering it, and never serve it from cache.
        proxy_buffering off;
        proxy_cache off;
    }

    # Health endpoint, proxied to the same pool; not written to the access log.
    location /health {
        proxy_pass http://llm_backend/health;
        access_log off;
    }
}

Python代理服务实现

使用FastAPI构建灵活的LLM代理:

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
import httpx
import hashlib
import json
import time
from collections import defaultdict

# ASGI application object that the proxy routes and middleware attach to.
app = FastAPI(title="LLM Proxy")

# Static pool of upstream workers. Each entry carries the base URL, a relative
# weight used for weighted selection, and a "healthy" flag that is read when
# choosing a backend and rewritten by the periodic health check.
BACKENDS = [
    {"url": "http://worker-1:8000", "weight": 3, "healthy": True},
    {"url": "http://worker-2:8000", "weight": 3, "healthy": True},
    {"url": "http://worker-3:8000", "weight": 2, "healthy": True},
]

# NOTE(review): neither of these two containers is referenced by the code
# visible in this file -- confirm whether they are still needed.
request_counts = defaultdict(int)
rate_limits = {}

class RateLimiter:
    """Sliding-window rate limiter keyed by API key.

    At most ``max_requests`` requests are accepted per key within any
    ``window_seconds``-long window. Rejected requests are not recorded, so
    they do not extend the time a client stays blocked.
    """

    def __init__(self, max_requests: int = 100, window_seconds: float = 60):
        """Create a limiter.

        Args:
            max_requests: Maximum accepted requests per key per window.
            window_seconds: Length of the sliding window, in seconds.
        """
        self.max_requests = max_requests
        self.window = window_seconds
        # api_key -> time.monotonic() timestamps of accepted requests.
        self.requests = defaultdict(list)

    def is_allowed(self, api_key: str) -> bool:
        """Return True and record the request if ``api_key`` is under its limit.

        Args:
            api_key: Identifier of the caller whose quota is checked.

        Returns:
            True if the request is accepted, False if the limit is reached.
        """
        # Use the monotonic clock: wall-clock adjustments (NTP, manual changes)
        # must not expire or prolong entries in the window.
        now = time.monotonic()
        recent = [
            stamp for stamp in self.requests.get(api_key, ())
            if now - stamp < self.window
        ]

        if len(recent) >= self.max_requests:
            if recent:
                self.requests[api_key] = recent
            else:
                # Do not keep an empty bucket around for a rejected key.
                self.requests.pop(api_key, None)
            return False

        recent.append(now)
        self.requests[api_key] = recent
        return True

# Process-wide limiter built with the RateLimiter constructor defaults.
limiter = RateLimiter()

def select_backend():
    """Pick the URL of a healthy backend at random, proportionally to weight.

    Returns:
        The ``url`` value of the chosen entry in ``BACKENDS``.

    Raises:
        HTTPException: 503 when no backend is currently marked healthy.
    """
    import random

    candidates = [entry for entry in BACKENDS if entry["healthy"]]
    if len(candidates) == 0:
        raise HTTPException(503, "No healthy backends")

    # Draw a point on [0, total weight] and walk the candidates, consuming
    # each one's weight until the point is used up.
    remaining = random.uniform(0, sum(entry["weight"] for entry in candidates))
    for entry in candidates:
        remaining -= entry["weight"]
        if remaining <= 0:
            return entry["url"]

    # Fallback in case the walk above never reaches zero.
    return candidates[-1]["url"]

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Rate-limit a chat completion request and forward it to a backend.

    The caller is identified by the ``Authorization`` header with the
    ``"Bearer "`` text removed. Streaming requests (``"stream"`` truthy in the
    JSON body) are relayed as server-sent events; all other requests return
    the backend's JSON body together with the backend's status code.

    Raises:
        HTTPException: 429 when the caller exceeded its rate limit, 400 when
            the request body is not a JSON object, 502 when the backend cannot
            be reached or answers with a non-JSON body. ``select_backend`` may
            additionally raise 503.
    """
    # Local import: JSONResponse is not among the file's top-level imports.
    from fastapi.responses import JSONResponse

    api_key = request.headers.get("Authorization", "").replace("Bearer ", "")
    if not limiter.is_allowed(api_key):
        raise HTTPException(429, "Rate limit exceeded")

    try:
        body = await request.json()
    except ValueError as exc:
        raise HTTPException(400, "Request body must be valid JSON") from exc
    if not isinstance(body, dict):
        raise HTTPException(400, "Request body must be a JSON object")

    backend = select_backend()

    if body.get("stream"):
        # The response body is produced after this handler returns, so the
        # client must NOT be closed here (an ``async with`` would close it
        # before the first chunk is read). Ownership moves to the stream
        # generator, which closes the client when it finishes.
        client = httpx.AsyncClient(timeout=300)
        return await proxy_stream(client, backend, body, close_client=True)

    async with httpx.AsyncClient(timeout=300) as client:
        try:
            response = await client.post(
                f"{backend}/v1/chat/completions",
                json=body
            )
        except httpx.HTTPError as exc:
            raise HTTPException(502, "Backend request failed") from exc

    try:
        payload = response.json()
    except ValueError as exc:
        raise HTTPException(502, "Backend returned a non-JSON response") from exc
    # Propagate the backend status so upstream errors are not reported as 200.
    return JSONResponse(content=payload, status_code=response.status_code)


async def proxy_stream(client, backend, body, close_client=False):
    """Relay a streaming chat completion from ``backend`` as server-sent events.

    Args:
        client: ``httpx.AsyncClient`` used for the upstream request. It must
            stay open until the returned response has been fully sent.
        backend: Base URL of the backend to call.
        body: JSON-serialisable request payload forwarded to the backend.
        close_client: When True, the client is closed once streaming ends
            (normally or with an error). Use this when the caller hands over
            ownership of the client.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    async def stream_generator():
        try:
            async with client.stream(
                "POST",
                f"{backend}/v1/chat/completions",
                json=body
            ) as response:
                async for chunk in response.aiter_lines():
                    # Blank lines are dropped; every remaining line is
                    # re-terminated with a blank line.
                    if chunk:
                        yield f"{chunk}\n\n"
        finally:
            if close_client:
                await client.aclose()

    return StreamingResponse(stream_generator(), media_type="text/event-stream")

负载均衡策略

代理服务应支持多种负载均衡策略:

class LoadBalancer:
    """Choose a backend from a pool using a configurable strategy.

    Supported strategies: ``"round_robin"``, ``"least_connections"``,
    ``"weighted"`` and ``"latency_based"``. Backends are dicts with at least a
    ``"url"`` key; the weighted strategy also reads ``"weight"``.

    The balancer only reads ``connections`` and ``latencies``; callers are
    expected to keep them up to date (``connections[url]`` directly,
    latencies through ``record_latency``).
    """

    # Strategy name -> name of the method implementing it.
    _STRATEGIES = {
        "round_robin": "_round_robin",
        "least_connections": "_least_connections",
        "weighted": "_weighted",
        "latency_based": "_latency_based",
    }

    def __init__(self, strategy="least_connections"):
        """Create a balancer that uses ``strategy`` for every selection."""
        self.strategy = strategy
        # url -> number of in-flight connections, maintained by the caller.
        self.connections = defaultdict(int)
        # url -> most recently recorded latency in seconds.
        self.latencies = {}
        # Position of the next backend for the round-robin strategy.
        self._next_index = 0

    def select(self, backends):
        """Return one backend from ``backends`` according to the strategy.

        Args:
            backends: Non-empty sequence of backend dicts.

        Raises:
            ValueError: If ``backends`` is empty or the configured strategy
                is not one of the supported names.
        """
        if not backends:
            raise ValueError("No backends to select from")
        try:
            method_name = self._STRATEGIES[self.strategy]
        except KeyError:
            raise ValueError(
                f"Unknown load balancing strategy: {self.strategy!r}"
            ) from None
        return getattr(self, method_name)(backends)

    def record_latency(self, url, seconds):
        """Store the latest observed latency (in seconds) for ``url``."""
        self.latencies[url] = seconds

    def _round_robin(self, backends):
        """Cycle through the backends in order, one per call."""
        chosen = backends[self._next_index % len(backends)]
        self._next_index += 1
        return chosen

    def _least_connections(self, backends):
        """Return the backend with the fewest tracked connections."""
        return min(backends, key=lambda b: self.connections[b["url"]])

    def _weighted(self, backends):
        """Pick a backend at random, proportionally to its ``"weight"``."""
        import random

        weights = [b.get("weight", 1) for b in backends]
        if sum(weights) <= 0:
            # random.choices rejects an all-zero weight list; fall back to a
            # uniform pick so selection still succeeds.
            return random.choice(backends)
        return random.choices(backends, weights=weights, k=1)[0]

    def _latency_based(self, backends):
        """Return the backend with the lowest recorded latency.

        Backends without a recorded latency count as 0.0, so they are tried
        before any backend that already has a measurement.
        """
        return min(backends, key=lambda b: self.latencies.get(b["url"], 0.0))

健康检查与故障转移

import asyncio

async def health_check():
    """Probe every backend's ``/health`` endpoint forever, every 10 seconds.

    A backend is marked healthy only when the probe answers with HTTP 200;
    any exception raised by the probe marks it unhealthy. Backends are probed
    one after another with a fresh client (5 second timeout) per round.
    """
    while True:
        async with httpx.AsyncClient(timeout=5) as probe:
            for node in BACKENDS:
                try:
                    reply = await probe.get(f"{node['url']}/health")
                except Exception:
                    # Best effort: any failure counts as "down".
                    node["healthy"] = False
                else:
                    node["healthy"] = reply.status_code == 200
        await asyncio.sleep(10)

@app.on_event("startup")
async def startup():
    """Launch the background health-check loop when the application starts."""
    # The event loop only keeps a weak reference to tasks; without a strong
    # reference the health-check task could be garbage-collected while it is
    # still running. Keep it on the application state.
    app.state.health_check_task = asyncio.create_task(health_check())

监控与日志

from prometheus_client import Counter, Histogram, generate_latest

# Request counter and latency histogram exported for Prometheus.
# NOTE(review): the 'model' label is filled with the request URL path by the
# HTTP middleware in this file, not with a model name -- confirm the intended
# label semantics.
proxy_requests = Counter('proxy_requests_total', 'Total requests', ['model', 'status'])
proxy_latency = Histogram('proxy_latency_seconds', 'Request latency', ['model'])

@app.middleware("http")
async def monitor_middleware(request: Request, call_next):
    """Record latency and request-count metrics for every HTTP request.

    Both metrics are labelled with the request URL path. The measured time
    covers the ``call_next`` call. Metrics are recorded even when the
    downstream handler raises; such requests are counted with status 500 and
    the exception is re-raised unchanged.
    """
    # perf_counter is monotonic, so durations are immune to wall-clock changes.
    start = time.perf_counter()
    status = 500  # reported if call_next raises before producing a response
    try:
        response = await call_next(request)
        status = response.status_code
        return response
    finally:
        duration = time.perf_counter() - start
        proxy_latency.labels(model=request.url.path).observe(duration)
        proxy_requests.labels(
            model=request.url.path,
            status=status
        ).inc()

LLM代理服务是构建生产级LLM平台的核心组件,通过合理的架构设计,可以实现高可用、高性能、易管理的LLM服务基础设施。