← 返回首页
🧠

网格搜索在LLM超参数调优中的应用

📂 llm ⏱ 8 min 1485 words

---
title: "网格搜索在LLM超参数调优中的应用"
description: "介绍网格搜索算法在大型语言模型超参数调优中的原理、实现和最佳实践。"
tags: ["网格搜索", "超参数调优", "llm", "机器学习", "优化算法"]
category: "llm"
icon: "🧠"
---

网格搜索在LLM超参数调优中的应用

什么是网格搜索?

网格搜索是一种简单的超参数优化方法:先为每个超参数给定一组候选值,再穷举这些候选值的所有组合(笛卡尔积),逐一评估后选出得分最高的一组作为最佳超参数。

网格搜索原理

1. 基本实现

class GridSearch:
    """Exhaustive search over the Cartesian product of a parameter grid.

    ``param_grid`` maps each parameter name to an iterable of candidate
    values. Every evaluated combination is appended to ``self.results`` as
    ``{"params": ..., "result": ...}``.
    """

    def __init__(self, param_grid):
        self.param_grid = param_grid
        self.results = []

    def generate_combinations(self):
        """Return one dict per combination of candidates, in product order."""
        from itertools import product

        names = self.param_grid.keys()
        candidate_lists = self.param_grid.values()
        return [dict(zip(names, picked)) for picked in product(*candidate_lists)]

    def run(self, objective_function):
        """Evaluate every combination and return ``(best_params, best_result)``.

        "Best" is the entry with the largest ``objective_function`` value; the
        maximum is taken over everything stored in ``self.results``.
        """
        grid_points = self.generate_combinations()
        total = len(grid_points)

        for position, params in enumerate(grid_points, start=1):
            print(f"组合 {position}/{total}: {params}")
            score = objective_function(params)
            self.results.append({"params": params, "result": score})

        winner = max(self.results, key=lambda entry: entry["result"])
        return winner["params"], winner["result"]

2. 支持嵌套参数

class NestedGridSearch:
    """Combination generator for grids whose values may be nested dicts.

    Nested keys are joined with "." while the product is built and split on
    "." again when each combination is turned back into a nested dict.
    """

    def __init__(self, param_grid):
        self.param_grid = param_grid
        self.results = []

    def _flatten_grid(self, grid, prefix=""):
        """Return a single-level dict keyed by dotted paths.

        ``prefix`` is the already-dotted path of the enclosing dict (empty at
        the top level). Dict values are descended into; anything else is kept
        as the candidate collection for that path.
        """
        flat = {}
        for name, candidates in grid.items():
            path = f"{prefix}{name}" if prefix else name
            if not isinstance(candidates, dict):
                flat[path] = candidates
                continue
            flat.update(self._flatten_grid(candidates, f"{path}."))
        return flat

    def generate_combinations(self):
        """Return every combination of candidates as a nested dict."""
        from itertools import product

        flat = self._flatten_grid(self.param_grid)
        return [
            self._restore_nested(dict(zip(flat.keys(), picked)))
            for picked in product(*flat.values())
        ]

    def _restore_nested(self, flat_dict):
        """Rebuild a nested dict from a dict keyed by dotted paths."""
        rebuilt = {}
        for dotted, value in flat_dict.items():
            *parents, leaf = dotted.split('.')
            node = rebuilt
            for segment in parents:
                # Create the intermediate level on first use, then descend.
                node = node.setdefault(segment, {})
            node[leaf] = value
        return rebuilt

LLM超参数网格搜索实践

1. 学习率和批量大小搜索

# Parameter grid: maps each hyperparameter name to its list of candidate values.
# 4 learning rates x 4 batch sizes x 3 dropout rates = 48 combinations.
param_grid = {
    "learning_rate": [1e-5, 2e-5, 5e-5, 1e-4],
    "batch_size": [8, 16, 32, 64],
    "dropout": [0.1, 0.2, 0.3]
}

# Objective function
def objective(params):
    """Train with the hyperparameters in ``params`` and return the evaluation.

    Reads ``params["learning_rate"]``, ``params["batch_size"]`` and
    ``params["dropout"]``. ``train_model`` and ``evaluate_model`` are not
    defined in this snippet and must be supplied by the surrounding project.
    """
    hyperparameter_names = ("learning_rate", "batch_size", "dropout")
    trained = train_model(**{name: params[name] for name in hyperparameter_names})
    return evaluate_model(trained)

# Run the grid search: every combination in param_grid is passed to objective.
grid_search = GridSearch(param_grid)
best_params, best_accuracy = grid_search.run(objective)

# Report the winning combination and the objective value it achieved.
print(f"最佳参数: {best_params}")
print(f"最佳准确率: {best_accuracy}")

2. 模型架构搜索

# Model-architecture parameter grid: 4 x 4 x 3 x 3 = 144 combinations.
arch_param_grid = {
    "hidden_size": [256, 512, 1024, 2048],
    "num_layers": [2, 4, 6, 8],
    "num_heads": [2, 4, 8],
    "ffn_size": [1024, 2048, 4096]
}

# Objective function
def architecture_objective(params):
    """Score one architecture: accuracy scaled down by parameter count.

    Builds a model from ``hidden_size``, ``num_layers``, ``num_heads`` and
    ``ffn_size``, trains and evaluates it, then multiplies the accuracy by
    ``1 - parameter_count / 1e9`` as a simplified efficiency trade-off.
    ``create_model``, ``train_model`` and ``evaluate_model`` come from outside
    this snippet.
    """
    architecture_keys = ("hidden_size", "num_layers", "num_heads", "ffn_size")
    model = create_model(**{key: params[key] for key in architecture_keys})

    train_model(model)
    accuracy = evaluate_model(model)

    # Total number of elements across all parameter tensors.
    parameter_count = 0
    for tensor in model.parameters():
        parameter_count += tensor.numel()

    size_penalty = 1 - parameter_count / 1e9
    return accuracy * size_penalty

# Run the search over the architecture grid, scoring each combination with
# architecture_objective.
arch_grid_search = GridSearch(arch_param_grid)
best_arch_params, best_arch_score = arch_grid_search.run(architecture_objective)

3. 提示优化网格搜索

# Generation-parameter grid for prompt tuning.
# 5 x 5 x 4 x 4 x 4 = 1600 combinations, each of which triggers one full
# objective evaluation when searched exhaustively.
prompt_param_grid = {
    "temperature": [0.1, 0.3, 0.5, 0.7, 0.9],
    "top_p": [0.1, 0.3, 0.5, 0.7, 0.9],
    "max_tokens": [100, 200, 500, 1000],
    "frequency_penalty": [0.0, 0.5, 1.0, 1.5],
    "presence_penalty": [0.0, 0.5, 1.0, 1.5]
}

# Objective function
def prompt_objective(params):
    """Generate responses with the sampling settings in ``params`` and score them.

    Forwards ``temperature``, ``top_p``, ``max_tokens``, ``frequency_penalty``
    and ``presence_penalty`` to ``generate_responses`` together with
    ``test_prompts``, then returns ``evaluate_response_quality`` of the output.
    Those three names are defined outside this snippet.
    """
    sampling_keys = (
        "temperature",
        "top_p",
        "max_tokens",
        "frequency_penalty",
        "presence_penalty",
    )
    responses = generate_responses(
        test_prompts,
        **{key: params[key] for key in sampling_keys},
    )
    return evaluate_response_quality(responses)

# Run the search over the generation-parameter grid with prompt_objective.
prompt_grid_search = GridSearch(prompt_param_grid)
best_prompt_params, best_prompt_score = prompt_grid_search.run(prompt_objective)

高级网格搜索技术

1. 随机网格搜索

import random

class RandomGridSearch:
    """Random search that samples combinations from a parameter grid.

    Each iteration independently picks one candidate per parameter, so the
    same combination can be drawn more than once.

    Args:
        param_grid: Mapping of parameter name to a sequence of candidates.
        n_iter: Number of combinations to sample and evaluate.
        seed: Optional seed. When given, sampling uses a private
            ``random.Random(seed)`` so a run can be reproduced; when ``None``
            the module-level ``random`` generator is used, exactly as before.
    """

    def __init__(self, param_grid, n_iter=50, seed=None):
        self.param_grid = param_grid
        self.n_iter = n_iter
        self.results = []
        # The ``random`` module and ``random.Random`` both expose ``choice``,
        # so either can serve as the sampling source.
        self._rng = random if seed is None else random.Random(seed)

    def sample_params(self):
        """Return one randomly sampled combination as a dict."""
        return {
            param_name: self._rng.choice(values)
            for param_name, values in self.param_grid.items()
        }

    def run(self, objective_function):
        """Sample and evaluate ``n_iter`` combinations.

        Returns:
            ``(best_params, best_result)`` for the entry in ``self.results``
            with the largest objective value.

        Raises:
            ValueError: If nothing has been evaluated (``n_iter`` below 1).
        """
        for i in range(self.n_iter):
            params = self.sample_params()
            print(f"迭代 {i+1}/{self.n_iter}: {params}")

            result = objective_function(params)
            self.results.append({"params": params, "result": result})

        if not self.results:
            raise ValueError(
                "no parameter combinations were evaluated; n_iter must be at least 1"
            )

        best_result = max(self.results, key=lambda x: x["result"])
        return best_result["params"], best_result["result"]

2. 分层网格搜索

class HierarchicalGridSearch:
    """Coarse-to-fine grid search over a list of parameter grids.

    The first grid is searched as given. Each later grid is first refined
    around the best values found so far (see ``run_fine_search``) and then
    searched exhaustively.
    """

    def __init__(self, param_grids):
        self.param_grids = param_grids
        self.results = []

    def run_coarse_search(self, objective_function, param_grid):
        """Search ``param_grid`` as-is and return ``(best_params, best_result)``."""
        print("开始粗粒度搜索...")
        return GridSearch(param_grid).run(objective_function)

    def _build_fine_grid(self, base_params, param_grid):
        """Return the grid to search for one refinement stage.

        For a parameter that already has a best value in ``base_params``:
          * integer grids become the integers within +/-2 of the best value,
            never going below 1 unless the best value itself is below 1, with
            duplicates removed;
          * float grids become the best value scaled by 0.5, 0.8, 1, 1.2, 1.5;
          * anything else (categorical, boolean, empty) keeps its own values.
        Parameters new to this stage keep their values. Parameters fixed by an
        earlier stage but absent from ``param_grid`` are carried over as a
        single candidate so the objective still receives them.
        """
        fine_grid = {}
        for param_name, values in param_grid.items():
            if param_name not in base_params:
                fine_grid[param_name] = values
                continue

            best_value = base_params[param_name]
            sample = values[0] if len(values) > 0 else None
            # bool is a subclass of int; treat it as categorical.
            is_numeric = isinstance(sample, (int, float)) and not isinstance(sample, bool)

            if not is_numeric:
                fine_grid[param_name] = values
            elif isinstance(sample, int):
                floor = min(1, best_value)
                fine_grid[param_name] = sorted(
                    {max(floor, best_value + step) for step in (-2, -1, 0, 1, 2)}
                )
            else:
                fine_grid[param_name] = [
                    best_value * 0.5,
                    best_value * 0.8,
                    best_value,
                    best_value * 1.2,
                    best_value * 1.5,
                ]

        for param_name, best_value in base_params.items():
            if param_name not in param_grid:
                fine_grid[param_name] = [best_value]

        return fine_grid

    def run_fine_search(self, objective_function, base_params, param_grid):
        """Refine ``param_grid`` around ``base_params`` and search it.

        Returns:
            ``(best_params, best_result)`` of the refined search.
        """
        print("开始细粒度搜索...")
        fine_grid = self._build_fine_grid(base_params, param_grid)
        return GridSearch(fine_grid).run(objective_function)

    def run(self, objective_function):
        """Run the coarse stage followed by every fine stage.

        Returns:
            ``(best_params, best_result)`` where ``best_result`` is the score
            of the last stage that ran, or ``({}, None)`` when ``param_grids``
            is empty.
        """
        current_params = {}
        best_result = None

        # Stage 1: coarse search on the first grid.
        if len(self.param_grids) > 0:
            coarse_params, best_result = self.run_coarse_search(
                objective_function, self.param_grids[0]
            )
            current_params.update(coarse_params)

        # Stages 2..N: refine around the best parameters found so far.
        for param_grid in self.param_grids[1:]:
            fine_params, best_result = self.run_fine_search(
                objective_function, current_params, param_grid
            )
            current_params.update(fine_params)

        return current_params, best_result

3. 并行网格搜索

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

class ParallelGridSearch:
    """Grid search that evaluates combinations in a process pool.

    ``n_workers`` falls back to ``mp.cpu_count()`` when it is falsy.
    """

    def __init__(self, param_grid, n_workers=None):
        self.param_grid = param_grid
        self.n_workers = n_workers if n_workers else mp.cpu_count()
        self.results = []

    def _generate_combinations(self):
        """Return one dict per combination of candidates, in product order."""
        from itertools import product

        names = self.param_grid.keys()
        return [
            dict(zip(names, picked))
            for picked in product(*self.param_grid.values())
        ]

    def _evaluate_params(self, params, objective_function):
        """Evaluate one combination and wrap it as a result entry."""
        return {"params": params, "result": objective_function(params)}

    def run(self, objective_function):
        """Evaluate all combinations in parallel; return ``(best_params, best_result)``.

        Tasks are submitted to a ``ProcessPoolExecutor`` and their results are
        collected in submission order into ``self.results``.
        """
        grid_points = self._generate_combinations()

        with ProcessPoolExecutor(max_workers=self.n_workers) as pool:
            pending = [
                pool.submit(self._evaluate_params, combo, objective_function)
                for combo in grid_points
            ]
            for task in pending:
                self.results.append(task.result())

        winner = max(self.results, key=lambda entry: entry["result"])
        return winner["params"], winner["result"]

实际应用案例

案例:LLM微调超参数优化

# Layered parameter grids, ordered from coarse to fine.
# NOTE(review): the third layer has no "epochs" entry although
# llm_finetuning_objective reads params["epochs"]; confirm the search passes
# that parameter on from an earlier stage.
param_grids = [
    # Layer 1: coarse search
    {
        "learning_rate": [1e-5, 1e-4, 1e-3],
        "batch_size": [8, 16, 32],
        "epochs": [1, 3, 5]
    },
    # Layer 2: fine search (adds dropout)
    {
        "learning_rate": [5e-5, 1e-4, 2e-4, 5e-4],
        "batch_size": [12, 16, 20, 24],
        "epochs": [2, 3, 4],
        "dropout": [0.1, 0.2, 0.3]
    },
    # Layer 3: local optimisation (adds weight_decay)
    {
        "learning_rate": [1e-4, 1.5e-4, 2e-4],
        "batch_size": [14, 16, 18],
        "weight_decay": [0.0, 0.01, 0.1]
    }
]

# Objective function
def llm_finetuning_objective(params):
    """Score one fine-tuning configuration: accuracy with a time penalty.

    ``learning_rate``, ``batch_size`` and ``epochs`` are required keys;
    ``dropout`` defaults to 0.1 and ``weight_decay`` to 0.01 when absent.
    The accuracy is multiplied by ``1 - training_time / 3600`` as a simplified
    time penalty. ``train_llm``, ``evaluate_llm`` and
    ``estimate_training_time`` are defined outside this snippet.
    """
    required_keys = ("learning_rate", "batch_size", "epochs")
    model = train_llm(
        **{key: params[key] for key in required_keys},
        dropout=params.get("dropout", 0.1),
        weight_decay=params.get("weight_decay", 0.01),
    )

    accuracy = evaluate_llm(model)
    estimated_time = estimate_training_time(params)

    time_penalty = 1 - estimated_time / 3600
    return accuracy * time_penalty

# Run the hierarchical search over the layered grids.
hierarchical_search = HierarchicalGridSearch(param_grids)
best_params, best_score = hierarchical_search.run(llm_finetuning_objective)

# Report the final parameters and the score returned by the search.
print(f"最佳参数: {best_params}")
print(f"最佳分数: {best_score}")

案例:多目标网格搜索

# Multi-objective optimisation
class MultiObjectiveGridSearch:
    """Grid search that returns the Pareto front over several objectives.

    Args:
        param_grid: Mapping of parameter name to an iterable of candidates.
        objectives: Objective names to evaluate, e.g.
            ``["accuracy", "latency", "memory"]``. "accuracy" is maximised;
            every other objective is minimised.

    ``train_model``, ``evaluate_model_accuracy``, ``measure_latency`` and
    ``measure_memory_usage`` are expected to be provided by the surrounding
    project.
    """

    # Objectives where a larger value is better.
    _MAXIMIZED = frozenset({"accuracy"})

    def __init__(self, param_grid, objectives):
        self.param_grid = param_grid
        self.objectives = objectives
        self.results = []

    def _generate_combinations(self):
        """Return one dict per combination of candidates, in product order."""
        import itertools

        names = list(self.param_grid)
        return [
            dict(zip(names, combination))
            for combination in itertools.product(*self.param_grid.values())
        ]

    def evaluate_multiple_objectives(self, params):
        """Return ``{objective_name: value}`` for one parameter combination.

        The model is trained once, on first need, and shared by all
        objectives so every metric describes the same model. Objective names
        without an evaluator are skipped.
        """
        evaluators = {
            "accuracy": self._evaluate_accuracy,
            "latency": self._evaluate_latency,
            "memory": self._evaluate_memory,
        }

        results = {}
        model = None
        for objective_name in self.objectives:
            evaluator = evaluators.get(objective_name)
            if evaluator is None:
                continue
            if model is None:
                model = train_model(params)
            results[objective_name] = evaluator(params, model)
        return results

    def _evaluate_accuracy(self, params, model=None):
        """Return the accuracy of ``model``, training one from ``params`` if omitted."""
        if model is None:
            model = train_model(params)
        return evaluate_model_accuracy(model)

    def _evaluate_latency(self, params, model=None):
        """Return the latency of ``model``, training one from ``params`` if omitted."""
        if model is None:
            model = train_model(params)
        return measure_latency(model)

    def _evaluate_memory(self, params, model=None):
        """Return the memory usage of ``model``, training one from ``params`` if omitted."""
        if model is None:
            model = train_model(params)
        return measure_memory_usage(model)

    def run(self):
        """Evaluate every combination and return the Pareto-optimal entries.

        Each entry has the form ``{"params": ..., "results": {...}}`` and is
        also stored in ``self.results``.
        """
        for params in self._generate_combinations():
            self.results.append({
                "params": params,
                "results": self.evaluate_multiple_objectives(params),
            })

        return self._find_pareto_front()

    def _find_pareto_front(self):
        """Return the entries of ``self.results`` that no other entry dominates."""
        pareto = []
        for i, candidate in enumerate(self.results):
            dominated = any(
                i != j and self._dominates(other, candidate)
                for j, other in enumerate(self.results)
            )
            if not dominated:
                pareto.append(candidate)
        return pareto

    def _dominates(self, result_a, result_b):
        """Return True if A is no worse than B everywhere and better somewhere."""
        strictly_better = False

        for objective in self.objectives:
            value_a = result_a["results"][objective]
            value_b = result_b["results"][objective]
            if objective not in self._MAXIMIZED:
                # Smaller is better: swap so "larger wins" holds below.
                value_a, value_b = value_b, value_a

            if value_a < value_b:
                return False
            if value_a > value_b:
                strictly_better = True

        return strictly_better

# Use the multi-objective grid search.
# NOTE: this rebinds the module-level name param_grid used by an earlier snippet.
param_grid = {
    "hidden_size": [256, 512, 1024],
    "num_layers": [2, 4, 6],
    "batch_size": [8, 16, 32]
}

# Objective names understood by MultiObjectiveGridSearch.
objectives = ["accuracy", "latency", "memory"]
multi_obj_search = MultiObjectiveGridSearch(param_grid, objectives)
pareto_front = multi_obj_search.run()

# Print every non-dominated solution with its parameters and objective values.
print("帕累托前沿解:")
for solution in pareto_front:
    print(f"参数: {solution['params']}")
    print(f"目标值: {solution['results']}")

最佳实践

1. 参数网格设计

# Parameter grid presets per model family
def design_parameter_grid(model_type):
    """Return a freshly built parameter grid for ``model_type``.

    Recognised values are "transformer" and "rnn"; for anything else the
    function returns ``None``.
    """
    if model_type == "transformer":
        grid = {}
        grid["hidden_size"] = [256, 512, 1024, 2048]
        grid["num_layers"] = [2, 4, 6, 8]
        grid["num_heads"] = [2, 4, 8]
        grid["learning_rate"] = [1e-5, 2e-5, 5e-5, 1e-4]
        grid["batch_size"] = [8, 16, 32]
        grid["dropout"] = [0.1, 0.2, 0.3]
        return grid

    if model_type == "rnn":
        grid = {}
        grid["hidden_size"] = [128, 256, 512]
        grid["num_layers"] = [1, 2, 3]
        grid["learning_rate"] = [1e-4, 5e-4, 1e-3]
        grid["batch_size"] = [16, 32, 64]
        grid["dropout"] = [0.2, 0.3, 0.4]
        return grid

    return None

2. 资源管理

# Resource-aware grid search
class ResourceAwareGridSearch:
    """Grid search that skips combinations exceeding a time or memory budget.

    Args:
        param_grid: Mapping of parameter name to an iterable of candidates.
        max_time: Budget for the summed estimated time of all evaluated
            combinations (same unit as ``_estimate_time``).
        max_memory: Largest estimated memory a single combination may need
            (same unit as ``_estimate_memory``).
    """

    def __init__(self, param_grid, max_time=3600, max_memory=8e9):
        self.param_grid = param_grid
        self.max_time = max_time
        self.max_memory = max_memory
        self.results = []

    def _generate_combinations(self):
        """Return one dict per combination of candidates, in product order."""
        import itertools

        names = list(self.param_grid)
        return [
            dict(zip(names, combination))
            for combination in itertools.product(*self.param_grid.values())
        ]

    def estimate_resource_usage(self, params):
        """Return ``{"time": ..., "memory": ...}`` estimates for ``params``."""
        return {
            "time": self._estimate_time(params),
            "memory": self._estimate_memory(params),
        }

    def _estimate_time(self, params):
        """Estimate training time with a simple heuristic.

        Starts from 100, scales inversely with ``batch_size`` (relative to 16)
        and quadratically with ``hidden_size`` (relative to 512).
        """
        base_time = 100

        if "batch_size" in params:
            base_time *= (16 / params["batch_size"])
        if "hidden_size" in params:
            base_time *= (params["hidden_size"] / 512) ** 2

        return base_time

    def _estimate_memory(self, params):
        """Estimate memory use with a simple heuristic.

        Starts from 1e9 and scales linearly with ``batch_size`` (relative to
        16) and ``hidden_size`` (relative to 512).
        """
        base_memory = 1e9

        if "batch_size" in params:
            base_memory *= params["batch_size"] / 16
        if "hidden_size" in params:
            base_memory *= params["hidden_size"] / 512

        return base_memory

    def run(self, objective_function):
        """Evaluate the combinations that fit the budgets.

        Time is cumulative: a combination is skipped when adding its estimate
        would exceed ``max_time``. Memory is checked per combination, because
        trials run one after another and do not hold memory simultaneously.

        Returns:
            ``(best_params, best_result)``, or ``(None, None)`` when every
            combination was skipped.
        """
        total_time = 0

        for params in self._generate_combinations():
            resources = self.estimate_resource_usage(params)

            if total_time + resources["time"] > self.max_time:
                print(f"跳过参数 {params}: 时间限制")
                continue

            if resources["memory"] > self.max_memory:
                print(f"跳过参数 {params}: 内存限制")
                continue

            result = objective_function(params)
            self.results.append({
                "params": params,
                "result": result,
                "resources": resources,
            })
            total_time += resources["time"]

        if not self.results:
            return None, None

        best_result = max(self.results, key=lambda x: x["result"])
        return best_result["params"], best_result["result"]

3. 结果分析

# Grid-search result analysis
class GridSearchAnalyzer:
    """Helpers for inspecting a list of ``{"params": ..., "result": ...}`` entries."""

    def __init__(self, results):
        self.results = results

    def get_best_params(self):
        """Return ``(params, result)`` of the entry with the largest result."""
        winner = max(self.results, key=lambda entry: entry["result"])
        return winner["params"], winner["result"]

    def get_parameter_importance(self):
        """Return a per-parameter importance score.

        For each parameter the scores are grouped by parameter value; the
        importance is the mean squared deviation of the group means from the
        overall mean score. A parameter with a single distinct value gets 0.
        """
        from collections import defaultdict
        import numpy as np

        # parameter name -> list of (value, score) observations
        observations = defaultdict(list)
        for entry in self.results:
            outcome = entry["result"]
            for name, setting in entry["params"].items():
                observations[name].append((setting, outcome))

        importance = {}
        for name, pairs in observations.items():
            distinct = set(setting for setting, _ in pairs)
            if len(distinct) <= 1:
                importance[name] = 0
                continue

            level_means = [
                np.mean([score for setting, score in pairs if setting == level])
                for level in distinct
            ]
            grand_mean = np.mean([score for _, score in pairs])
            importance[name] = np.mean(
                [(mean - grand_mean) ** 2 for mean in level_means]
            )

        return importance

    def visualize_results(self):
        """Scatter-plot each parameter against the result.

        One subplot per parameter (names taken from the first entry); the
        figure is saved to ``grid_search_results.png`` and then shown.
        """
        import matplotlib.pyplot as plt

        param_names = list(self.results[0]["params"].keys())
        panel_count = len(param_names)

        fig, axes = plt.subplots(panel_count, 1, figsize=(10, 4*panel_count))

        scores = [entry["result"] for entry in self.results]
        for index, param_name in enumerate(param_names):
            # With a single panel ``axes`` is used directly rather than indexed.
            ax = axes[index] if panel_count > 1 else axes
            values = [entry["params"][param_name] for entry in self.results]

            ax.scatter(values, scores, alpha=0.6)
            ax.set_xlabel(param_name)
            ax.set_ylabel("Performance")
            ax.set_title(f"{param_name} vs Performance")

        plt.tight_layout()
        plt.savefig("grid_search_results.png")
        plt.show()

# Use the analyzer on the entries collected in grid_search.results.
# Note: best_params and best_score rebind names assigned by earlier snippets.
analyzer = GridSearchAnalyzer(grid_search.results)
best_params, best_score = analyzer.get_best_params()
importance = analyzer.get_parameter_importance()
analyzer.visualize_results()

总结

网格搜索是LLM超参数调优的基础方法:

  1. 简单易用 - 实现简单,易于理解
  2. 全面覆盖 - 穷举所有参数组合
  3. 可重复 - 给定相同的参数网格和确定性的目标函数,搜索结果完全可重复
  4. 并行化 - 容易并行化加速
  5. 可扩展 - 可以结合其他技术增强

网格搜索的组合数随超参数个数呈指数增长,因此在计算资源有限时可能不够高效;但通过合理设计参数网格、使用随机采样、分层搜索等技术,可以显著提高搜索效率。在LLM开发中,网格搜索常用于初始超参数探索和基准测试。