贝叶斯优化在LLM超参数调优中的应用
---
title: "贝叶斯优化在LLM超参数调优中的应用"
description: "介绍贝叶斯优化算法在大型语言模型超参数调优中的原理、实现和最佳实践。"
tags: ["贝叶斯优化", "超参数调优", "llm", "机器学习", "优化算法"]
category: "llm"
icon: "🧠"
---
贝叶斯优化在LLM超参数调优中的应用
什么是贝叶斯优化?
贝叶斯优化是一种基于贝叶斯定理的全局优化算法,特别适用于:
- 评估成本高的黑盒函数优化
- 超参数调优
- 实验设计
其核心思想是建立目标函数的概率模型,然后根据模型选择下一个最有希望的参数组合进行评估。
贝叶斯优化原理
1. 代理模型(Surrogate Model)
class GaussianProcessSurrogate:
    """Gaussian-process surrogate model of an expensive objective.

    Wraps scikit-learn's ``GaussianProcessRegressor`` with a
    ``ConstantKernel * RBF`` kernel and remembers the data it was last
    fitted on.

    Attributes:
        gp: The fitted regressor, or ``None`` before the first ``fit`` call.
        X_observed: List copy of the inputs passed to the last ``fit`` call.
        y_observed: List copy of the targets passed to the last ``fit`` call.
    """

    def __init__(self, normalize_y=True):
        """Create an unfitted surrogate.

        Args:
            normalize_y: Forwarded to ``GaussianProcessRegressor``. Targets
                such as accuracies or negated losses are not centred on
                zero, so normalising them keeps the zero-mean prior sensible.
        """
        self.normalize_y = normalize_y
        self.gp = None
        self.X_observed = []
        self.y_observed = []

    def fit(self, X, y):
        """Fit a fresh Gaussian process to the observations.

        Args:
            X: Observed inputs, one row per observation.
            y: Observed objective values, one per row of ``X``.
        """
        # Imported lazily so the class can be defined without scikit-learn.
        from sklearn.gaussian_process import GaussianProcessRegressor
        from sklearn.gaussian_process.kernels import RBF, ConstantKernel

        kernel = ConstantKernel(1.0) * RBF(length_scale=1.0)
        self.gp = GaussianProcessRegressor(
            kernel=kernel,
            n_restarts_optimizer=10,
            alpha=1e-6,
            normalize_y=self.normalize_y,
        )

        # Store list copies: callers in this file append to these attributes
        # between fits, which would fail if they silently became ndarrays.
        self.X_observed = list(X)
        self.y_observed = list(y)
        self.gp.fit(X, y)

    def predict(self, X):
        """Return the predictive mean and standard deviation at ``X``.

        Before the first ``fit`` call this returns a mean of zeros and a
        standard deviation of ones, one entry per row of ``X``.

        Args:
            X: Query points, one row per point.

        Returns:
            A ``(mean, std)`` tuple of arrays.
        """
        import numpy as np

        if self.gp is None:
            return np.zeros(len(X)), np.ones(len(X))

        mean, std = self.gp.predict(X, return_std=True)
        return mean, std
2. 采集函数(Acquisition Function,亦译作"获取函数")
class AcquisitionFunction:
    """Acquisition functions for a maximisation problem.

    Every function takes a surrogate exposing ``predict(X) -> (mean, std)``
    and returns one score per row of ``X``; higher scores mark more
    promising points.
    """

    @staticmethod
    def _standardized_improvement(X, surrogate, best_y, xi):
        """Return ``(z, std)`` where ``z = (mean - best_y - xi) / std``.

        The standard deviation is clipped at ``1e-9`` so the division is
        always defined.
        """
        import numpy as np

        mean, std = surrogate.predict(X)
        std = np.maximum(std, 1e-9)
        return (mean - best_y - xi) / std, std

    @staticmethod
    def expected_improvement(X, surrogate, best_y, xi=0.01):
        """Expected improvement (EI) over the incumbent ``best_y``.

        Args:
            X: Candidate points, one row per candidate.
            surrogate: Model providing ``predict(X) -> (mean, std)``.
            best_y: Best objective value observed so far.
            xi: Margin subtracted from the predicted improvement; larger
                values demand a bigger predicted gain.

        Returns:
            Array with ``std * (z * cdf(z) + pdf(z))`` for each candidate.
        """
        from scipy.stats import norm

        z, std = AcquisitionFunction._standardized_improvement(
            X, surrogate, best_y, xi
        )
        return std * (z * norm.cdf(z) + norm.pdf(z))

    @staticmethod
    def probability_of_improvement(X, surrogate, best_y, xi=0.01):
        """Probability of improvement (PI) over the incumbent ``best_y``.

        Args:
            X: Candidate points, one row per candidate.
            surrogate: Model providing ``predict(X) -> (mean, std)``.
            best_y: Best objective value observed so far.
            xi: Margin subtracted from the predicted improvement.

        Returns:
            Array with ``cdf(z)`` for each candidate.
        """
        from scipy.stats import norm

        z, _ = AcquisitionFunction._standardized_improvement(
            X, surrogate, best_y, xi
        )
        return norm.cdf(z)

    @staticmethod
    def upper_confidence_bound(X, surrogate, kappa=2.576):
        """Upper confidence bound (UCB): ``mean + kappa * std``.

        Args:
            X: Candidate points, one row per candidate.
            surrogate: Model providing ``predict(X) -> (mean, std)``.
            kappa: Weight of the predictive standard deviation.

        Returns:
            Array with the bound for each candidate.
        """
        mean, std = surrogate.predict(X)
        return mean + kappa * std
3. 优化器
class BayesianOptimizer:
    """Sequential Bayesian optimizer that maximises a black-box objective.

    Each entry of ``param_bounds`` describes one dimension: a ``list`` is
    treated as a set of discrete choices, anything else is indexed as
    ``(low, high)`` and sampled uniformly.
    """

    def __init__(self, param_bounds, n_initial=5, n_iterations=25):
        """Store the search space and the evaluation budget.

        Args:
            param_bounds: Per-dimension ranges or choice lists.
            n_initial: Number of purely random evaluations made first.
            n_iterations: Total number of evaluations, including the
                ``n_initial`` random ones.
        """
        self.param_bounds = param_bounds
        self.n_initial = n_initial
        self.n_iterations = n_iterations

        self.surrogate = GaussianProcessSurrogate()
        self.acquisition = AcquisitionFunction()

        self.X_observed = []
        self.y_observed = []
        self.best_y = -float('inf')
        self.best_x = None

    def _random_sample(self):
        """Draw one random point from the search space as a 1-D array."""
        import numpy as np

        params = [
            np.random.choice(bounds)
            if isinstance(bounds, list)
            else np.random.uniform(bounds[0], bounds[1])
            for bounds in self.param_bounds
        ]
        return np.array(params)

    def _suggest_next(self, n_candidates=1000):
        """Return the random candidate with the highest expected improvement.

        Args:
            n_candidates: Number of random candidates to score.

        Returns:
            The best-scoring candidate as a 1-D array.

        Raises:
            ValueError: If ``n_candidates`` is smaller than 1.
        """
        import numpy as np

        if n_candidates < 1:
            raise ValueError("n_candidates must be at least 1")

        # Score all candidates in one surrogate call instead of one call
        # per candidate.
        candidates = np.array(
            [self._random_sample() for _ in range(n_candidates)]
        )
        scores = np.asarray(
            self.acquisition.expected_improvement(
                candidates, self.surrogate, self.best_y
            ),
            dtype=float,
        ).ravel()

        # NaN scores must never win; if every score is NaN the first
        # candidate is returned rather than None.
        scores = np.where(np.isnan(scores), -np.inf, scores)
        return candidates[int(np.argmax(scores))]

    def _record(self, x, y):
        """Append one observation and update the incumbent if it improved."""
        self.X_observed.append(x)
        self.y_observed.append(y)
        if y > self.best_y:
            self.best_y = y
            self.best_x = x

    def optimize(self, objective_function):
        """Run the optimization loop.

        Args:
            objective_function: Callable taking a 1-D parameter array and
                returning a score to maximise.

        Returns:
            ``(best_x, best_y)``: the best parameters seen and their score.
        """
        import numpy as np

        # Phase 1: random exploration to seed the surrogate.
        for i in range(self.n_initial):
            x = self._random_sample()
            y = objective_function(x)
            self._record(x, y)
            print(f"初始采样 {i+1}: x={x}, y={y}")

        # Phase 2: refit the surrogate, then evaluate its best suggestion.
        for i in range(self.n_iterations - self.n_initial):
            self.surrogate.fit(
                np.array(self.X_observed), np.array(self.y_observed)
            )

            x_next = self._suggest_next()
            y_next = objective_function(x_next)
            self._record(x_next, y_next)

            print(f"迭代 {i+1}: x={x_next}, y={y_next}, best_y={self.best_y}")

        return self.best_x, self.best_y
LLM超参数优化实践
1. 学习率优化
# Search space: a single continuous dimension holding the learning rate.
# NOTE: the range spans three orders of magnitude.
lr_param_bounds = [(1e-6, 1e-3)]


def learning_rate_objective(params):
    """Return the negated evaluation loss for the learning rate ``params[0]``."""
    trained = train_model(learning_rate=params[0])
    # Negate the loss so that a lower loss yields a higher score.
    return -evaluate_model(trained)


# Run the search: 10 random evaluations out of a total budget of 50.
optimizer = BayesianOptimizer(lr_param_bounds, n_initial=10, n_iterations=50)
best_lr, best_loss = optimizer.optimize(learning_rate_objective)

print(f"最佳学习率: {best_lr}")
# best_loss holds the negated loss, so flip the sign back for display.
print(f"最佳损失: {-best_loss}")
2. 多参数优化
# Search space: one (low, high) range per hyperparameter.
param_bounds = [
    (1e-6, 1e-3),   # learning rate
    (8, 64),        # batch size
    (0.1, 0.5),     # dropout rate
    (1, 10),        # warmup steps
    (0.9, 0.999),   # Adam beta2
]


def multi_param_objective(params):
    """Train with the given hyperparameters and return the evaluation score."""
    lr, batch_size, dropout, warmup_steps, beta2 = params
    settings = {
        "learning_rate": lr,
        # Integer-valued settings are truncated with int() before use.
        "batch_size": int(batch_size),
        "dropout": dropout,
        "warmup_steps": int(warmup_steps),
        "beta2": beta2,
    }
    return evaluate_model(train_model(**settings))


# Run the search: 15 random evaluations out of a total budget of 60.
optimizer = BayesianOptimizer(param_bounds, n_initial=15, n_iterations=60)
best_params, best_accuracy = optimizer.optimize(multi_param_objective)
3. 生成参数优化(temperature、top_p 等采样参数)
# Search space for the text-generation (sampling) parameters.
prompt_param_bounds = [
    (0.1, 1.0),     # temperature
    (0.1, 1.0),     # top_p
    (100, 1000),    # max_tokens
    (0.0, 2.0),     # frequency_penalty
    (0.0, 2.0),     # presence_penalty
]


def prompt_objective(params):
    """Generate responses with the given settings and return their quality score."""
    temp, top_p, max_tokens, freq_penalty, pres_penalty = params
    generation_settings = {
        "temperature": temp,
        "top_p": top_p,
        # max_tokens is truncated to an integer before use.
        "max_tokens": int(max_tokens),
        "frequency_penalty": freq_penalty,
        "presence_penalty": pres_penalty,
    }
    responses = generate_responses(test_prompts, **generation_settings)
    return evaluate_response_quality(responses)


# Run the search: 10 random evaluations out of a total budget of 40.
optimizer = BayesianOptimizer(prompt_param_bounds, n_initial=10, n_iterations=40)
best_params, best_quality = optimizer.optimize(prompt_objective)
高级技巧
1. 多保真度优化
class MultiFidelityOptimizer:
    """Three-stage screening: cheap evaluations first, expensive ones last.

    Random candidates are scored at the lowest fidelity, the best of them
    are re-scored at the second fidelity, and the survivors are evaluated
    at full fidelity.

    Attributes:
        param_bounds: Per-dimension ``(low, high)`` ranges or choice lists.
        fidelities: Training budgets passed to the quick evaluation.
        results: After ``optimize``, maps ``"low"``, ``"mid"`` and ``"high"``
            to the ``(params, score)`` pairs produced by each stage.
    """

    def __init__(self, param_bounds, fidelities=None):
        """Store the search space and fidelity schedule.

        Args:
            param_bounds: Per-dimension ranges or choice lists.
            fidelities: Budgets for the quick evaluations; defaults to
                ``[100, 500, 1000]``.

        Raises:
            ValueError: If ``fidelities`` is empty.
        """
        self.param_bounds = param_bounds
        # A fresh list per instance avoids sharing one mutable default.
        self.fidelities = (
            [100, 500, 1000] if fidelities is None else list(fidelities)
        )
        if not self.fidelities:
            raise ValueError("fidelities must contain at least one entry")
        self.results = {}

    def _random_sample(self):
        """Draw one random point from the search space as a 1-D array."""
        import numpy as np

        params = [
            np.random.choice(bounds)
            if isinstance(bounds, list)
            else np.random.uniform(bounds[0], bounds[1])
            for bounds in self.param_bounds
        ]
        return np.array(params)

    def low_fidelity_evaluate(self, params, fidelity):
        """Quick evaluation: train for ``fidelity`` steps and score the model."""
        model = train_model_quick(params, steps=fidelity)
        return evaluate_model_quick(model)

    def high_fidelity_evaluate(self, params):
        """Full evaluation: train without a step limit and score the model."""
        model = train_model_full(params)
        return evaluate_model_full(model)

    def optimize(self, n_candidates=20, n_promising=10, n_final=None):
        """Run the staged search.

        Args:
            n_candidates: Number of random candidates scored in stage 1.
            n_promising: Number of stage-1 candidates carried into stage 2.
            n_final: Number of stage-2 candidates carried into stage 3;
                ``None`` keeps all of them.

        Returns:
            ``(best_params, best_score)`` from the full-fidelity stage, or
            ``(None, -inf)`` when no candidate reached it.
        """
        def by_score(item):
            return item[1]

        # Stage 1: cheap screening of random candidates.
        print("阶段1:低保真度筛选")
        screened = []
        for _ in range(n_candidates):
            params = self._random_sample()
            score = self.low_fidelity_evaluate(params, self.fidelities[0])
            screened.append((params, score))
        screened.sort(key=by_score, reverse=True)
        self.results["low"] = list(screened)
        promising = screened[:n_promising]

        # Stage 2: re-score the survivors and re-rank them with the new
        # scores. Skipped when no second fidelity level is configured.
        print("阶段2:中保真度评估")
        if len(self.fidelities) > 1:
            promising = [
                (params, self.low_fidelity_evaluate(params, self.fidelities[1]))
                for params, _ in promising
            ]
            promising.sort(key=by_score, reverse=True)
        self.results["mid"] = list(promising)
        if n_final is not None:
            promising = promising[:n_final]

        # Stage 3: full evaluation of whatever is left.
        print("阶段3:高保真度评估")
        best_params, best_score = None, -float('inf')
        final_scores = []
        for params, _ in promising:
            score = self.high_fidelity_evaluate(params)
            final_scores.append((params, score))
            if score > best_score:
                best_score = score
                best_params = params
        self.results["high"] = final_scores

        return best_params, best_score
2. 并行贝叶斯优化
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor
class ParallelBayesianOptimizer:
    """Bayesian optimizer that evaluates candidates in thread-pool batches.

    Each entry of ``param_bounds`` describes one dimension: a ``list`` is
    treated as a set of discrete choices, anything else is indexed as
    ``(low, high)`` and sampled uniformly.
    """

    def __init__(self, param_bounds, n_initial=5, n_iterations=25, n_workers=4):
        """Store the search space, budget and worker count.

        Args:
            param_bounds: Per-dimension ranges or choice lists.
            n_initial: Number of random points in the first batch.
            n_iterations: Evaluation budget for the model-guided phase;
                ``n_iterations // n_workers`` batches are run.
            n_workers: Thread-pool size and number of points per batch.
        """
        self.param_bounds = param_bounds
        self.n_initial = n_initial
        self.n_iterations = n_iterations
        self.n_workers = n_workers

        self.surrogate = GaussianProcessSurrogate()
        self.acquisition = AcquisitionFunction()

        self.X_observed = []
        self.y_observed = []
        self.best_y = -float('inf')
        self.best_x = None

    def _random_sample(self):
        """Draw one random point from the search space as a 1-D array."""
        import numpy as np

        params = [
            np.random.choice(bounds)
            if isinstance(bounds, list)
            else np.random.uniform(bounds[0], bounds[1])
            for bounds in self.param_bounds
        ]
        return np.array(params)

    def _suggest_next(self, n_candidates=1000):
        """Return the random candidate with the highest expected improvement.

        Raises:
            ValueError: If ``n_candidates`` is smaller than 1.
        """
        import numpy as np

        if n_candidates < 1:
            raise ValueError("n_candidates must be at least 1")

        candidates = np.array(
            [self._random_sample() for _ in range(n_candidates)]
        )
        scores = np.asarray(
            self.acquisition.expected_improvement(
                candidates, self.surrogate, self.best_y
            ),
            dtype=float,
        ).ravel()
        # NaN scores must never win the argmax.
        scores = np.where(np.isnan(scores), -np.inf, scores)
        return candidates[int(np.argmax(scores))]

    def _parallel_evaluate(self, params_list, objective_function):
        """Evaluate every parameter set on the thread pool.

        Returns:
            The results in the same order as ``params_list``. An exception
            raised by ``objective_function`` propagates to the caller.
        """
        with ThreadPoolExecutor(max_workers=self.n_workers) as executor:
            return list(executor.map(objective_function, params_list))

    def _suggest_batch(self, batch_size):
        """Return ``batch_size`` suggestions.

        Each suggestion is the best of an independent set of random
        candidates scored against the same surrogate; no joint batch
        criterion is applied.
        """
        return [self._suggest_next() for _ in range(batch_size)]

    def _record(self, batch, results):
        """Append a batch of observations and update the incumbent."""
        for params, result in zip(batch, results):
            self.X_observed.append(params)
            self.y_observed.append(result)
            if result > self.best_y:
                self.best_y = result
                self.best_x = params

    def optimize(self, objective_function):
        """Run the batched optimization loop.

        Args:
            objective_function: Callable taking a 1-D parameter array and
                returning a score to maximise. It is called from worker
                threads.

        Returns:
            ``(best_x, best_y)``: the best parameters seen and their score.
        """
        import numpy as np

        # Initial batch of random points, evaluated in parallel.
        initial_batch = [self._random_sample() for _ in range(self.n_initial)]
        self._record(
            initial_batch,
            self._parallel_evaluate(initial_batch, objective_function),
        )

        # Model-guided batches.
        for i in range(self.n_iterations // self.n_workers):
            self.surrogate.fit(
                np.array(self.X_observed), np.array(self.y_observed)
            )

            batch = self._suggest_batch(self.n_workers)
            self._record(batch, self._parallel_evaluate(batch, objective_function))

            print(f"迭代 {i+1}: 批量完成, best_y={self.best_y}")

        return self.best_x, self.best_y
3. 约束优化
class ConstrainedBayesianOptimizer:
    """Bayesian optimizer that maximises an objective under constraints.

    Every constraint is a dict with at least a ``"threshold"`` key; a point
    satisfies it when the measured constraint value does not exceed that
    threshold. The objective and each constraint get their own surrogate.
    """

    def __init__(self, param_bounds, constraints, n_initial=5, n_iterations=25):
        """Store the search space, constraints and evaluation budget.

        Args:
            param_bounds: Per-dimension ``(low, high)`` ranges or choice lists.
            constraints: Sequence of dicts with a ``"threshold"`` entry and,
                optionally, a ``"function"`` entry used when ``optimize`` is
                called without explicit constraint functions.
            n_initial: Number of purely random evaluations made first.
            n_iterations: Total number of evaluations, including the
                ``n_initial`` random ones.
        """
        self.param_bounds = param_bounds
        self.constraints = constraints
        self.n_initial = n_initial
        self.n_iterations = n_iterations

        self.surrogate = GaussianProcessSurrogate()
        self.acquisition = AcquisitionFunction()
        # One surrogate per constraint.
        self.constraint_surrogates = [
            GaussianProcessSurrogate() for _ in constraints
        ]

        self.X_observed = []
        self.y_observed = []
        # Measured constraint values, one list per constraint. Kept here
        # rather than on the surrogates so that fitting cannot replace them.
        self.constraint_observed = [[] for _ in constraints]
        self.best_y = -float('inf')
        self.best_x = None

    def _random_sample(self):
        """Draw one random point from the search space as a 1-D array."""
        import numpy as np

        params = [
            np.random.choice(bounds)
            if isinstance(bounds, list)
            else np.random.uniform(bounds[0], bounds[1])
            for bounds in self.param_bounds
        ]
        return np.array(params)

    def _satisfies_constraints(self, violations):
        """Return True when every measured value is within its threshold."""
        return all(
            value <= constraint["threshold"]
            for value, constraint in zip(violations, self.constraints)
        )

    def _is_feasible(self, params):
        """Predict feasibility of ``params`` from the constraint surrogates.

        This is a model-based estimate; points that were actually evaluated
        are judged on their measured constraint values instead.
        """
        import numpy as np

        point = np.asarray(params).reshape(1, -1)
        for constraint, surrogate in zip(
            self.constraints, self.constraint_surrogates
        ):
            mean, _ = surrogate.predict(point)
            if np.ravel(mean)[0] > constraint["threshold"]:
                return False
        return True

    def _constrained_acquisition(self, X):
        """Score candidates by expected improvement times feasibility.

        The feasibility factor of each constraint is
        ``cdf((threshold - mean) / std)``. While no feasible point has been
        found yet there is no incumbent to improve on, so the score is the
        feasibility probability alone.
        """
        import numpy as np
        from scipy.stats import norm

        if self.best_x is None:
            score = np.ones(len(X))
        else:
            score = np.asarray(
                self.acquisition.expected_improvement(
                    X, self.surrogate, self.best_y
                ),
                dtype=float,
            )

        for constraint, surrogate in zip(
            self.constraints, self.constraint_surrogates
        ):
            mean, std = surrogate.predict(X)
            std = np.maximum(std, 1e-9)  # avoid division by zero
            score = score * norm.cdf((constraint["threshold"] - mean) / std)
        return score

    def _suggest_next_constrained(self, n_candidates=1000):
        """Return the random candidate with the best constrained score.

        Raises:
            ValueError: If ``n_candidates`` is smaller than 1.
        """
        import numpy as np

        if n_candidates < 1:
            raise ValueError("n_candidates must be at least 1")

        candidates = np.array(
            [self._random_sample() for _ in range(n_candidates)]
        )
        scores = np.asarray(
            self._constrained_acquisition(candidates), dtype=float
        ).ravel()
        # NaN scores must never win the argmax.
        scores = np.where(np.isnan(scores), -np.inf, scores)
        return candidates[int(np.argmax(scores))]

    def _evaluate(self, params, objective_function, constraint_functions):
        """Evaluate one point, record it, and update the incumbent.

        Returns:
            ``(y, feasible)``: the objective value and whether the measured
            constraint values are all within their thresholds.
        """
        y = objective_function(params)
        violations = [fn(params) for fn in constraint_functions]

        self.X_observed.append(params)
        self.y_observed.append(y)
        for observed, violation in zip(self.constraint_observed, violations):
            observed.append(violation)

        feasible = self._satisfies_constraints(violations)
        if feasible and y > self.best_y:
            self.best_y = y
            self.best_x = params
        return y, feasible

    def optimize(self, objective_function, constraint_functions=None):
        """Run the constrained optimization loop.

        Args:
            objective_function: Callable taking a 1-D parameter array and
                returning a score to maximise.
            constraint_functions: One callable per constraint, in the same
                order as ``constraints``; defaults to each constraint's
                ``"function"`` entry.

        Returns:
            ``(best_x, best_y)`` for the best feasible point, or
            ``(None, -inf)`` when no evaluated point was feasible.

        Raises:
            ValueError: If the number of constraint functions does not match
                the number of constraints.
        """
        import numpy as np

        if constraint_functions is None:
            constraint_functions = [c["function"] for c in self.constraints]
        if len(constraint_functions) != len(self.constraints):
            raise ValueError(
                "expected one constraint function per constraint "
                f"({len(self.constraints)}), got {len(constraint_functions)}"
            )

        # Phase 1: random exploration.
        for _ in range(self.n_initial):
            self._evaluate(
                self._random_sample(), objective_function, constraint_functions
            )

        # Phase 2: refit all surrogates, then evaluate the best suggestion.
        for i in range(self.n_iterations - self.n_initial):
            X = np.array(self.X_observed)
            self.surrogate.fit(X, np.array(self.y_observed))
            for surrogate, observed in zip(
                self.constraint_surrogates, self.constraint_observed
            ):
                surrogate.fit(X, np.array(observed))

            x_next = self._suggest_next_constrained()
            y_next, feasible = self._evaluate(
                x_next, objective_function, constraint_functions
            )

            print(f"迭代 {i+1}: y={y_next}, feasible={feasible}")

        return self.best_x, self.best_y
实际应用案例
案例:LLM训练超参数优化
# Search space: one (low, high) range per hyperparameter.
param_bounds = [
    (1e-6, 1e-3),   # learning rate
    (8, 64),        # batch size
    (0.1, 0.5),     # dropout rate
    (1, 10),        # warmup steps
    (0.9, 0.999),   # Adam beta2
    (0.0, 0.1),     # weight decay
]


def objective(params):
    """Train with the given hyperparameters and return the evaluation score."""
    lr, batch_size, dropout, warmup_steps, beta2, weight_decay = params
    model = train_model(
        learning_rate=lr,
        batch_size=int(batch_size),
        dropout=dropout,
        warmup_steps=int(warmup_steps),
        beta2=beta2,
        weight_decay=weight_decay,
    )
    accuracy = evaluate_model(model)
    return accuracy


def memory_constraint(params):
    """Simplified memory estimate derived from the batch size."""
    batch_size = params[1]
    return batch_size * 1e9


def time_constraint(params):
    """Simplified training-time estimate derived from the batch size."""
    return 1000 / params[1]


# Each constraint names the function that measures it and the limit the
# measurement must stay within. The functions above are the single source
# of truth: the same callables are handed to the optimizer below.
constraints = [
    {
        "name": "memory_usage",
        "threshold": 8e9,   # memory limit
        "function": memory_constraint,
    },
    {
        "name": "training_time",
        "threshold": 3600,  # training-time limit
        "function": time_constraint,
    },
]

# Run the constrained search.
optimizer = ConstrainedBayesianOptimizer(
    param_bounds=param_bounds,
    constraints=constraints,
)
best_params, best_accuracy = optimizer.optimize(
    objective_function=objective,
    constraint_functions=[c["function"] for c in constraints],
)

print(f"最佳参数: {best_params}")
print(f"最佳准确率: {best_accuracy}")
总结
贝叶斯优化为LLM超参数调优提供了强大的工具:
- 高效搜索 - 智能选择下一个评估点,减少评估次数
- 处理黑盒函数 - 适用于评估成本高的函数
- 全局优化 - 通过在探索与利用之间权衡,降低陷入局部最优的风险
- 约束处理 - 可以处理带约束的优化问题
- 不确定性量化 - 提供优化过程的不确定性估计
通过贝叶斯优化,可以显著减少LLM超参数调优所需的时间和计算资源,同时获得更好的模型性能。