← 返回首页
🧠

贝叶斯优化在LLM超参数调优中的应用

📂 llm ⏱ 6 min 1123 words

---
title: "贝叶斯优化在LLM超参数调优中的应用"
description: "介绍贝叶斯优化算法在大型语言模型超参数调优中的原理、实现和最佳实践。"
tags: ["贝叶斯优化", "超参数调优", "llm", "机器学习", "优化算法"]
category: "llm"
icon: "🧠"
---

贝叶斯优化在LLM超参数调优中的应用

什么是贝叶斯优化?

贝叶斯优化是一种基于贝叶斯定理的全局优化算法,特别适用于评估成本高昂、无法获得梯度信息的黑盒目标函数(例如每评估一组超参数都需要完整训练一次模型的场景)。

其核心思想是建立目标函数的概率模型,然后根据模型选择下一个最有希望的参数组合进行评估。

贝叶斯优化原理

1. 代理模型(Surrogate Model)

class GaussianProcessSurrogate:
    """Gaussian-process surrogate model of an expensive objective function.

    Attributes:
        gp: The fitted ``GaussianProcessRegressor``, or ``None`` before the
            first successful call to :meth:`fit`.
        X_observed: List of the input points used in the last fit.
        y_observed: List of the target values used in the last fit.
    """

    def __init__(self):
        self.gp = None
        self.X_observed = []
        self.y_observed = []

    def fit(self, X, y):
        """Fit the Gaussian process to the observations.

        Args:
            X: Sequence of input points, one row per observation.
            y: Sequence of observed target values, one per row of ``X``.
        """
        from sklearn.gaussian_process import GaussianProcessRegressor
        from sklearn.gaussian_process.kernels import RBF, ConstantKernel

        # Constant amplitude times an RBF kernel.
        kernel = ConstantKernel(1.0) * RBF(length_scale=1.0)

        gp = GaussianProcessRegressor(
            kernel=kernel,
            n_restarts_optimizer=10,
            alpha=1e-6,
            # The kernel above implies a zero-mean, unit-variance prior, so
            # the targets are standardized before fitting; predictions are
            # returned on the original scale.
            normalize_y=True,
        )
        gp.fit(X, y)

        # Publish the model only after fitting succeeded, so a failed fit
        # never leaves an unfitted regressor behind for predict().
        self.gp = gp
        # Store list copies: __init__ exposes these attributes as lists and
        # callers append to them between fits, which an ndarray cannot do.
        self.X_observed = list(X)
        self.y_observed = list(y)

    def predict(self, X):
        """Return the predictive mean and standard deviation at ``X``.

        Before the model has been fitted, a mean of 0 and a standard
        deviation of 1 are returned for every row of ``X``.

        Args:
            X: Sequence of input points, one row per query.

        Returns:
            Tuple ``(mean, std)`` of arrays with one entry per row of ``X``.
        """
        import numpy as np

        if self.gp is None:
            return np.zeros(len(X)), np.ones(len(X))

        mean, std = self.gp.predict(X, return_std=True)
        return mean, std

2. 获取函数(Acquisition Function)

class AcquisitionFunction:
    """Acquisition functions evaluated against a surrogate model.

    Every function is static.  ``surrogate`` must provide ``predict(X)``
    returning a ``(mean, std)`` pair; larger return values mark more
    attractive candidates for a maximization problem.
    """

    @staticmethod
    def _standardized_gain(X, surrogate, best_y, xi):
        """Return ``(std, z)`` where ``z = (mean - best_y - xi) / std``."""
        mu, sigma = surrogate.predict(X)
        # Floor the standard deviation so the division below is always defined.
        sigma = np.maximum(sigma, 1e-9)
        return sigma, (mu - best_y - xi) / sigma

    @staticmethod
    def expected_improvement(X, surrogate, best_y, xi=0.01):
        """Expected Improvement (EI) over ``best_y`` with margin ``xi``."""
        from scipy.stats import norm

        sigma, z = AcquisitionFunction._standardized_gain(
            X, surrogate, best_y, xi
        )
        return sigma * (z * norm.cdf(z) + norm.pdf(z))

    @staticmethod
    def probability_of_improvement(X, surrogate, best_y, xi=0.01):
        """Probability of Improvement (PI) over ``best_y`` with margin ``xi``."""
        from scipy.stats import norm

        _, z = AcquisitionFunction._standardized_gain(X, surrogate, best_y, xi)
        return norm.cdf(z)

    @staticmethod
    def upper_confidence_bound(X, surrogate, kappa=2.576):
        """Upper Confidence Bound (UCB): ``mean + kappa * std``."""
        mu, sigma = surrogate.predict(X)
        return mu + kappa * sigma

3. 优化器

class BayesianOptimizer:
    """Sequential Bayesian optimizer that maximizes a black-box objective.

    ``n_initial`` random points are evaluated first.  After that, each step
    fits the surrogate to all observations and evaluates the random
    candidate with the highest Expected Improvement.  ``n_iterations`` is
    the total evaluation budget, including the initial random points.

    Args:
        param_bounds: One entry per parameter.  A ``list`` is treated as a
            set of discrete choices; any other entry is read as
            ``(low, high)`` and sampled uniformly.
        n_initial: Number of random evaluations made before the surrogate
            is consulted.
        n_iterations: Total number of objective evaluations.
    """

    def __init__(self, param_bounds, n_initial=5, n_iterations=25):
        self.param_bounds = param_bounds
        self.n_initial = n_initial
        self.n_iterations = n_iterations

        self.surrogate = GaussianProcessSurrogate()
        self.acquisition = AcquisitionFunction()

        self.X_observed = []
        self.y_observed = []
        self.best_y = -float('inf')
        self.best_x = None

    def _random_sample(self):
        """Draw one random parameter vector from ``param_bounds``."""
        import numpy as np

        params = []
        for bounds in self.param_bounds:
            if isinstance(bounds, list):
                # Discrete choice set.
                params.append(np.random.choice(bounds))
            else:
                # Continuous (low, high) range.
                params.append(np.random.uniform(bounds[0], bounds[1]))
        return np.array(params)

    def _suggest_next(self, n_candidates=1000):
        """Return the random candidate with the highest Expected Improvement.

        Args:
            n_candidates: Number of random candidates to score.

        Returns:
            1-D array holding the selected parameter vector.

        Raises:
            ValueError: If ``n_candidates`` is smaller than 1.
        """
        import numpy as np

        if n_candidates < 1:
            raise ValueError("n_candidates must be at least 1")

        candidates = np.array(
            [self._random_sample() for _ in range(n_candidates)]
        )
        # Score the whole pool with a single surrogate prediction instead of
        # one prediction per candidate.
        scores = np.asarray(
            self.acquisition.expected_improvement(
                candidates, self.surrogate, self.best_y
            ),
            dtype=float,
        ).reshape(-1)

        if np.all(np.isnan(scores)):
            # No usable score: fall back to a plain random candidate rather
            # than returning None.
            return candidates[0]
        return candidates[int(np.nanargmax(scores))]

    def _record(self, x, y):
        """Store one observation and update the incumbent if it improved."""
        self.X_observed.append(x)
        self.y_observed.append(y)
        if y > self.best_y:
            self.best_y = y
            self.best_x = x

    def optimize(self, objective_function):
        """Run the optimization and return the best point found.

        Args:
            objective_function: Callable taking a parameter vector and
                returning a score to maximize.

        Returns:
            Tuple ``(best_x, best_y)``.
        """
        import numpy as np

        # Random initial design.
        for i in range(self.n_initial):
            x = self._random_sample()
            y = objective_function(x)
            self._record(x, y)
            print(f"初始采样 {i+1}: x={x}, y={y}")

        # Model-guided iterations use whatever budget is left.
        for i in range(self.n_iterations - self.n_initial):
            # The surrogate cannot be fitted without data (n_initial == 0);
            # in that case the unfitted surrogate scores every candidate
            # equally and the suggestion is effectively random.
            if self.X_observed:
                X = np.array(self.X_observed)
                y = np.array(self.y_observed)
                self.surrogate.fit(X, y)

            x_next = self._suggest_next()
            y_next = objective_function(x_next)
            self._record(x_next, y_next)

            print(f"迭代 {i+1}: x={x_next}, y={y_next}, best_y={self.best_y}")

        return self.best_x, self.best_y

LLM超参数优化实践

1. 学习率优化

# Search space: a single continuous dimension holding the learning rate.
lr_param_bounds = [(1e-6, 1e-3)]


def learning_rate_objective(params):
    """Train with the proposed learning rate and return the negated eval loss.

    The optimizer maximizes its objective, so the loss is negated to turn
    loss minimization into a maximization problem.
    """
    candidate_lr = params[0]
    trained = train_model(learning_rate=candidate_lr)
    return -evaluate_model(trained)


# Run the search: 10 random warm-up evaluations out of a budget of 50.
optimizer = BayesianOptimizer(
    n_iterations=50,
    n_initial=10,
    param_bounds=lr_param_bounds,
)

best_lr, best_loss = optimizer.optimize(learning_rate_objective)
print(f"最佳学习率: {best_lr}")
# best_loss holds the negated loss; flip the sign back for display.
print(f"最佳损失: {-best_loss}")

2. 多参数优化

# Joint search space; the order must match the unpacking in the objective.
param_bounds = [
    (1e-6, 1e-3),  # learning rate
    (8, 64),       # batch size
    (0.1, 0.5),    # dropout rate
    (1, 10),       # warmup steps
    (0.9, 0.999),  # Adam beta2
]


def multi_param_objective(params):
    """Train with the proposed hyperparameters and return the evaluation score.

    Batch size and warmup steps are sampled as floats and truncated to
    integers before being passed to training.
    """
    lr, batch_size, dropout, warmup_steps, beta2 = params
    hyperparams = {
        "learning_rate": lr,
        "batch_size": int(batch_size),
        "dropout": dropout,
        "warmup_steps": int(warmup_steps),
        "beta2": beta2,
    }
    return evaluate_model(train_model(**hyperparams))


# Run the search: 15 random warm-up evaluations out of a budget of 60.
optimizer = BayesianOptimizer(
    n_iterations=60,
    n_initial=15,
    param_bounds=param_bounds,
)

best_params, best_accuracy = optimizer.optimize(multi_param_objective)

3. 提示优化

# Search space over generation settings; the order must match the
# unpacking in the objective.
prompt_param_bounds = [
    (0.1, 1.0),   # temperature
    (0.1, 1.0),   # top_p
    (100, 1000),  # max_tokens
    (0.0, 2.0),   # frequency_penalty
    (0.0, 2.0),   # presence_penalty
]


def prompt_objective(params):
    """Generate responses with the proposed settings and return their score.

    ``max_tokens`` is sampled as a float and truncated to an integer.
    """
    temp, top_p, max_tokens, freq_penalty, pres_penalty = params
    generation_settings = {
        "temperature": temp,
        "top_p": top_p,
        "max_tokens": int(max_tokens),
        "frequency_penalty": freq_penalty,
        "presence_penalty": pres_penalty,
    }
    generated = generate_responses(test_prompts, **generation_settings)
    return evaluate_response_quality(generated)


# Run the search: 10 random warm-up evaluations out of a budget of 40.
optimizer = BayesianOptimizer(
    n_iterations=40,
    n_initial=10,
    param_bounds=prompt_param_bounds,
)

best_params, best_quality = optimizer.optimize(prompt_objective)

高级技巧

1. 多保真度优化

class MultiFidelityOptimizer:
    """Three-stage random search that spends cheap evaluations first.

    Random candidates are screened at the lowest fidelity, the survivors
    are re-scored at the middle fidelity, and the finalists are evaluated
    at full fidelity.  Higher scores are better.

    Args:
        param_bounds: One entry per parameter.  A ``list`` is treated as a
            set of discrete choices; any other entry is read as
            ``(low, high)`` and sampled uniformly.
        fidelities: Training-step budgets; the first two entries are used
            for the low- and mid-fidelity stages.  Defaults to
            ``[100, 500, 1000]``.
        n_candidates: Number of random candidates screened in stage 1.
        n_promising: Number of stage-1 candidates kept for stage 2.
        n_finalists: Number of stage-2 candidates kept for stage 3, or
            ``None`` to keep all of them.

    Attributes:
        results: After :meth:`optimize`, maps ``"low"``, ``"mid"`` and
            ``"high"`` to the list of ``(params, score)`` pairs evaluated
            in that stage.
    """

    def __init__(self, param_bounds, fidelities=None, n_candidates=20,
                 n_promising=10, n_finalists=None):
        self.param_bounds = param_bounds
        # A fresh list per instance: a list literal as the default value
        # would be shared (and mutated) across all instances.
        self.fidelities = [100, 500, 1000] if fidelities is None else fidelities
        self.n_candidates = n_candidates
        self.n_promising = n_promising
        self.n_finalists = n_finalists
        self.results = {}

    def _random_sample(self):
        """Draw one random parameter vector from ``param_bounds``."""
        import numpy as np

        params = []
        for bounds in self.param_bounds:
            if isinstance(bounds, list):
                params.append(np.random.choice(bounds))
            else:
                params.append(np.random.uniform(bounds[0], bounds[1]))
        return np.array(params)

    def low_fidelity_evaluate(self, params, fidelity):
        """Cheap, approximate evaluation using ``fidelity`` training steps."""
        model = train_model_quick(params, steps=fidelity)
        return evaluate_model_quick(model)

    def high_fidelity_evaluate(self, params):
        """Expensive evaluation using the full training procedure."""
        model = train_model_full(params)
        return evaluate_model_full(model)

    def optimize(self):
        """Run the three stages and return the best full-fidelity result.

        Returns:
            Tuple ``(best_params, best_score)``; ``best_params`` is ``None``
            when no candidate reached stage 3.

        Raises:
            ValueError: If fewer than two fidelities are configured.
        """
        if len(self.fidelities) < 2:
            raise ValueError("at least two fidelities are required")

        def by_score(entry):
            return entry[1]

        # Stage 1: screen random candidates at the lowest fidelity.
        print("阶段1:低保真度筛选")
        screened = []
        for _ in range(self.n_candidates):
            params = self._random_sample()
            score = self.low_fidelity_evaluate(params, self.fidelities[0])
            screened.append((params, score))
        screened.sort(key=by_score, reverse=True)
        self.results["low"] = list(screened)
        promising_params = screened[:self.n_promising]

        # Stage 2: re-score the survivors at the middle fidelity and re-rank
        # them by the new score instead of discarding it.
        print("阶段2:中保真度评估")
        rescored = [
            (params, self.low_fidelity_evaluate(params, self.fidelities[1]))
            for params, _ in promising_params
        ]
        rescored.sort(key=by_score, reverse=True)
        self.results["mid"] = list(rescored)
        finalists = (
            rescored if self.n_finalists is None
            else rescored[:self.n_finalists]
        )

        # Stage 3: full-fidelity evaluation of the finalists.
        print("阶段3:高保真度评估")
        best_params, best_score = None, -float('inf')
        evaluated = []
        for params, _ in finalists:
            score = self.high_fidelity_evaluate(params)
            evaluated.append((params, score))
            if score > best_score:
                best_score = score
                best_params = params
        self.results["high"] = evaluated

        return best_params, best_score

2. 并行贝叶斯优化

import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

class ParallelBayesianOptimizer:
    """Bayesian optimizer that evaluates batches of points in worker threads.

    The initial random design is evaluated in parallel.  Afterwards the
    optimizer runs ``n_iterations // n_workers`` rounds; each round fits
    the surrogate, proposes ``n_workers`` points and evaluates them
    concurrently.  Higher objective values are better.

    Args:
        param_bounds: One entry per parameter.  A ``list`` is treated as a
            set of discrete choices; any other entry is read as
            ``(low, high)`` and sampled uniformly.
        n_initial: Number of random points in the initial design.
        n_iterations: Evaluation budget for the model-guided rounds.
        n_workers: Thread-pool size and number of points per round.
    """

    def __init__(self, param_bounds, n_initial=5, n_iterations=25, n_workers=4):
        self.param_bounds = param_bounds
        self.n_initial = n_initial
        self.n_iterations = n_iterations
        self.n_workers = n_workers

        self.surrogate = GaussianProcessSurrogate()
        self.acquisition = AcquisitionFunction()

        self.X_observed = []
        self.y_observed = []
        self.best_y = -float('inf')
        self.best_x = None

    def _random_sample(self):
        """Draw one random parameter vector from ``param_bounds``."""
        import numpy as np

        params = []
        for bounds in self.param_bounds:
            if isinstance(bounds, list):
                params.append(np.random.choice(bounds))
            else:
                params.append(np.random.uniform(bounds[0], bounds[1]))
        return np.array(params)

    def _suggest_next(self, n_candidates=1000):
        """Return the random candidate with the highest Expected Improvement.

        Args:
            n_candidates: Number of random candidates to score.

        Returns:
            1-D array holding the selected parameter vector.
        """
        import numpy as np

        candidates = np.array(
            [self._random_sample() for _ in range(max(1, n_candidates))]
        )
        scores = np.asarray(
            self.acquisition.expected_improvement(
                candidates, self.surrogate, self.best_y
            ),
            dtype=float,
        ).reshape(-1)

        if np.all(np.isnan(scores)):
            # No usable score: fall back to a plain random candidate.
            return candidates[0]
        return candidates[int(np.nanargmax(scores))]

    def _parallel_evaluate(self, params_list, objective_function):
        """Evaluate every parameter vector concurrently.

        Returns:
            List of results in the same order as ``params_list``.
        """
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            # Executor.map yields results in submission order.
            return list(executor.map(objective_function, params_list))

    def _suggest_batch(self, batch_size):
        """Propose ``batch_size`` points.

        Each point comes from an independent draw of random candidates
        scored against the same surrogate.
        """
        return [self._suggest_next() for _ in range(batch_size)]

    def _record(self, params, result):
        """Store one observation and update the incumbent if it improved."""
        self.X_observed.append(params)
        self.y_observed.append(result)
        if result > self.best_y:
            self.best_y = result
            self.best_x = params

    def optimize(self, objective_function):
        """Run the parallel optimization and return the best point found.

        Args:
            objective_function: Callable taking a parameter vector and
                returning a score to maximize.  It is called from worker
                threads.

        Returns:
            Tuple ``(best_x, best_y)``.
        """
        import numpy as np

        # Initial random design, evaluated in parallel.
        initial_batch = [self._random_sample() for _ in range(self.n_initial)]
        initial_results = self._parallel_evaluate(initial_batch, objective_function)
        for params, result in zip(initial_batch, initial_results):
            self._record(params, result)

        # Model-guided rounds.
        for i in range(self.n_iterations // self.n_workers):
            # The surrogate cannot be fitted without data (n_initial == 0).
            if self.X_observed:
                X = np.array(self.X_observed)
                y = np.array(self.y_observed)
                self.surrogate.fit(X, y)

            batch = self._suggest_batch(self.n_workers)
            batch_results = self._parallel_evaluate(batch, objective_function)
            for params, result in zip(batch, batch_results):
                self._record(params, result)

            print(f"迭代 {i+1}: 批量完成, best_y={self.best_y}")

        return self.best_x, self.best_y

3. 约束优化

class ConstrainedBayesianOptimizer:
    """Bayesian optimizer for maximization under inequality constraints.

    Each constraint is a dict with a ``"threshold"`` key; a point is
    feasible when every measured constraint value is at most its threshold.
    The objective and every constraint get their own Gaussian-process
    surrogate.  Candidates are ranked by Expected Improvement weighted by
    the predicted probability of satisfying all constraints.

    Args:
        param_bounds: One entry per parameter.  A ``list`` is treated as a
            set of discrete choices; any other entry is read as
            ``(low, high)`` and sampled uniformly.
        constraints: List of dicts, each providing ``"threshold"``.
        n_initial: Number of random evaluations before the surrogates are
            used.
        n_iterations: Total number of objective evaluations, including the
            initial random ones.

    Attributes:
        constraint_observed: One list per constraint holding the measured
            constraint values, aligned with ``X_observed``.
        best_x, best_y: Best feasible observation so far; ``best_x`` stays
            ``None`` while no feasible point has been observed.
    """

    def __init__(self, param_bounds, constraints, n_initial=5, n_iterations=25):
        self.param_bounds = param_bounds
        self.constraints = constraints
        self.n_initial = n_initial
        self.n_iterations = n_iterations

        self.surrogate = GaussianProcessSurrogate()
        self.acquisition = AcquisitionFunction()
        # One surrogate per constraint.
        self.constraint_surrogates = [
            GaussianProcessSurrogate() for _ in constraints
        ]

        self.X_observed = []
        self.y_observed = []
        self.constraint_observed = [[] for _ in constraints]
        self.best_y = -float('inf')
        self.best_x = None

    def _random_sample(self):
        """Draw one random parameter vector from ``param_bounds``."""
        import numpy as np

        params = []
        for bounds in self.param_bounds:
            if isinstance(bounds, list):
                params.append(np.random.choice(bounds))
            else:
                params.append(np.random.uniform(bounds[0], bounds[1]))
        return np.array(params)

    def _is_feasible(self, params, violations=None):
        """Return whether ``params`` satisfies every constraint.

        Args:
            params: Parameter vector to check.
            violations: Measured constraint values for ``params``, one per
                constraint.  When given they are compared with the
                thresholds directly; otherwise the constraint surrogates'
                predicted means are used.
        """
        import numpy as np

        if violations is not None:
            return all(
                value <= constraint["threshold"]
                for value, constraint in zip(violations, self.constraints)
            )

        point = np.asarray(params).reshape(1, -1)
        for surrogate, constraint in zip(
            self.constraint_surrogates, self.constraints
        ):
            mean, _ = surrogate.predict(point)
            if float(np.ravel(mean)[0]) > constraint["threshold"]:
                return False
        return True

    def _constrained_acquisition(self, X):
        """Score candidates by EI times the probability of feasibility.

        While no feasible point has been observed (``best_y`` is not
        finite) there is nothing to improve on, so the probability of
        feasibility alone is returned.
        """
        import numpy as np
        from scipy.stats import norm

        constraint_probability = 1.0
        for surrogate, constraint in zip(
            self.constraint_surrogates, self.constraints
        ):
            mean, std = surrogate.predict(X)
            # Floor the standard deviation to keep the division defined.
            std = np.maximum(std, 1e-9)
            # P(constraint value <= threshold) under the Gaussian prediction.
            prob = norm.cdf((constraint["threshold"] - mean) / std)
            constraint_probability = constraint_probability * prob

        if not np.isfinite(self.best_y):
            return constraint_probability

        ei = self.acquisition.expected_improvement(X, self.surrogate, self.best_y)
        return ei * constraint_probability

    def _suggest_next_constrained(self, n_candidates=1000):
        """Return the random candidate with the best constrained score."""
        import numpy as np

        candidates = np.array(
            [self._random_sample() for _ in range(max(1, n_candidates))]
        )
        scores = np.asarray(
            self._constrained_acquisition(candidates), dtype=float
        ).reshape(-1)
        if np.all(np.isnan(scores)):
            # No usable score: fall back to a plain random candidate.
            return candidates[0]
        return candidates[int(np.nanargmax(scores))]

    def _evaluate_and_record(self, params, objective_function, constraint_functions):
        """Evaluate one point, store it, and update the feasible incumbent.

        Returns:
            Tuple ``(y, feasible)`` for the evaluated point.
        """
        y = objective_function(params)
        violations = [constraint_fn(params) for constraint_fn in constraint_functions]

        self.X_observed.append(params)
        self.y_observed.append(y)
        for observed, violation in zip(self.constraint_observed, violations):
            observed.append(violation)

        # Judge the incumbent by the measured constraint values rather than
        # by a surrogate prediction.
        feasible = self._is_feasible(params, violations)
        if feasible and y > self.best_y:
            self.best_y = y
            self.best_x = params
        return y, feasible

    def optimize(self, objective_function, constraint_functions):
        """Run the constrained optimization.

        Args:
            objective_function: Callable taking a parameter vector and
                returning a score to maximize.
            constraint_functions: One callable per entry of
                ``constraints``, in the same order, returning the value
                compared against that constraint's threshold.

        Returns:
            Tuple ``(best_x, best_y)`` of the best feasible observation.

        Raises:
            ValueError: If the number of constraint functions differs from
                the number of constraints.
        """
        import numpy as np

        if len(constraint_functions) != len(self.constraints):
            raise ValueError(
                "constraint_functions must have one entry per constraint"
            )

        # Random initial design.
        for _ in range(self.n_initial):
            self._evaluate_and_record(
                self._random_sample(), objective_function, constraint_functions
            )

        # Model-guided iterations use whatever budget is left.
        for i in range(self.n_iterations - self.n_initial):
            # The surrogates cannot be fitted without data (n_initial == 0).
            if self.X_observed:
                X = np.array(self.X_observed)
                self.surrogate.fit(X, np.array(self.y_observed))
                for surrogate, observed in zip(
                    self.constraint_surrogates, self.constraint_observed
                ):
                    surrogate.fit(X, np.array(observed))

            x_next = self._suggest_next_constrained()
            y_next, feasible = self._evaluate_and_record(
                x_next, objective_function, constraint_functions
            )

            print(f"迭代 {i+1}: y={y_next}, feasible={feasible}")

        return self.best_x, self.best_y

实际应用案例

案例:LLM训练超参数优化

def objective(params):
    """Train with the proposed hyperparameters and return the evaluation score.

    Batch size and warmup steps are sampled as floats and truncated to
    integers before being passed to training.
    """
    lr, batch_size, dropout, warmup_steps, beta2, weight_decay = params
    hyperparams = {
        "learning_rate": lr,
        "batch_size": int(batch_size),
        "dropout": dropout,
        "warmup_steps": int(warmup_steps),
        "beta2": beta2,
        "weight_decay": weight_decay,
    }
    return evaluate_model(train_model(**hyperparams))


def memory_constraint(params):
    """Simplified memory estimate: 1e9 per unit of batch size (index 1)."""
    return params[1] * 1e9


def time_constraint(params):
    """Simplified training-time estimate: 1000 divided by the batch size."""
    return 1000 / params[1]


# Search space; the order must match the unpacking in objective().
param_bounds = [
    (1e-6, 1e-3),  # learning rate
    (8, 64),       # batch size
    (0.1, 0.5),    # dropout rate
    (1, 10),       # warmup steps
    (0.9, 0.999),  # Adam beta2
    (0.0, 0.1),    # weight decay
]

# Constraint definitions.
# NOTE(review): with memory_constraint() returning batch_size * 1e9 and a
# threshold of 8e9, only batch sizes up to 8 stay within the memory limit,
# which is the lower edge of the batch-size range above — confirm the toy
# numbers are intended.
constraints = [
    {
        "name": "memory_usage",
        "threshold": 8e9,  # 8 GB memory limit
        "function": lambda p: calculate_memory_usage(p[1]),
    },
    {
        "name": "training_time",
        "threshold": 3600,  # one-hour training-time limit
        "function": lambda p: estimate_training_time(p),
    },
]

# Run the constrained search.
optimizer = ConstrainedBayesianOptimizer(
    constraints=constraints,
    param_bounds=param_bounds,
)

best_params, best_accuracy = optimizer.optimize(
    constraint_functions=[memory_constraint, time_constraint],
    objective_function=objective,
)

print(f"最佳参数: {best_params}")
print(f"最佳准确率: {best_accuracy}")

总结

贝叶斯优化为LLM超参数调优提供了强大的工具:

  1. 高效搜索 - 智能选择下一个评估点,减少评估次数
  2. 处理黑盒函数 - 适用于评估成本高的函数
  3. 全局优化 - 在探索与利用之间取得平衡,降低陷入局部最优的风险
  4. 约束处理 - 可以处理带约束的优化问题
  5. 不确定性量化 - 提供优化过程的不确定性估计

通过贝叶斯优化,可以显著减少LLM超参数调优所需的时间和计算资源,同时获得更好的模型性能。