负载均衡
---
title: "负载均衡"
description: "LLM负载均衡策略与请求路由实现"
tags: ["负载均衡", "请求路由", "健康检查", "故障转移", "高可用"]
category: "llm"
icon: "🧠"
---
负载均衡
负载均衡是LLM高可用架构的核心组件,负责将请求合理分配到多个后端实例,以提高系统吞吐量和可用性。与普通Web服务不同,LLM请求的开销差异很大(取决于提示词长度和生成长度),因此LLM负载均衡除了连接数之外,还需要考虑GPU资源利用率、请求延迟和模型特性等因素。
负载均衡策略
轮询策略
最简单的策略:依次将请求分配给每个后端实例,适用于各实例性能相近的场景。
import itertools
from typing import List
class RoundRobinBalancer:
    """Hand out backends one after another in a fixed, repeating order."""

    def __init__(self, backends: List[str]):
        """Snapshot ``backends`` and start the rotation at its first entry.

        Args:
            backends: Backend addresses to rotate through.

        Raises:
            ValueError: If ``backends`` is empty.
        """
        # Copy the input: the index cycle below is sized exactly once, so a
        # caller that later shrinks its own list would otherwise make
        # get_backend() raise IndexError.
        self.backends = list(backends)
        if not self.backends:
            # Fail fast. Cycling over an empty range would surface as a bare
            # StopIteration from get_backend(), which is easy to misread.
            raise ValueError("backends must not be empty")
        self.cycle = itertools.cycle(range(len(self.backends)))

    def get_backend(self) -> str:
        """Return the next backend in the rotation."""
        idx = next(self.cycle)
        return self.backends[idx]
# Demo: route six requests across three backends in strict rotation.
balancer = RoundRobinBalancer(
    [
        "gpu-0:8000",
        "gpu-1:8000",
        "gpu-2:8000",
    ]
)

for _ in range(6):
    print(f"请求路由到: {balancer.get_backend()}")
加权轮询
根据GPU性能为各实例分配不同权重,权重越高的实例按比例承接越多请求。例如权重为3:2:1时,每6个请求中三个实例分别处理3、2、1个。
class WeightedRoundRobinBalancer:
    """Rotate over backends, serving each in proportion to its weight.

    Over any window of ``sum(weights)`` consecutive calls, a backend with
    weight ``w`` is returned ``w`` times (integer weights assumed).
    """

    def __init__(self, backends: dict):
        """Store the backends and their weights.

        Args:
            backends: Mapping of backend address to its weight.

        Raises:
            ValueError: If ``backends`` is empty or no weight is positive.
        """
        self.backends = list(backends.keys())
        self.weights = list(backends.values())
        if not self.backends:
            raise ValueError("backends must not be empty")
        self.total_weight = sum(self.weights)
        self.max_weight = max(self.weights)
        if self.max_weight <= 0:
            # Without a positive weight no backend can ever satisfy the
            # selection test and get_backend() would loop forever.
            raise ValueError("at least one backend needs a positive weight")
        self.current_idx = 0
        self.current_weight = 0

    def get_backend(self) -> str:
        """Return the next backend according to the weighted rotation."""
        while True:
            self.current_idx = (self.current_idx + 1) % len(self.backends)
            if self.current_idx == 0:
                # A full pass is complete: lower the threshold by one.
                self.current_weight -= 1
                if self.current_weight < 0:
                    # Restart just below the largest weight. Any higher
                    # threshold matches no backend and only burns passes.
                    self.current_weight = self.max_weight - 1
            if self.current_weight < self.weights[self.current_idx]:
                return self.backends[self.current_idx]
# Demo: twelve requests split 6/4/2 across backends weighted 3/2/1.
balancer = WeightedRoundRobinBalancer(
    {
        "gpu-a100:8000": 3,
        "gpu-v100:8000": 2,
        "gpu-t4:8000": 1,
    }
)

for _ in range(12):
    print(f"路由到: {balancer.get_backend()}")
最少连接数
将请求分配给当前连接数最少的实例。
import asyncio
from dataclasses import dataclass, field
@dataclass
class Backend:
    """Connection counters for one backend instance."""

    host: str
    active_connections: int = 0  # requests currently in flight
    total_requests: int = 0  # requests released so far


class LeastConnectionsBalancer:
    """Pick the backend that currently has the fewest active connections."""

    def __init__(self, backends: List[Backend]):
        """Keep a reference to the backends whose counters drive selection."""
        self.backends = backends

    def get_backend(self) -> Backend:
        """Return the least-loaded backend without changing any counter.

        Ties go to the backend listed first.

        Raises:
            ValueError: If there are no backends (raised by ``min``).
        """
        return min(self.backends, key=lambda b: b.active_connections)

    def acquire(self) -> Backend:
        """Select the least-loaded backend and count the new connection.

        Pair every call with ``release`` once the request has finished.
        """
        backend = self.get_backend()
        backend.active_connections += 1
        return backend

    def release(self, backend: Backend):
        """Record that one request on ``backend`` has finished."""
        # Never go below zero: a negative count would make this backend the
        # permanent minimum and attract every subsequent request.
        if backend.active_connections > 0:
            backend.active_connections -= 1
        backend.total_requests += 1

    def get_stats(self) -> dict:
        """Return per-host ``active`` and ``total`` counters."""
        return {
            b.host: {"active": b.active_connections, "total": b.total_requests}
            for b in self.backends
        }
# Demo: six requests, none released, so load spreads evenly over the pool.
backends = [
    Backend(host)
    for host in ("gpu-0:8000", "gpu-1:8000", "gpu-2:8000")
]
balancer = LeastConnectionsBalancer(backends)

for i in range(6):
    backend = balancer.get_backend()
    # get_backend() only selects; the caller records the new connection.
    backend.active_connections += 1
    print(f"请求 {i+1} → {backend.host} (活跃连接: {backend.active_connections})")

print(f"统计: {balancer.get_stats()}")
基于队列深度
根据各实例的推理队列深度进行路由:查询每个实例的 /health 接口,将请求发送给 queue_depth 最小的实例。
import aiohttp
import asyncio
from typing import List
class QueueAwareBalancer:
    """Route to the backend reporting the shortest inference queue."""

    def __init__(self, backends: List[str], probe_timeout: float = 2.0):
        """Store the backend addresses and the per-probe timeout.

        Args:
            backends: Backend addresses in ``host:port`` form.
            probe_timeout: Seconds allowed for each ``/health`` request.
        """
        self.backends = backends
        self.probe_timeout = probe_timeout

    async def get_backend(self) -> str:
        """Probe every backend and return the one with the lowest queue depth.

        Backends whose probe fails are skipped. If every probe fails, the
        first configured backend is returned as a fallback.

        Raises:
            IndexError: If no backends are configured.
        """
        # Work on a snapshot so the probe results stay aligned with the
        # backend list even if it is modified while the probes are awaited.
        backends = list(self.backends)
        timeout = aiohttp.ClientTimeout(total=self.probe_timeout)
        async with aiohttp.ClientSession() as session:
            # Probe concurrently: the worst case is one timeout, not one
            # timeout per backend.
            depths = await asyncio.gather(
                *(self._probe(session, backend, timeout) for backend in backends)
            )
        return self._pick_least_loaded(backends, depths)

    @staticmethod
    async def _probe(session, backend: str, timeout):
        """Return the backend's reported queue depth, or None if unusable."""
        try:
            async with session.get(
                f"http://{backend}/health", timeout=timeout
            ) as resp:
                if resp.status != 200:
                    return None
                status = await resp.json()
                # A payload without the key counts as an empty queue.
                depth = status.get("queue_depth", 0)
        except Exception:
            # Deliberate best effort: any failure (connection, timeout, bad
            # payload) only means this backend is not a candidate right now.
            return None
        return depth if isinstance(depth, (int, float)) else None

    @staticmethod
    def _pick_least_loaded(backends, depths) -> str:
        """Return the backend with the smallest depth; first one wins ties."""
        best_backend = backends[0]
        min_queue = float("inf")
        for backend, depth in zip(backends, depths):
            if depth is not None and depth < min_queue:
                min_queue = depth
                best_backend = backend
        return best_backend
balancer = QueueAwareBalancer(
    ["gpu-0:8000", "gpu-1:8000"]
)
# get_backend() is a coroutine, so it must be awaited from async code:
# backend = await balancer.get_backend()
健康检查
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import List

import aiohttp
class HealthStatus(Enum):
    """Health states a backend can be in."""

    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    DEGRADED = "degraded"


@dataclass
class HealthChecker:
    """Track backend health from ``/health`` probes with hysteresis.

    A backend leaves the HEALTHY state only after ``unhealthy_threshold``
    consecutive non-healthy probes, and returns to it only after
    ``healthy_threshold`` consecutive healthy ones.
    """

    backends: List[str]
    check_interval: int = 10  # seconds between probe rounds in run()
    unhealthy_threshold: int = 3
    healthy_threshold: int = 2
    status: dict = field(default_factory=dict)
    probe_timeout: float = 5.0  # seconds allowed for one /health request

    def __post_init__(self):
        """Start every configured backend as HEALTHY with zeroed counters."""
        for backend in self.backends:
            self.status[backend] = {
                "state": HealthStatus.HEALTHY,
                "fail_count": 0,
                "success_count": 0,
                "last_check": 0,
            }

    async def check_health(self, backend: str) -> HealthStatus:
        """Probe one backend and classify the result.

        Returns:
            HEALTHY for HTTP 200 with ``gpu_memory_usage`` at most 95,
            DEGRADED for HTTP 200 above that, and UNHEALTHY for any other
            status or any failure.
        """
        timeout = aiohttp.ClientTimeout(total=self.probe_timeout)
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(
                    f"http://{backend}/health", timeout=timeout
                ) as resp:
                    if resp.status != 200:
                        return HealthStatus.UNHEALTHY
                    data = await resp.json()
                    if data.get("gpu_memory_usage", 0) > 95:
                        return HealthStatus.DEGRADED
                    return HealthStatus.HEALTHY
            except Exception:
                # Deliberate best effort: a probe that cannot complete or
                # cannot be parsed is treated as an unhealthy backend.
                return HealthStatus.UNHEALTHY

    async def update_status(self, backend: str, health: HealthStatus):
        """Fold one probe result into the tracked state of ``backend``.

        Raises:
            KeyError: If ``backend`` is not tracked in ``status``.
        """
        status = self.status[backend]
        status["last_check"] = time.time()
        if health == HealthStatus.HEALTHY:
            status["fail_count"] = 0
            status["success_count"] += 1
            if status["success_count"] >= self.healthy_threshold:
                status["state"] = HealthStatus.HEALTHY
        else:
            status["success_count"] = 0
            status["fail_count"] += 1
            if status["fail_count"] >= self.unhealthy_threshold:
                # Keep the observed state so DEGRADED (reachable but
                # overloaded) stays distinguishable from UNHEALTHY.
                status["state"] = health

    async def check_all(self):
        """Probe every tracked backend concurrently and record the results."""
        backends = list(self.status)
        results = await asyncio.gather(
            *(self.check_health(backend) for backend in backends)
        )
        for backend, health in zip(backends, results):
            await self.update_status(backend, health)

    async def run(self):
        """Probe all backends every ``check_interval`` seconds until cancelled."""
        while True:
            await self.check_all()
            await asyncio.sleep(self.check_interval)

    def get_healthy_backends(self) -> List[str]:
        """Return the backends whose tracked state is HEALTHY."""
        return [
            backend
            for backend, info in self.status.items()
            if info["state"] == HealthStatus.HEALTHY
        ]
# Demo: before any probe has run, every backend is reported as healthy.
checker = HealthChecker(
    backends=["gpu-0:8000", "gpu-1:8000", "gpu-2:8000"],
)
print(f"健康实例: {checker.get_healthy_backends()}")
故障转移
实现自动故障转移和降级策略:当主实例不可用时,自动切换到备用实例;当所有实例都不可用时,返回缓存结果或降级响应。
部署建议
- 选择适合LLM特性的负载均衡策略
- 实现多维度健康检查(GPU利用率、队列深度、响应延迟)
- 配置合理的超时和重试机制
- 建立完善的监控和告警体系
- 定期进行故障演练
负载均衡是LLM高可用架构的基础,合理的策略选择和实现能显著提升服务质量和用户体验。