← 返回首页
🧠

LLM重试策略

📂 llm ⏱ 1 min 171 words

--- title: "LLM重试策略" description: "详解LLM调用中的重试策略,包括指数退避、抖动、条件重试等实用模式" tags: ["重试策略", "容错机制", "可靠性"] category: "llm" icon: "🧠"

LLM重试策略

在实际生产环境中,调用LLM API时遇到临时性故障是不可避免的。网络波动、API限流、服务过载等问题都可能导致请求失败。合理的重试策略能够显著提升系统的可靠性。

为什么需要重试

LLM API调用失败通常分为两类:临时性故障和永久性故障。临时性故障包括网络超时、服务端5xx错误、限流429等,这些错误在短暂等待后通常会自动恢复。永久性故障则包括认证失败、模型不存在等,重试无法解决问题。

重试策略的核心目标是:在合理的时间内恢复临时性故障,同时避免对已经过载的服务造成更大压力。

基础重试实现

import time
import random

def call_llm_with_retry(prompt, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            response = llm_api.call(prompt)
            return response
        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))
        except TemporaryError:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

指数退避

指数退避是最常用的重试策略。每次重试时,等待时间呈指数级增长,避免对服务端造成持续压力。

import time

def exponential_backoff(attempt, base_delay=1.0, max_delay=60.0):
    delay = base_delay * (2 ** attempt)
    return min(delay, max_delay)

等待时间公式为 base_delay * 2^attempt,通常设置最大延迟上限防止等待过久。实际应用中建议base_delay设为1秒,最大延迟设为60秒。

抖动机制

纯指数退避会导致多个客户端在同一时刻重试,造成"惊群效应"。引入随机抖动能分散重试时间点。

import random

def backoff_with_jitter(attempt, base_delay=1.0, max_delay=60.0):
    delay = base_delay * (2 ** attempt)
    jitter = random.uniform(0, delay * 0.5)
    return min(delay + jitter, max_delay)

抖动范围通常设为当前延迟的0%-50%,这样既能分散重试请求,又不会让整体延迟变得过长。

条件重试

并非所有错误都应该重试。需要根据HTTP状态码和错误类型决定是否重试:

SHOULD_RETRY = {429, 500, 502, 503, 504}
DO_NOT_RETRY = {400, 401, 403, 404}

def should_retry(status_code, attempt, max_retries):
    if status_code in DO_NOT_RETRY:
        return False
    if status_code in SHOULD_RETRY and attempt < max_retries:
        return True
    return False

429(限流)和5xx(服务端错误)值得重试,4xx客户端错误通常不应该重试。

重试预算

在高并发场景下,无限重试可能导致请求风暴。需要设置全局重试预算:

import threading

class RetryBudget:
    def __init__(self, max_retries_per_second=10):
        self.semaphore = threading.Semaphore(max_retries_per_second)
        self.max = max_retries_per_second

    def can_retry(self):
        return self.semaphore.acquire(blocking=False)

    def release(self):
        self.semaphore.release()

通过信号量控制每秒最大重试次数,防止重试请求压垮下游服务。

总结

好的重试策略应该包含:指数退避避免瞬间重试、随机抖动防止惊群、条件判断区分可重试错误、全局预算防止请求风暴。在实际项目中,建议使用成熟的重试库如tenacity,它提供了灵活的重试策略组合能力。