LLM超参数调优指南
---
title: "LLM超参数调优指南"
description: "介绍大型语言模型超参数调优的策略、工具和最佳实践。"
tags: ["超参数调优", "llm", "优化", "机器学习", "超参数搜索"]
category: "llm"
icon: "🧠"
---
LLM超参数调优指南
为什么超参数调优很重要?
超参数是控制模型训练过程的配置参数,对模型性能有显著影响:
- 性能影响 - 合适的超参数可以显著提升模型性能
- 训练效率 - 优化的超参数可以减少训练时间和计算成本
- 泛化能力 - 良好的超参数设置可以提高模型的泛化能力
- 稳定性 - 合适的超参数可以确保训练过程稳定
LLM关键超参数
1. 学习率相关
# Learning-rate settings.
learning_rate_config = dict(
    initial_lr=2e-5,     # starting learning rate
    min_lr=1e-7,         # lower bound for the learning rate
    warmup_steps=1000,   # number of warm-up steps
    scheduler="cosine",  # scheduler type
    decay_rate=0.95,     # decay rate
)
# Learning-rate schedule example: linear warm-up followed by linear decay.
def get_lr_scheduler(optimizer, warmup_steps, total_steps):
    """Build a ``LambdaLR`` with linear warm-up and linear decay to zero.

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        warmup_steps: Number of steps over which the multiplier rises
            linearly from 0 to 1.
        total_steps: Step at which the multiplier has decayed back to 0.

    Returns:
        A ``torch.optim.lr_scheduler.LambdaLR`` wrapping ``optimizer``.
    """
    from torch.optim.lr_scheduler import LambdaLR

    # Both spans are clamped to at least 1 so that neither phase can divide
    # by zero (e.g. warmup_steps == 0 or total_steps <= warmup_steps).
    warmup_span = float(max(1, warmup_steps))
    decay_span = float(max(1, total_steps - warmup_steps))

    def lr_lambda(current_step):
        if current_step < warmup_steps:
            return float(current_step) / warmup_span
        # Never go negative once current_step passes total_steps.
        return max(0.0, float(total_steps - current_step) / decay_span)

    return LambdaLR(optimizer, lr_lambda)
2. 优化器参数
# Optimizer settings.
optimizer_config = dict(
    optimizer="adamw",
    beta1=0.9,
    beta2=0.999,
    eps=1e-8,
    weight_decay=0.01,
    max_grad_norm=1.0,
)
# Optimizer factory.
def create_optimizer(model, config):
    """Create the optimizer selected by ``config["optimizer"]``.

    Args:
        model: Object whose ``parameters()`` are handed to the optimizer.
        config: Mapping with the keys ``"optimizer"`` (``"adamw"`` or
            ``"sgd"``), ``"initial_lr"`` and ``"weight_decay"``. AdamW also
            reads ``"beta1"``, ``"beta2"`` and ``"eps"``; SGD reads an
            optional ``"momentum"`` (default 0.9).

    Returns:
        A ``torch.optim.AdamW`` or ``torch.optim.SGD`` instance.

    Raises:
        KeyError: If a required key is missing from ``config``.
        ValueError: If ``config["optimizer"]`` names an unsupported optimizer.
    """
    import torch.optim as optim

    name = config["optimizer"]
    if name == "adamw":
        return optim.AdamW(
            model.parameters(),
            lr=config["initial_lr"],
            betas=(config["beta1"], config["beta2"]),
            eps=config["eps"],
            weight_decay=config["weight_decay"],
        )
    if name == "sgd":
        return optim.SGD(
            model.parameters(),
            lr=config["initial_lr"],
            momentum=config.get("momentum", 0.9),
            weight_decay=config["weight_decay"],
        )
    # Fail loudly instead of silently returning None for an unknown name.
    raise ValueError(f"Unsupported optimizer: {name!r}")
3. 训练超参数
# Training settings.
training_config = dict(
    batch_size=16,
    micro_batch_size=4,
    gradient_accumulation_steps=4,
    max_seq_length=2048,
    epochs=3,
    max_steps=10000,
    eval_steps=500,
    save_steps=1000,
)
# Effective batch size computation.
def calculate_effective_batch_size(config):
    """Return the number of samples contributing to one optimizer step.

    The per-forward-pass batch is ``config["micro_batch_size"]`` when that key
    is present, otherwise ``config["batch_size"]``; it is multiplied by
    ``config["gradient_accumulation_steps"]``.

    Using ``micro_batch_size`` first avoids double counting for configs where
    ``batch_size`` already equals ``micro_batch_size`` times the accumulation
    steps (e.g. 16 = 4 * 4).

    Args:
        config: Mapping with ``"gradient_accumulation_steps"`` and at least
            one of ``"micro_batch_size"`` / ``"batch_size"``.

    Returns:
        The effective batch size.
    """
    if "micro_batch_size" in config:
        per_step = config["micro_batch_size"]
    else:
        per_step = config["batch_size"]
    return per_step * config["gradient_accumulation_steps"]
4. 模型架构参数
# Model architecture settings.
model_config = dict(
    hidden_size=4096,
    num_layers=32,
    num_heads=32,
    intermediate_size=11008,
    dropout=0.1,
    attention_dropout=0.1,
    layer_norm_eps=1e-5,
)
超参数搜索策略
1. 网格搜索
class GridSearch:
    """Exhaustive search over the Cartesian product of a parameter grid.

    ``param_grid`` maps each parameter name to an iterable of candidate
    values. Every evaluation is appended to ``self.results`` as
    ``{"params": ..., "result": ...}``; the list keeps growing across
    successive ``run()`` calls.
    """

    def __init__(self, param_grid):
        self.param_grid = param_grid
        self.results = []

    def generate_combinations(self):
        """Return every parameter combination as a list of dicts."""
        import itertools

        names = list(self.param_grid)
        return [
            dict(zip(names, values))
            for values in itertools.product(*self.param_grid.values())
        ]

    def run(self, objective_function):
        """Evaluate every combination and return the best one.

        Args:
            objective_function: Callable taking a parameter dict and returning
                a comparable score; larger is better.

        Returns:
            ``(best_params, best_result)`` among the combinations evaluated
            by this call.

        Raises:
            ValueError: If the grid yields no combinations (some parameter
                has no candidate values).
        """
        combinations = self.generate_combinations()
        if not combinations:
            raise ValueError("param_grid produced no parameter combinations")

        # Track this call's evaluations separately so that results left over
        # from an earlier run() (possibly with a different objective) cannot
        # be returned as the best of this run.
        run_results = []
        for i, params in enumerate(combinations):
            print(f"组合 {i+1}/{len(combinations)}: {params}")
            entry = {"params": params, "result": objective_function(params)}
            run_results.append(entry)
            self.results.append(entry)

        best_result = max(run_results, key=lambda x: x["result"])
        return best_result["params"], best_result["result"]
# Usage example. ``train_and_evaluate`` is not defined in this snippet; it is
# called with one parameter dict and its return value is maximized.
param_grid = dict(
    learning_rate=[1e-5, 2e-5, 5e-5, 1e-4],
    batch_size=[8, 16, 32],
    dropout=[0.1, 0.2, 0.3],
)
grid_search = GridSearch(param_grid=param_grid)
best_params, best_result = grid_search.run(objective_function=train_and_evaluate)
2. 随机搜索
import random
class RandomSearch:
    """Random search over per-parameter sampling distributions.

    Each value of ``param_distributions`` is one of:

    * ``{"type": "uniform", "low": a, "high": b}`` - uniform on [a, b];
    * ``{"type": "log_uniform", "low": a, "high": b}`` - ``10 ** u`` with
      ``u`` uniform on [a, b] (the bounds are base-10 exponents);
    * ``{"type": "choice", "values": [...]}`` - uniform pick from a list;
    * shorthand ``(low, high)`` tuple - same as ``"uniform"``;
    * shorthand ``list`` - same as ``"choice"``.

    Every evaluation is appended to ``self.results`` as
    ``{"params": ..., "result": ...}`` and accumulates across ``run()`` calls.
    """

    def __init__(self, param_distributions, n_iter=10):
        self.param_distributions = param_distributions
        self.n_iter = n_iter
        self.results = []

    @staticmethod
    def _sample_value(param_name, distribution):
        """Draw a single value for ``param_name`` from ``distribution``."""
        # Shorthand specs match the list / (low, high) convention used by the
        # other searchers in this file.
        if isinstance(distribution, list):
            return random.choice(distribution)
        if isinstance(distribution, tuple):
            low, high = distribution
            return random.uniform(low, high)

        kind = distribution["type"]
        if kind == "uniform":
            return random.uniform(distribution["low"], distribution["high"])
        if kind == "log_uniform":
            exponent = random.uniform(distribution["low"], distribution["high"])
            return 10 ** exponent
        if kind == "choice":
            return random.choice(distribution["values"])
        # An unknown type used to drop the parameter silently.
        raise ValueError(
            f"Unknown distribution type {kind!r} for parameter {param_name!r}"
        )

    def sample_params(self):
        """Sample one value per parameter and return them as a dict.

        Raises:
            ValueError: If a distribution has an unknown ``"type"``.
        """
        return {
            param_name: self._sample_value(param_name, distribution)
            for param_name, distribution in self.param_distributions.items()
        }

    def run(self, objective_function):
        """Evaluate ``n_iter`` random samples and return the best one.

        Args:
            objective_function: Callable taking a parameter dict and returning
                a comparable score; larger is better.

        Returns:
            ``(best_params, best_result)`` among the samples evaluated by
            this call.

        Raises:
            ValueError: If ``n_iter`` is less than 1.
        """
        if self.n_iter < 1:
            raise ValueError("n_iter must be at least 1")

        # Only this call's evaluations compete for "best"; self.results still
        # keeps the full history.
        run_results = []
        for i in range(self.n_iter):
            params = self.sample_params()
            print(f"迭代 {i+1}/{self.n_iter}: {params}")
            entry = {"params": params, "result": objective_function(params)}
            run_results.append(entry)
            self.results.append(entry)

        best_result = max(run_results, key=lambda x: x["result"])
        return best_result["params"], best_result["result"]
# Usage example. The log_uniform bounds are base-10 exponents, so the
# learning rate is drawn from [1e-6, 1e-3]. ``train_and_evaluate`` is not
# defined in this snippet.
param_distributions = dict(
    learning_rate=dict(type="log_uniform", low=-6, high=-3),
    batch_size=dict(type="choice", values=[8, 16, 32, 64]),
    dropout=dict(type="uniform", low=0.1, high=0.5),
)
random_search = RandomSearch(param_distributions=param_distributions, n_iter=20)
best_params, best_result = random_search.run(objective_function=train_and_evaluate)
3. 贝叶斯优化
class BayesianOptimization:
    """Gaussian-process-based Bayesian optimization (maximization).

    ``param_bounds`` maps each parameter name to either a ``list`` of
    discrete choices or a ``(low, high)`` pair sampled uniformly. Every
    evaluation is stored in ``self.results`` as
    ``{"params": ..., "result": ...}``.

    NOTE(review): parameter values are fed to the Gaussian process as-is, so
    list choices are presumably expected to be numeric - confirm.
    """

    def __init__(self, param_bounds, n_initial=5, n_iterations=20):
        self.param_bounds = param_bounds
        self.n_initial = n_initial
        self.n_iterations = n_iterations
        self.results = []

    def _as_vector(self, params):
        """Return the values of ``params`` ordered like ``param_bounds``."""
        return [params[name] for name in self.param_bounds]

    def surrogate_model(self, X):
        """Return a Gaussian-process surrogate fitted to all results so far.

        Args:
            X: Unused; kept so existing callers keep working.

        Returns:
            A ``GaussianProcessRegressor`` with an RBF kernel. It is left
            unfitted when there are no results yet.
        """
        from sklearn.gaussian_process import GaussianProcessRegressor
        from sklearn.gaussian_process.kernels import RBF

        gp = GaussianProcessRegressor(kernel=RBF(1.0))
        if self.results:
            X_train = [self._as_vector(r["params"]) for r in self.results]
            y_train = [r["result"] for r in self.results]
            gp.fit(X_train, y_train)
        return gp

    def acquisition_function(self, X, gp=None):
        """Return the expected improvement (EI) at the point ``X``.

        Args:
            X: Sequence of parameter values ordered like ``param_bounds``;
                a plain list or a NumPy array.
            gp: Optional already-fitted surrogate. When omitted, a fresh one
                is fitted via ``surrogate_model``.

        Returns:
            ``0`` when there are no results yet, otherwise the EI as a float.
        """
        if not self.results:
            return 0

        import numpy as np
        from scipy.stats import norm

        if gp is None:
            gp = self.surrogate_model(X)

        best_result = max(r["result"] for r in self.results)
        # Callers pass plain lists, which have no .reshape; convert first.
        point = np.asarray(X, dtype=float).reshape(1, -1)
        mean, std = gp.predict(point, return_std=True)
        mean, std = float(mean[0]), float(std[0])

        if std <= 0.0:
            # Zero predictive uncertainty: EI reduces to the plain improvement
            # and the z-score below would divide by zero.
            return max(mean - best_result, 0.0)

        z = (mean - best_result) / std
        return float(std * (z * norm.cdf(z) + norm.pdf(z)))

    def suggest_next_params(self, n_candidates=1000):
        """Suggest the next parameter combination to evaluate.

        While fewer than ``n_initial`` results exist a random sample is
        returned. Afterwards ``n_candidates`` random samples are scored with
        the acquisition function and the highest-scoring one is returned.

        Args:
            n_candidates: Number of random candidates scored per suggestion.
        """
        if len(self.results) < self.n_initial:
            # Initial phase: random sampling.
            return self._random_sample()

        # Fit the surrogate once per suggestion instead of once per candidate.
        gp = self.surrogate_model(None)

        best_params = None
        best_ei = -float('inf')
        for _ in range(n_candidates):
            params = self._random_sample()
            ei = self.acquisition_function(self._as_vector(params), gp=gp)
            if ei > best_ei:
                best_ei = ei
                best_params = params

        # No candidate beat -inf (n_candidates <= 0 or every EI was NaN).
        if best_params is None:
            return self._random_sample()
        return best_params

    def _random_sample(self):
        """Draw one random parameter combination from ``param_bounds``."""
        params = {}
        for param_name, bounds in self.param_bounds.items():
            if isinstance(bounds, list):
                params[param_name] = random.choice(bounds)
            else:
                params[param_name] = random.uniform(bounds[0], bounds[1])
        return params

    def run(self, objective_function):
        """Run the optimization and return the best evaluation.

        Performs ``n_initial`` random evaluations followed by
        ``n_iterations - n_initial`` model-guided evaluations.

        Args:
            objective_function: Callable taking a parameter dict and returning
                a numeric score; larger is better.

        Returns:
            ``(best_params, best_result)`` over everything in ``self.results``.

        Raises:
            ValueError: If no evaluation was performed.
        """
        # Initial random sampling.
        for i in range(self.n_initial):
            params = self._random_sample()
            result = objective_function(params)
            self.results.append({"params": params, "result": result})

        # Model-guided iterations.
        for i in range(self.n_iterations - self.n_initial):
            params = self.suggest_next_params()
            result = objective_function(params)
            self.results.append({"params": params, "result": result})
            print(f"迭代 {i+1}: 参数={params}, 结果={result}")

        if not self.results:
            raise ValueError("no evaluations were run; check n_initial/n_iterations")
        best_result = max(self.results, key=lambda x: x["result"])
        return best_result["params"], best_result["result"]
4. 遗传算法
class GeneticAlgorithm:
    """Genetic-algorithm hyperparameter search (maximization).

    ``param_bounds`` maps each parameter name to either a ``list`` of
    discrete choices or a ``(low, high)`` pair. Individuals are plain dicts
    keyed by parameter name.
    """

    def __init__(self, param_bounds, population_size=20, generations=10):
        self.param_bounds = param_bounds
        self.population_size = population_size
        self.generations = generations
        self.population = []
        self.fitness_scores = []

    def initialize_population(self):
        """Replace the population with ``population_size`` random individuals."""
        self.population = [
            self._random_individual() for _ in range(self.population_size)
        ]

    def _random_individual(self):
        """Create one random individual within ``param_bounds``."""
        individual = {}
        for param_name, bounds in self.param_bounds.items():
            if isinstance(bounds, list):
                individual[param_name] = random.choice(bounds)
            else:
                individual[param_name] = random.uniform(bounds[0], bounds[1])
        return individual

    def evaluate_population(self, objective_function):
        """Store the fitness of every individual in ``self.fitness_scores``."""
        self.fitness_scores = [
            objective_function(individual) for individual in self.population
        ]

    def select_parents(self):
        """Pick ``population_size`` parents by tournament selection."""
        # A tournament cannot draw more distinct entrants than exist, so the
        # size is capped for populations smaller than 3.
        tournament_size = min(3, self.population_size)
        parents = []
        for _ in range(self.population_size):
            tournament = random.sample(range(self.population_size), tournament_size)
            best_idx = max(tournament, key=lambda i: self.fitness_scores[i])
            parents.append(self.population[best_idx])
        return parents

    def crossover(self, parent1, parent2):
        """Build a child taking each parameter from either parent (50/50)."""
        child = {}
        for param_name in self.param_bounds:
            source = parent1 if random.random() < 0.5 else parent2
            child[param_name] = source[param_name]
        return child

    def mutate(self, individual, mutation_rate=0.1):
        """Return a mutated copy of ``individual``.

        Each parameter mutates independently with probability
        ``mutation_rate``: list parameters are re-drawn from their choices,
        range parameters get Gaussian noise and are clamped to their bounds.
        """
        mutated = individual.copy()
        for param_name, bounds in self.param_bounds.items():
            if random.random() < mutation_rate:
                if isinstance(bounds, list):
                    mutated[param_name] = random.choice(bounds)
                else:
                    low, high = bounds[0], bounds[1]
                    # Scale the noise to the width of the range; a fixed
                    # sigma of 0.1 would swamp narrow ranges such as
                    # (1e-6, 1e-3) and pin the value to a bound.
                    delta = random.gauss(0, 0.1 * (high - low))
                    mutated[param_name] = max(
                        low, min(high, individual[param_name] + delta)
                    )
        return mutated

    def evolve(self, objective_function):
        """Run the evolutionary loop and return the best evaluated individual.

        Args:
            objective_function: Callable taking an individual (dict) and
                returning a comparable fitness; larger is better.

        Returns:
            ``(best_individual, best_fitness)`` over every individual
            evaluated in any generation.

        Raises:
            ValueError: If no generation was evaluated.
        """
        self.initialize_population()

        best_individual = None
        best_fitness = None
        for generation in range(self.generations):
            print(f"世代 {generation+1}/{self.generations}")

            self.evaluate_population(objective_function)

            # Record the best individual while population and fitness_scores
            # still refer to the same individuals.
            top_idx = max(
                range(len(self.population)),
                key=lambda i: self.fitness_scores[i],
            )
            if best_fitness is None or self.fitness_scores[top_idx] > best_fitness:
                best_fitness = self.fitness_scores[top_idx]
                best_individual = dict(self.population[top_idx])

            # Offspring of the last generation would never be evaluated, so
            # stop here and leave population/fitness_scores consistent.
            if generation == self.generations - 1:
                break

            parents = self.select_parents()

            new_population = []
            for i in range(0, self.population_size, 2):
                first = parents[i]
                # Wrap around so an odd population size still has a partner.
                second = parents[(i + 1) % self.population_size]
                child1 = self.crossover(first, second)
                child2 = self.crossover(second, first)
                child1 = self.mutate(child1)
                child2 = self.mutate(child2)
                new_population.extend([child1, child2])
            # Pairs produce an even count; trim back to the configured size.
            self.population = new_population[: self.population_size]

        if best_individual is None:
            raise ValueError("generations must be at least 1")
        return best_individual, best_fitness
实际应用案例
案例:LLM微调超参数优化
# Search space: tuples are (low, high) ranges, lists are discrete choices.
param_space = dict(
    learning_rate=(1e-6, 1e-3),
    batch_size=[8, 16, 32, 64],
    num_epochs=[2, 3, 5],
    warmup_ratio=(0.0, 0.2),
    weight_decay=(0.0, 0.1),
    gradient_accumulation_steps=[1, 2, 4, 8],
)
# Objective function.
def objective_function(params):
    """Train a model with ``params`` and return its evaluation accuracy.

    ``train_model_with_params`` and ``evaluate_model`` are not defined in
    this snippet; the value returned is the ``"accuracy"`` entry of the
    evaluation results.
    """
    trained_model = train_model_with_params(params)
    return evaluate_model(trained_model)["accuracy"]
# Run Bayesian optimization over the search space and report the best result.
bo = BayesianOptimization(param_bounds=param_space, n_initial=10, n_iterations=50)
best_params, best_accuracy = bo.run(objective_function=objective_function)
print(f"最佳参数: {best_params}")
print(f"最佳准确率: {best_accuracy}")
案例:生成参数(解码参数)优化
# Generation (sampling) hyperparameters, written in the
# {"type", "low", "high"} form that RandomSearch.sample_params reads; bare
# (low, high) tuples fail there because it indexes distribution["type"].
prompt_params = {
    "temperature": {"type": "uniform", "low": 0.1, "high": 1.0},
    "top_p": {"type": "uniform", "low": 0.1, "high": 1.0},
    # NOTE(review): sampled as a float; cast to int if the generation API
    # requires an integer token count - confirm.
    "max_tokens": {"type": "uniform", "low": 100, "high": 1000},
    "frequency_penalty": {"type": "uniform", "low": 0.0, "high": 2.0},
    "presence_penalty": {"type": "uniform", "low": 0.0, "high": 2.0},
}
# Objective function.
def prompt_objective(params):
    """Return the mean quality score of responses generated with ``params``.

    ``generate_responses_with_params``, ``test_prompts`` and
    ``evaluate_response_quality`` are not defined in this snippet.
    """
    # Generate responses for the test prompts with the sampled parameters.
    generated = generate_responses_with_params(test_prompts, params)
    # Score the responses and average the scores.
    scores = evaluate_response_quality(generated)
    total = sum(scores)
    return total / len(scores)
# Random search over the generation parameters.
rs = RandomSearch(param_distributions=prompt_params, n_iter=30)
best_params, best_score = rs.run(objective_function=prompt_objective)
最佳实践
1. 渐进式调优
# Stage 1: coarse-grained search.
coarse_grid = dict(
    learning_rate=[1e-5, 1e-4, 1e-3],
    batch_size=[8, 16, 32],
)
# Stage 2: fine-grained search.
fine_grid = dict(
    learning_rate=[5e-5, 1e-4, 2e-4, 5e-4],
    batch_size=[12, 16, 20, 24],
)
# Stage 3: local optimization (the learning rate is a (low, high) range).
local_optimization = dict(
    learning_rate=(1e-4, 3e-4),
    batch_size=[14, 16, 18],
)
2. 并行化
import multiprocessing as mp
def parallel_search(param_combinations, objective_function, processes=None):
    """Evaluate parameter combinations in parallel worker processes.

    Args:
        param_combinations: Iterable of parameter sets; each one is passed
            as the single argument to ``objective_function``.
        objective_function: Callable applied to every combination. It is
            sent to worker processes, so it must be picklable (a module-level
            function or a builtin, not a lambda).
        processes: Number of worker processes. Defaults to
            ``mp.cpu_count()``; pass a smaller number when each evaluation
            needs a scarce resource.

    Returns:
        List of results in the same order as ``param_combinations``.
    """
    worker_count = mp.cpu_count() if processes is None else processes
    with mp.Pool(processes=worker_count) as pool:
        return pool.map(objective_function, param_combinations)
# 使用异步执行
import asyncio
async def async_search(param_combinations, objective_function):
    """Evaluate parameter combinations concurrently in worker threads.

    Each combination is passed to ``objective_function`` through
    ``asyncio.to_thread``; the results are returned as a list in the same
    order as ``param_combinations``.
    """
    pending = [
        asyncio.create_task(asyncio.to_thread(objective_function, params))
        for params in param_combinations
    ]
    return await asyncio.gather(*pending)
3. 早停策略
class EarlyStopping:
    """Signal a stop when a score stops improving (higher is better).

    A call counts as an improvement unless the score is below
    ``best_score + min_delta``. After ``patience`` consecutive
    non-improving calls the instance returns ``True``.
    """

    def __init__(self, patience=5, min_delta=0.001):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score = None

    def __call__(self, score):
        """Record ``score`` and return ``True`` when training should stop."""
        # The first observation only establishes the baseline.
        if self.best_score is None:
            self.best_score = score
            return False

        stalled = score < self.best_score + self.min_delta
        if stalled:
            self.counter += 1
            return self.counter >= self.patience

        # Improvement: remember the new best and restart the patience count.
        self.best_score = score
        self.counter = 0
        return False
总结
超参数调优是LLM开发的关键环节:
- 理解超参数 - 了解每个超参数的作用和影响
- 选择搜索策略 - 根据计算资源和时间选择合适的策略
- 渐进式优化 - 从粗粒度到细粒度逐步优化
- 并行化 - 利用并行计算加速搜索过程
- 记录和分析 - 详细记录每次实验的结果
通过系统化的超参数调优,可以显著提升LLM的性能和训练效率。