← 返回首页
🧠

负载均衡

📂 llm ⏱ 3 min 479 words

---
title: "负载均衡"
description: "LLM负载均衡策略与请求路由实现"
tags: ["负载均衡", "请求路由", "健康检查", "故障转移", "高可用"]
category: "llm"
icon: "🧠"
---

负载均衡

负载均衡是LLM高可用架构的核心组件,负责将请求合理分配到多个后端实例,提高系统吞吐量和可用性。LLM负载均衡需要考虑GPU资源利用率、请求延迟和模型特性等因素。

负载均衡策略

轮询策略

最简单的策略,依次将请求分配给每个后端实例。

import itertools
from typing import List

class RoundRobinBalancer:
    """Hand out backends one after another, wrapping around at the end.

    Attributes:
        backends: Backend addresses, in rotation order.
        cycle: Endless iterator over the index positions that ``backends``
            had when the balancer was created.
    """

    def __init__(self, backends: List[str]):
        """Store ``backends`` and prepare the index rotation."""
        backend_count = len(backends)
        self.backends = backends
        self.cycle = itertools.cycle(range(backend_count))

    def get_backend(self) -> str:
        """Return the next backend in the rotation."""
        return self.backends[next(self.cycle)]

# Demo: six requests over three backends walk the list twice, in order.
balancer = RoundRobinBalancer(["gpu-0:8000", "gpu-1:8000", "gpu-2:8000"])
for _ in range(6):
    print(f"请求路由到: {balancer.get_backend()}")

加权轮询

根据GPU性能分配不同权重。

class WeightedRoundRobinBalancer:
    """Round-robin balancer that favours backends with a larger weight.

    The balancer sweeps over the backends repeatedly while a "level"
    (``current_weight``) counts down; a backend is picked on a sweep only
    when its weight is above the current level.  With positive integer
    weights, each full countdown returns every backend as many times as
    its weight.

    Attributes:
        backends: Backend names, in the iteration order of the input dict.
        weights: Weight of each backend, aligned with ``backends``.
        total_weight: Sum of all weights.
        max_weight: Largest single weight (0 when there are no backends).
        current_idx: Position examined by the most recent step.
        current_weight: Current countdown level.
    """

    def __init__(self, backends: dict):
        """Build the balancer from a ``{backend: weight}`` mapping."""
        self.backends = list(backends.keys())
        self.weights = list(backends.values())
        self.total_weight = sum(self.weights)
        # No backend can match a level at or above the largest weight, so
        # the countdown only ever needs to restart just below it.
        self.max_weight = max(self.weights, default=0)
        self.current_idx = 0
        self.current_weight = 0

    def get_backend(self) -> str:
        """Return the next backend according to the weighted rotation.

        The very first sweep starts at the second backend, because the
        index is advanced before it is examined.

        Raises:
            ZeroDivisionError: If the balancer was built with no backends.
        """
        while True:
            self.current_idx = (self.current_idx + 1) % len(self.backends)
            if self.current_idx == 0:
                # A sweep just finished: drop one level, restarting the
                # countdown once it falls below zero.
                self.current_weight -= 1
                if self.current_weight < 0:
                    # Restarting at total_weight - 1 would produce the same
                    # picks but waste whole sweeps on levels nobody matches.
                    self.current_weight = self.max_weight - 1
            if self.current_weight < self.weights[self.current_idx]:
                return self.backends[self.current_idx]

# Demo: over these twelve picks the weights 3/2/1 select the A100 six times,
# the V100 four times and the T4 twice.
balancer = WeightedRoundRobinBalancer({"gpu-a100:8000": 3, "gpu-v100:8000": 2, "gpu-t4:8000": 1})
for _ in range(12):
    print(f"路由到: {balancer.get_backend()}")

最少连接数

将请求分配给当前连接数最少的实例。

import asyncio
from dataclasses import dataclass, field

@dataclass
class Backend:
    """One backend instance together with its connection counters."""

    # Address of the instance; the examples in this file use "host:port".
    host: str
    # Connections currently open against this instance.  Callers increment
    # it themselves; LeastConnectionsBalancer.release() decrements it.
    active_connections: int = 0
    # Running total that LeastConnectionsBalancer.release() adds one to.
    total_requests: int = 0

class LeastConnectionsBalancer:
    """Pick the backend that currently has the fewest active connections.

    The balancer only selects; callers are expected to increment
    ``active_connections`` on the returned backend themselves and to call
    :meth:`release` when the request finishes.
    """

    def __init__(self, backends: List[Backend]):
        """Keep a reference to the list of backends to choose from."""
        self.backends = backends

    def get_backend(self) -> Backend:
        """Return the backend with the lowest ``active_connections``.

        Ties go to the backend that appears first in the list.

        Raises:
            ValueError: If there are no backends.
        """
        return min(self.backends, key=lambda b: b.active_connections)

    def release(self, backend: Backend):
        """Mark one connection on ``backend`` as finished.

        The active count never drops below zero: an unmatched release would
        otherwise leave a negative count and make ``get_backend`` prefer
        that backend indefinitely.  ``total_requests`` is incremented on
        every call.
        """
        backend.active_connections = max(0, backend.active_connections - 1)
        backend.total_requests += 1

    def get_stats(self) -> dict:
        """Return ``{host: {"active": ..., "total": ...}}`` for every backend."""
        return {
            b.host: {"active": b.active_connections, "total": b.total_requests}
            for b in self.backends
        }

# Demo: three idle backends receive six requests.
backends = [
    Backend("gpu-0:8000"),
    Backend("gpu-1:8000"),
    Backend("gpu-2:8000"),
]
balancer = LeastConnectionsBalancer(backends)
for i in range(6):
    backend = balancer.get_backend()
    # get_backend() only selects; the caller records the new connection here.
    backend.active_connections += 1
    print(f"请求 {i+1} → {backend.host} (活跃连接: {backend.active_connections})")
# release() is never called in this demo, so every "total" printed is 0.
print(f"统计: {balancer.get_stats()}")

基于队列深度

考虑LLM推理队列深度的智能路由。

import aiohttp
import asyncio
from typing import List

class QueueAwareBalancer:
    """Route to the backend whose ``/health`` endpoint reports the shortest queue."""

    def __init__(self, backends: List[str]):
        """Keep the list of backend addresses to probe."""
        self.backends = backends

    async def _probe_queue_depth(self, session, backend: str):
        """Return the queue depth reported by ``backend``, or None if unusable.

        A missing ``queue_depth`` key counts as 0.  Any failure (connection
        error, timeout, unparsable or non-numeric payload) yields None so
        that one bad backend never aborts the selection.
        """
        try:
            async with session.get(
                f"http://{backend}/health",
                # Explicit ClientTimeout instead of the legacy bare number.
                timeout=aiohttp.ClientTimeout(total=2),
            ) as resp:
                status = await resp.json()
                depth = status.get("queue_depth", 0)
        except Exception:
            # Best effort: an unreachable backend is simply not a candidate.
            return None
        return depth if isinstance(depth, (int, float)) else None

    async def get_backend(self) -> str:
        """Return the backend with the smallest reported queue depth.

        All backends are probed concurrently, so the call takes about as
        long as the slowest probe rather than the sum of all probes.  Ties
        go to the backend listed first.  If no backend yields a usable
        depth, the first backend is returned as a fallback.

        Raises:
            IndexError: If there are no backends.
        """
        best_backend = self.backends[0]
        min_queue = float('inf')

        async with aiohttp.ClientSession() as session:
            depths = await asyncio.gather(
                *(self._probe_queue_depth(session, b) for b in self.backends)
            )

        # gather() keeps input order, so the strict "<" preserves the
        # first-listed-wins tie-breaking.
        for backend, depth in zip(self.backends, depths):
            if depth is not None and depth < min_queue:
                min_queue = depth
                best_backend = backend
        return best_backend

balancer = QueueAwareBalancer(["gpu-0:8000", "gpu-1:8000"])
# get_backend() is a coroutine, so it has to be awaited from async code:
# backend = await balancer.get_backend()

健康检查

import asyncio
import aiohttp
from dataclasses import dataclass, field
from typing import List
from enum import Enum

class HealthStatus(Enum):
    """Outcome of a single health probe, also used as a backend's recorded state."""

    # Probe answered with HTTP 200 and GPU memory usage was not above 95.
    HEALTHY = "healthy"
    # Probe failed, raised, or answered with a non-200 status.
    UNHEALTHY = "unhealthy"
    # Probe answered with HTTP 200 but reported gpu_memory_usage above 95.
    DEGRADED = "degraded"

@dataclass
class HealthChecker:
    """Track per-backend health with consecutive-result thresholds.

    A backend's recorded state only changes after enough consecutive
    results of the same kind, so a single flaky probe does not flip it.
    """

    # Backend addresses to track.
    backends: List[str]
    # Not read by any method here; presumably seconds between probe rounds
    # for whoever drives the checker -- TODO confirm.
    check_interval: int = 10
    # Consecutive non-healthy results needed before the state is downgraded.
    unhealthy_threshold: int = 3
    # Consecutive healthy results needed before the state becomes HEALTHY.
    healthy_threshold: int = 2
    # Per-backend bookkeeping, filled in by __post_init__.
    status: dict = field(default_factory=dict)

    def __post_init__(self):
        """Start every backend as HEALTHY with zeroed counters."""
        for backend in self.backends:
            self.status[backend] = {
                "state": HealthStatus.HEALTHY,
                "fail_count": 0,
                "success_count": 0,
                # NOTE(review): never updated by this class.
                "last_check": 0
            }

    async def check_health(self, backend: str) -> HealthStatus:
        """Probe ``http://<backend>/health`` once and classify the answer.

        Returns:
            HEALTHY for HTTP 200 with ``gpu_memory_usage`` at or below 95
            (a missing key counts as 0), DEGRADED for HTTP 200 above 95,
            and UNHEALTHY for any other status or any exception.
        """
        # Explicit ClientTimeout instead of the legacy bare number.
        timeout = aiohttp.ClientTimeout(total=5)
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(f"http://{backend}/health", timeout=timeout) as resp:
                    if resp.status != 200:
                        return HealthStatus.UNHEALTHY
                    data = await resp.json()
                    if data.get("gpu_memory_usage", 0) > 95:
                        return HealthStatus.DEGRADED
                    return HealthStatus.HEALTHY
            except Exception:
                # Any failure to get a usable answer counts as unhealthy.
                return HealthStatus.UNHEALTHY

    async def update_status(self, backend: str, health: HealthStatus):
        """Fold one probe result into the recorded state of ``backend``.

        A HEALTHY result resets the failure streak; any other result resets
        the success streak.  Once a streak reaches its threshold the state
        is updated.  For non-healthy streaks the recorded state is the
        result just observed, so a degraded backend is stored as DEGRADED
        rather than being reported as UNHEALTHY.

        Raises:
            KeyError: If ``backend`` is not tracked in ``status``.
        """
        status = self.status[backend]
        if health == HealthStatus.HEALTHY:
            status["fail_count"] = 0
            status["success_count"] += 1
            if status["success_count"] >= self.healthy_threshold:
                status["state"] = HealthStatus.HEALTHY
        else:
            status["success_count"] = 0
            status["fail_count"] += 1
            if status["fail_count"] >= self.unhealthy_threshold:
                status["state"] = health

    def get_healthy_backends(self) -> List[str]:
        """Return the backends whose recorded state is exactly HEALTHY."""
        return [b for b, s in self.status.items() if s["state"] == HealthStatus.HEALTHY]

# Demo: no probe has run yet and __post_init__ starts every backend as
# HEALTHY, so all three addresses are printed.
checker = HealthChecker(["gpu-0:8000", "gpu-1:8000", "gpu-2:8000"])
print(f"健康实例: {checker.get_healthy_backends()}")

故障转移

实现自动故障转移和降级策略。当主实例不可用时,自动切换到备用实例;当所有实例不可用时,返回缓存结果或降级响应。

部署建议

  1. 选择适合LLM特性的负载均衡策略
  2. 实现多维度健康检查(GPU利用率、队列深度、响应延迟)
  3. 配置合理的超时和重试机制
  4. 建立完善的监控和告警体系
  5. 定期进行故障演练

负载均衡是LLM高可用架构的基础,合理的策略选择和实现能显著提升服务质量和用户体验。