← 返回首页
🧠

LLM超参数调优指南

📂 llm ⏱ 6 min 1117 words

---
title: "LLM超参数调优指南"
description: "介绍大型语言模型超参数调优的策略、工具和最佳实践。"
tags: ["超参数调优", "llm", "优化", "机器学习", "超参数搜索"]
category: "llm"
icon: "🧠"
---

LLM超参数调优指南

为什么超参数调优很重要?

超参数是控制模型训练过程的配置参数,对模型性能有显著影响:

  1. 性能影响 - 合适的超参数可以显著提升模型性能
  2. 训练效率 - 优化的超参数可以减少训练时间和计算成本
  3. 泛化能力 - 良好的超参数设置可以提高模型的泛化能力
  4. 稳定性 - 合适的超参数可以确保训练过程稳定

LLM关键超参数

1. 学习率相关

# Learning-rate settings.
# NOTE(review): "scheduler" is set to "cosine", but the get_lr_scheduler
# example in this guide implements linear warmup followed by linear decay --
# confirm which schedule is intended.
learning_rate_config = {
    "initial_lr": 2e-5,  # initial learning rate
    "min_lr": 1e-7,      # minimum learning rate
    "warmup_steps": 1000,  # number of warmup steps
    "scheduler": "cosine",  # scheduler type
    "decay_rate": 0.95     # decay rate
}

# 学习率调度示例
def get_lr_scheduler(optimizer, warmup_steps, total_steps):
    """Build a LambdaLR with linear warmup followed by linear decay.

    The learning-rate multiplier rises linearly from 0 to 1 over the first
    ``warmup_steps`` steps, then falls linearly so that it reaches 0 at
    ``total_steps``. It never goes below 0.

    Args:
        optimizer: The optimizer whose learning rate is scheduled.
        warmup_steps: Number of steps spent ramping up.
        total_steps: Step at which the multiplier reaches 0.

    Returns:
        A ``torch.optim.lr_scheduler.LambdaLR`` wrapping ``optimizer``.
    """
    from torch.optim.lr_scheduler import LambdaLR

    # Both denominators are clamped to at least 1 to avoid division by zero.
    warmup_span = float(max(1, warmup_steps))
    decay_span = float(max(1, total_steps - warmup_steps))

    def lr_lambda(current_step):
        if current_step >= warmup_steps:
            steps_left = float(total_steps - current_step)
            return max(0.0, steps_left / decay_span)
        return float(current_step) / warmup_span

    return LambdaLR(optimizer, lr_lambda)

2. 优化器参数

# Optimizer settings.
# NOTE(review): create_optimizer in this guide also reads config["initial_lr"],
# which is not defined in this dict (it lives in learning_rate_config) --
# presumably the two dicts are meant to be merged before use; confirm.
optimizer_config = {
    "optimizer": "adamw",  # selects the branch taken by create_optimizer
    "beta1": 0.9,  # first element of the AdamW `betas` pair
    "beta2": 0.999,  # second element of the AdamW `betas` pair
    "eps": 1e-8,  # passed to AdamW as `eps`
    "weight_decay": 0.01,  # passed to the optimizer as `weight_decay`
    "max_grad_norm": 1.0  # not read by create_optimizer; presumably a gradient-clipping threshold -- confirm
}

# 优化器创建
def create_optimizer(model, config):
    """Create a torch optimizer for ``model`` from a configuration dict.

    Args:
        model: Object exposing ``parameters()`` (e.g. a ``torch.nn.Module``).
        config: Mapping with the keys ``"optimizer"`` (``"adamw"`` or
            ``"sgd"``), ``"initial_lr"`` and ``"weight_decay"``. AdamW
            additionally needs ``"beta1"``, ``"beta2"`` and ``"eps"``; SGD
            accepts an optional ``"momentum"`` (default 0.9).

    Returns:
        A ``torch.optim.AdamW`` or ``torch.optim.SGD`` instance.

    Raises:
        KeyError: If a required key is missing from ``config``.
        ValueError: If ``config["optimizer"]`` names an unsupported optimizer.
    """
    import torch.optim as optim

    name = config["optimizer"]

    if name == "adamw":
        return optim.AdamW(
            model.parameters(),
            lr=config["initial_lr"],
            betas=(config["beta1"], config["beta2"]),
            eps=config["eps"],
            weight_decay=config["weight_decay"],
        )

    if name == "sgd":
        return optim.SGD(
            model.parameters(),
            lr=config["initial_lr"],
            # Momentum used to be hard-coded; 0.9 stays the default.
            momentum=config.get("momentum", 0.9),
            weight_decay=config["weight_decay"],
        )

    # Fail loudly instead of implicitly returning None, which would only
    # surface later as an AttributeError far from the misconfiguration.
    raise ValueError(
        f"Unsupported optimizer {name!r}; expected 'adamw' or 'sgd'"
    )

3. 训练超参数

# Training settings.
# NOTE(review): batch_size (16) equals micro_batch_size (4) multiplied by
# gradient_accumulation_steps (4); presumably batch_size is already the
# accumulated per-update batch -- confirm.
training_config = {
    "batch_size": 16,
    "micro_batch_size": 4,
    "gradient_accumulation_steps": 4,
    "max_seq_length": 2048,
    "epochs": 3,
    "max_steps": 10000,
    "eval_steps": 500,
    "save_steps": 1000
}

# 批量大小计算
def calculate_effective_batch_size(config):
    """Return the number of samples contributing to one optimizer update.

    The result is the per-forward-pass batch multiplied by the number of
    gradient-accumulation steps. When ``"micro_batch_size"`` is present it is
    used as the per-pass batch, because ``"batch_size"`` then already
    describes the accumulated batch and multiplying it again would count the
    accumulation twice. Without ``"micro_batch_size"``, ``"batch_size"`` is
    treated as the per-pass batch.

    Args:
        config: Mapping with ``"micro_batch_size"`` and/or ``"batch_size"``,
            and optionally ``"gradient_accumulation_steps"`` (default 1).

    Returns:
        The effective batch size.

    Raises:
        KeyError: If neither batch-size key is present.
    """
    if "micro_batch_size" in config:
        per_pass = config["micro_batch_size"]
    else:
        per_pass = config["batch_size"]
    return per_pass * config.get("gradient_accumulation_steps", 1)

4. 模型架构参数

# Model architecture settings.
# Not referenced by any other code in the visible part of this file.
model_config = {
    "hidden_size": 4096,
    "num_layers": 32,
    "num_heads": 32,
    "intermediate_size": 11008,
    "dropout": 0.1,
    "attention_dropout": 0.1,
    "layer_norm_eps": 1e-5
}

超参数搜索策略

1. 网格搜索

class GridSearch:
    """Exhaustive search over the Cartesian product of a parameter grid.

    ``param_grid`` maps each parameter name to an iterable of candidate
    values. Every evaluated combination is appended to ``self.results`` as
    ``{"params": ..., "result": ...}``.
    """

    def __init__(self, param_grid):
        self.param_grid = param_grid
        self.results = []

    def generate_combinations(self):
        """Return every parameter combination as a list of dicts."""
        from itertools import product

        names = self.param_grid.keys()
        return [
            dict(zip(names, picked))
            for picked in product(*self.param_grid.values())
        ]

    def run(self, objective_function):
        """Evaluate every combination and return the best one.

        Args:
            objective_function: Callable taking a parameter dict and
                returning a comparable score; larger is better.

        Returns:
            A ``(params, result)`` tuple for the highest-scoring combination.
        """
        candidates = self.generate_combinations()
        total = len(candidates)

        for position, params in enumerate(candidates, start=1):
            print(f"组合 {position}/{total}: {params}")

            score = objective_function(params)
            self.results.append({"params": params, "result": score})

        # The first entry wins on ties, as with any max() over a list.
        winner = max(self.results, key=lambda entry: entry["result"])
        return winner["params"], winner["result"]

# Usage example: 4 * 3 * 3 = 36 combinations are evaluated.
# `train_and_evaluate` is not defined in the visible part of this file; it
# must accept a parameter dict and return a score where larger is better
# (GridSearch.run picks the maximum).
param_grid = {
    "learning_rate": [1e-5, 2e-5, 5e-5, 1e-4],
    "batch_size": [8, 16, 32],
    "dropout": [0.1, 0.2, 0.3]
}

grid_search = GridSearch(param_grid)
best_params, best_result = grid_search.run(train_and_evaluate)

2. 随机搜索

import random

class RandomSearch:
    """Random search over independently sampled hyperparameters.

    ``param_distributions`` maps each parameter name to a sampling spec.
    Supported specs:

    * ``{"type": "uniform", "low": a, "high": b}`` -- uniform float in [a, b].
    * ``{"type": "log_uniform", "low": a, "high": b}`` -- ``10 ** u`` with
      ``u`` uniform in [a, b]; the bounds are base-10 exponents.
    * ``{"type": "choice", "values": [...]}`` -- one element of ``values``.
    * ``(low, high)`` tuple -- shorthand for a uniform float range.
    * ``[v1, v2, ...]`` list -- shorthand for a choice.

    The tuple/list shorthands match the bounds format used by the other
    search classes in this guide, so one parameter space can be shared.
    Every trial is appended to ``self.results`` as
    ``{"params": ..., "result": ...}``.
    """

    def __init__(self, param_distributions, n_iter=10):
        self.param_distributions = param_distributions
        self.n_iter = n_iter
        self.results = []

    @staticmethod
    def _sample_one(param_name, spec):
        """Draw a single value for ``param_name`` according to ``spec``."""
        if isinstance(spec, list):
            return random.choice(spec)
        if isinstance(spec, tuple):
            low, high = spec
            return random.uniform(low, high)

        kind = spec["type"]
        if kind == "uniform":
            return random.uniform(spec["low"], spec["high"])
        if kind == "log_uniform":
            # Sample the exponent uniformly so that every decade is equally
            # likely, then map back to the linear scale.
            return 10 ** random.uniform(spec["low"], spec["high"])
        if kind == "choice":
            return random.choice(spec["values"])

        # An unknown type used to be skipped silently, producing a parameter
        # dict with a missing key; surface the configuration error instead.
        raise ValueError(
            f"Unknown distribution type {kind!r} for parameter {param_name!r}"
        )

    def sample_params(self):
        """Sample one value per parameter and return them as a dict.

        Raises:
            ValueError: If a spec has an unsupported ``"type"``.
        """
        return {
            name: self._sample_one(name, spec)
            for name, spec in self.param_distributions.items()
        }

    def run(self, objective_function):
        """Run ``n_iter`` random trials and return the best one.

        Args:
            objective_function: Callable taking a parameter dict and
                returning a comparable score; larger is better.

        Returns:
            A ``(params, result)`` tuple for the highest-scoring trial.

        Raises:
            ValueError: If no trial was run (``n_iter`` < 1 and no earlier
                results are stored).
        """
        for i in range(self.n_iter):
            params = self.sample_params()
            print(f"迭代 {i+1}/{self.n_iter}: {params}")

            result = objective_function(params)
            self.results.append({"params": params, "result": result})

        if not self.results:
            raise ValueError("RandomSearch needs n_iter >= 1 to produce a result")

        best_result = max(self.results, key=lambda entry: entry["result"])
        return best_result["params"], best_result["result"]

# Usage example.
# For "log_uniform", low/high are base-10 exponents (the sampled value is
# raised as 10 ** u), so learning_rate here falls in [1e-6, 1e-3].
# `train_and_evaluate` is not defined in the visible part of this file; it
# must accept a parameter dict and return a score where larger is better.
param_distributions = {
    "learning_rate": {"type": "log_uniform", "low": -6, "high": -3},
    "batch_size": {"type": "choice", "values": [8, 16, 32, 64]},
    "dropout": {"type": "uniform", "low": 0.1, "high": 0.5}
}

random_search = RandomSearch(param_distributions, n_iter=20)
best_params, best_result = random_search.run(train_and_evaluate)

3. 贝叶斯优化

class BayesianOptimization:
    """Simplified Bayesian optimization with a Gaussian-process surrogate.

    ``param_bounds`` maps each parameter name to either a list of discrete
    choices or a ``(low, high)`` pair sampled uniformly. All values must be
    numeric because they are fed to the Gaussian process as features, in the
    iteration order of ``param_bounds``.

    The search first evaluates ``n_initial`` random points, then repeatedly
    fits the surrogate, scores ``n_candidates`` random candidates with the
    expected-improvement criterion and evaluates the best-scoring one.
    Every trial is appended to ``self.results`` as
    ``{"params": ..., "result": ...}``. Larger results are better.
    """

    def __init__(self, param_bounds, n_initial=5, n_iterations=20,
                 n_candidates=1000):
        self.param_bounds = param_bounds
        self.n_initial = n_initial
        self.n_iterations = n_iterations
        # Number of random candidates scored per suggestion.
        self.n_candidates = n_candidates
        self.results = []

    def surrogate_model(self, X):
        """Return a Gaussian process fitted to all recorded trials.

        ``X`` is unused and kept only for interface compatibility. When no
        trial has been recorded yet, the regressor is returned unfitted.
        """
        from sklearn.gaussian_process import GaussianProcessRegressor
        from sklearn.gaussian_process.kernels import RBF

        gp = GaussianProcessRegressor(kernel=RBF(1.0))

        if self.results:
            X_train = [list(r["params"].values()) for r in self.results]
            y_train = [r["result"] for r in self.results]
            gp.fit(X_train, y_train)

        return gp

    def acquisition_function(self, X, gp=None):
        """Return the expected improvement of candidate ``X``.

        Args:
            X: Candidate point as a flat sequence (list or array) of
                parameter values.
            gp: Optional already-fitted surrogate. When omitted, a new one is
                fitted via ``surrogate_model``; passing it avoids refitting
                for every candidate.

        Returns:
            ``0`` when no trial has been recorded yet, otherwise the expected
            improvement as a float.
        """
        if not self.results:
            return 0

        import numpy as np
        from scipy.stats import norm

        if gp is None:
            gp = self.surrogate_model(X)

        # Accept plain lists as well as arrays; predict() needs a 2-D input.
        point = np.asarray(X, dtype=float).reshape(1, -1)
        mean, std = gp.predict(point, return_std=True)
        mu = float(np.ravel(mean)[0])
        sigma = float(np.ravel(std)[0])

        best_result = max(r["result"] for r in self.results)
        improvement = mu - best_result

        # With zero predictive uncertainty the improvement is deterministic;
        # this also avoids dividing by zero below.
        if sigma <= 0.0:
            return max(improvement, 0.0)

        z = improvement / sigma
        return float(improvement * norm.cdf(z) + sigma * norm.pdf(z))

    def suggest_next_params(self):
        """Suggest the next parameter combination to evaluate."""
        if len(self.results) < self.n_initial:
            # Initial phase: plain random sampling.
            return self._random_sample()

        # Fit the surrogate once per suggestion; the training data does not
        # change while candidates are being scored.
        gp = self.surrogate_model(None)

        best_params = None
        best_ei = -float('inf')

        for _ in range(self.n_candidates):
            params = self._random_sample()
            ei = self.acquisition_function(list(params.values()), gp=gp)

            # Always keep the first candidate so a NaN score can never leave
            # the suggestion empty.
            if best_params is None or ei > best_ei:
                best_ei = ei
                best_params = params

        return best_params

    def _random_sample(self):
        """Sample one random parameter combination from ``param_bounds``."""
        params = {}
        for param_name, bounds in self.param_bounds.items():
            if isinstance(bounds, list):
                params[param_name] = random.choice(bounds)
            else:
                params[param_name] = random.uniform(bounds[0], bounds[1])
        return params

    def run(self, objective_function):
        """Run the optimization and return the best trial.

        Args:
            objective_function: Callable taking a parameter dict and
                returning a numeric score; larger is better.

        Returns:
            A ``(params, result)`` tuple for the highest-scoring trial.

        Raises:
            ValueError: If no trial was run.
        """
        # Random initial design.
        for _ in range(self.n_initial):
            params = self._random_sample()
            result = objective_function(params)
            self.results.append({"params": params, "result": result})

        # Surrogate-guided iterations fill the rest of the budget.
        for i in range(self.n_iterations - self.n_initial):
            params = self.suggest_next_params()
            result = objective_function(params)
            self.results.append({"params": params, "result": result})

            print(f"迭代 {i+1}: 参数={params}, 结果={result}")

        if not self.results:
            raise ValueError(
                "BayesianOptimization needs at least one trial "
                "(n_initial or n_iterations must be positive)"
            )

        best_result = max(self.results, key=lambda entry: entry["result"])
        return best_result["params"], best_result["result"]

4. 遗传算法

class GeneticAlgorithm:
    """Genetic-algorithm search over a hyperparameter space.

    ``param_bounds`` maps each parameter name to either a list of discrete
    choices or a numeric ``(low, high)`` pair. Each individual is a dict
    mapping parameter names to values; larger fitness is better.

    After ``evolve`` returns, ``self.population`` and ``self.fitness_scores``
    describe the last evaluated generation and are index-aligned.
    """

    def __init__(self, param_bounds, population_size=20, generations=10):
        self.param_bounds = param_bounds
        self.population_size = population_size
        self.generations = generations
        self.population = []
        self.fitness_scores = []

    def initialize_population(self):
        """Replace the population with ``population_size`` random individuals."""
        self.population = [
            self._random_individual() for _ in range(self.population_size)
        ]

    def _random_individual(self):
        """Create one random individual from ``param_bounds``."""
        individual = {}
        for param_name, bounds in self.param_bounds.items():
            if isinstance(bounds, list):
                individual[param_name] = random.choice(bounds)
            else:
                individual[param_name] = random.uniform(bounds[0], bounds[1])
        return individual

    def evaluate_population(self, objective_function):
        """Score every individual; results go to ``self.fitness_scores``."""
        self.fitness_scores = [
            objective_function(individual) for individual in self.population
        ]

    def select_parents(self):
        """Pick ``population_size`` parents by tournament selection.

        Each tournament draws up to three distinct individuals and keeps the
        fittest. The tournament size shrinks for populations smaller than
        three so that sampling without replacement stays valid.
        """
        tournament_size = min(3, self.population_size)
        parents = []
        for _ in range(self.population_size):
            tournament = random.sample(
                range(self.population_size), tournament_size
            )
            best_idx = max(tournament, key=lambda i: self.fitness_scores[i])
            parents.append(self.population[best_idx])
        return parents

    def crossover(self, parent1, parent2):
        """Uniform crossover: each gene comes from either parent with p=0.5."""
        child = {}
        for param_name in self.param_bounds:
            if random.random() < 0.5:
                child[param_name] = parent1[param_name]
            else:
                child[param_name] = parent2[param_name]
        return child

    def mutate(self, individual, mutation_rate=0.1):
        """Return a mutated copy of ``individual``.

        Each gene mutates independently with probability ``mutation_rate``.
        Discrete genes are resampled from their choices. Continuous genes get
        Gaussian noise whose standard deviation is 10% of the parameter's
        range, and the result is clamped to the bounds. Scaling by the range
        keeps the step meaningful for narrow ranges such as learning rates,
        where a fixed step of 0.1 would always jump straight to a bound.
        """
        mutated = individual.copy()
        for param_name, bounds in self.param_bounds.items():
            if random.random() >= mutation_rate:
                continue
            if isinstance(bounds, list):
                mutated[param_name] = random.choice(bounds)
            else:
                low, high = bounds[0], bounds[1]
                delta = random.gauss(0, 0.1 * (high - low))
                mutated[param_name] = max(
                    low, min(high, individual[param_name] + delta)
                )
        return mutated

    def evolve(self, objective_function):
        """Run the evolutionary loop and return the best individual seen.

        Args:
            objective_function: Callable taking an individual (dict) and
                returning a comparable fitness; larger is better.

        Returns:
            A ``(individual, fitness)`` tuple for the best individual found
            in any evaluated generation. The fitness is the one measured for
            exactly that individual.

        Raises:
            ValueError: If ``population_size`` or ``generations`` is < 1.
        """
        if self.population_size < 1 or self.generations < 1:
            raise ValueError(
                "population_size and generations must both be at least 1"
            )

        self.initialize_population()

        best_individual = None
        best_fitness = None

        for generation in range(self.generations):
            print(f"世代 {generation+1}/{self.generations}")

            self.evaluate_population(objective_function)

            # Remember the best evaluated individual across generations:
            # without elitism a later generation may be worse than an earlier
            # one, and offspring that were never scored must not be returned.
            top = max(
                range(len(self.population)),
                key=lambda i: self.fitness_scores[i],
            )
            if best_fitness is None or self.fitness_scores[top] > best_fitness:
                best_fitness = self.fitness_scores[top]
                best_individual = self.population[top].copy()

            # No breeding after the final evaluation, so population and
            # fitness_scores stay aligned.
            if generation == self.generations - 1:
                break

            parents = self.select_parents()

            new_population = []
            for i in range(0, self.population_size, 2):
                first = parents[i]
                # Wrap around so an odd population size still has a partner.
                second = parents[(i + 1) % len(parents)]

                child1 = self.crossover(first, second)
                child2 = self.crossover(second, first)

                child1 = self.mutate(child1)
                child2 = self.mutate(child2)

                new_population.extend([child1, child2])

            # Drop the surplus child produced when population_size is odd.
            self.population = new_population[:self.population_size]

        return best_individual, best_fitness

实际应用案例

案例:LLM微调超参数优化

# Define the parameter space.
# In BayesianOptimization._random_sample, a list is treated as a set of
# discrete choices and any other value as a (low, high) pair sampled
# uniformly.
param_space = {
    "learning_rate": (1e-6, 1e-3),
    "batch_size": [8, 16, 32, 64],
    "num_epochs": [2, 3, 5],
    "warmup_ratio": (0.0, 0.2),
    "weight_decay": (0.0, 0.1),
    "gradient_accumulation_steps": [1, 2, 4, 8]
}

# 目标函数
def objective_function(params):
    """Train with ``params`` and return the resulting accuracy.

    Delegates to ``train_model_with_params`` and ``evaluate_model`` (both
    defined outside this snippet) and returns the ``"accuracy"`` entry of
    the evaluation result, which serves as the optimization target.
    """
    trained = train_model_with_params(params)
    metrics = evaluate_model(trained)
    return metrics["accuracy"]

# Run Bayesian optimization: 10 random initial trials, then
# n_iterations - n_initial = 40 surrogate-guided trials.
bo = BayesianOptimization(param_space, n_initial=10, n_iterations=50)
best_params, best_accuracy = bo.run(objective_function)

print(f"最佳参数: {best_params}")
print(f"最佳准确率: {best_accuracy}")

案例:提示优化

# Prompt (generation) hyperparameters, each given as a (low, high) range.
prompt_params = {
    "temperature": (0.1, 1.0),
    "top_p": (0.1, 1.0),
    "max_tokens": (100, 1000),
    "frequency_penalty": (0.0, 2.0),
    "presence_penalty": (0.0, 2.0)
}

# 目标函数
def prompt_objective(params):
    """Return the mean quality score of responses generated with ``params``.

    Responses are produced for ``test_prompts`` by
    ``generate_responses_with_params`` and scored by
    ``evaluate_response_quality`` (all defined outside this snippet).
    """
    scores = evaluate_response_quality(
        generate_responses_with_params(test_prompts, params)
    )
    total = sum(scores)
    return total / len(scores)

# Random search over the prompt hyperparameters (30 trials).
# NOTE(review): prompt_params uses plain (low, high) tuples rather than
# {"type": ...} specs -- confirm RandomSearch.sample_params supports that form.
rs = RandomSearch(prompt_params, n_iter=30)
best_params, best_score = rs.run(prompt_objective)

最佳实践

1. 渐进式调优

# Stage 1: coarse-grained search.
coarse_grid = {
    "learning_rate": [1e-5, 1e-4, 1e-3],
    "batch_size": [8, 16, 32]
}

# Stage 2: fine-grained search.
fine_grid = {
    "learning_rate": [5e-5, 1e-4, 2e-4, 5e-4],
    "batch_size": [12, 16, 20, 24]
}

# Stage 3: local optimization (learning_rate is a continuous (low, high) range here).
local_optimization = {
    "learning_rate": (1e-4, 3e-4),
    "batch_size": [14, 16, 18]
}

2. 并行化

import multiprocessing as mp

def parallel_search(param_combinations, objective_function):
    """Evaluate every parameter combination in a process pool.

    One worker process per CPU is started. Results come back in the same
    order as ``param_combinations``. ``objective_function`` and its
    arguments are sent to worker processes, so they must be picklable.
    """
    worker_count = mp.cpu_count()
    pool = mp.Pool(processes=worker_count)
    with pool:
        # map() blocks until every result is available, so returning from
        # inside the with-block is safe.
        return pool.map(objective_function, param_combinations)

# 使用异步执行
import asyncio

async def async_search(param_combinations, objective_function):
    """Evaluate every parameter combination concurrently in worker threads.

    Each call to ``objective_function`` is handed to ``asyncio.to_thread``
    and scheduled as a task right away. Results are returned in the same
    order as ``param_combinations``.
    """
    pending = [
        asyncio.create_task(asyncio.to_thread(objective_function, candidate))
        for candidate in param_combinations
    ]
    return await asyncio.gather(*pending)

3. 早停策略

class EarlyStopping:
    """Signal a stop once the score has stalled for ``patience`` calls.

    A score counts as an improvement only when it is not below
    ``best_score + min_delta``; larger scores are better. Calling the
    instance returns ``True`` when training should stop.
    """

    def __init__(self, patience=5, min_delta=0.001):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score = None

    def __call__(self, score):
        # The first observation only establishes the baseline.
        if self.best_score is None:
            self.best_score = score
            return False

        stalled = score < self.best_score + self.min_delta
        if not stalled:
            # Sufficient improvement: record it and restart the countdown.
            self.best_score = score
            self.counter = 0
            return False

        self.counter += 1
        return self.counter >= self.patience

总结

超参数调优是LLM开发的关键环节:

  1. 理解超参数 - 了解每个超参数的作用和影响
  2. 选择搜索策略 - 根据计算资源和时间选择合适的策略
  3. 渐进式优化 - 从粗粒度到细粒度逐步优化
  4. 并行化 - 利用并行计算加速搜索过程
  5. 记录和分析 - 详细记录每次实验的结果

通过系统化的超参数调优,可以显著提升LLM的性能和训练效率。