← 返回首页
🧠

分布式训练:并行化策略详解

📂 llm ⏱ 3 min 432 words

--- title: "分布式训练:并行化策略详解" description: "掌握分布式训练的核心概念、并行策略和实现方法" tags: ["分布式训练", "并行计算", "数据并行", "模型并行"] category: "llm" icon: "🧠"

分布式训练:并行化策略详解

分布式训练概述

分布式训练是将训练任务分布到多个计算设备上的技术。对于大语言模型,分布式训练是必不可少的,因为单个GPU无法容纳完整的模型和数据。

分布式训练的核心目标:

并行策略

数据并行(Data Parallelism)

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def setup_data_parallel(rank, world_size):
    """设置数据并行"""
    dist.init_process_group(
        backend="nccl",
        rank=rank,
        world_size=world_size
    )
    torch.cuda.set_device(rank)
    
    # 创建模型
    model = create_model()
    model = model.to(rank)
    
    # DDP包装
    model = DDP(model, device_ids=[rank])
    
    return model

def train_epoch(model, dataloader, optimizer, rank):
    """数据并行训练"""
    model.train()
    
    for batch in dataloader:
        # 每个GPU处理不同的数据
        batch = {k: v.to(rank) for k, v in batch.items()}
        
        outputs = model(**batch)
        loss = outputs.loss
        
        loss.backward()  # 自动同步梯度
        
        optimizer.step()
        optimizer.zero_grad()

张量并行(Tensor Parallelism)

# 张量并行:将单个层的权重矩阵分割到多个GPU
from megatron.core.tensor_parallel import ColumnParallelLinear, RowParallelLinear

class TensorParallelMLP(nn.Module):
    def __init__(self, hidden_size, intermediate_size):
        super().__init__()
        
        # 列并行:分割输出维度
        self.dense_h_to_4h = ColumnParallelLinear(
            hidden_size,
            intermediate_size,
            gather_output=False  # 不收集输出
        )
        
        # 行并行:分割输入维度
        self.dense_4h_to_h = RowParallelLinear(
            intermediate_size,
            hidden_size,
            input_is_parallel=True  # 输入已并行
        )
    
    def forward(self, x):
        # 自动处理通信(AllReduce)
        x = self.dense_h_to_4h(x)
        x = F.silu(x)
        x = self.dense_4h_to_h(x)
        return x

流水线并行(Pipeline Parallelism)

# 流水线并行:将模型层分割到多个GPU
class PipelineParallelModel(nn.Module):
    def __init__(self, layers_per_stage):
        super().__init__()
        
        # 将层分配到不同GPU
        self.stages = nn.ModuleList([
            nn.Sequential(*create_layers(i))
            for i in range(num_stages)
        ])
    
    def forward(self, x):
        for stage in self.stages:
            x = stage(x)
            # 需要跨GPU传输
        return x

# GPipe调度
def gpipe_schedule(model, microbatches):
    """GPipe流水线调度"""
    num_microbatches = len(microbatches)
    num_stages = len(model.stages)
    
    # 前向传播
    for i in range(num_microbatches):
        for j in range(num_stages):
            # 处理微批次
            pass
    
    # 反向传播
    for i in range(num_microbatches - 1, -1, -1):
        for j in range(num_stages - 1, -1, -1):
            pass

序列并行(Sequence Parallelism)

# 序列并行:将序列维度分割到多个GPU
def sequence_parallel_forward(x, local_rank):
    """序列并行前向传播"""
    batch_size, seq_len, hidden_size = x.shape
    
    # 将序列分割到多个GPU
    chunks = torch.chunk(x, world_size, dim=1)
    local_x = chunks[local_rank]
    
    # 本地计算
    output = local_model(local_x)
    
    # 收集结果
    outputs = [torch.zeros_like(output) for _ in range(world_size)]
    dist.all_gather(outputs, output)
    
    return torch.cat(outputs, dim=1)

混合并行

# 3D并行:数据并行 + 张量并行 + 流水线并行
def setup_hybrid_parallel():
    """设置混合并行"""
    from megatron.core import parallel_state
    
    # 初始化并行组
    parallel_state.initialize_model_parallel(
        tensor_model_parallel_size=2,  # 张量并行
        pipeline_model_parallel_size=4,  # 流水线并行
        virtual_pipeline_model_parallel_size=None
    )
    
    # 获取各并行组
    tp_group = parallel_state.get_tensor_model_parallel_group()
    pp_group = parallel_state.get_pipeline_model_parallel_group()
    dp_group = parallel_state.get_data_parallel_group()
    
    return tp_group, pp_group, dp_group

内存优化

ZeRO优化

# DeepSpeed ZeRO配置
zero_config = {
    "stage": 3,
    "offload_optimizer": {"device": "cpu"},
    "offload_param": {"device": "cpu"},
    "overlap_comm": True,
    "contiguous_gradients": True,
    "reduce_bucket_size": 5e8,
    "stage3_prefetch_bucket_size": 5e8,
    "stage3_param_persistence_threshold": 1e6
}

# 内存节省
memory_savings = {
    "ZeRO-1": "4x (优化器状态分片)",
    "ZeRO-2": "8x (优化器+梯度分片)",
    "ZeRO-3": "线性 (全分片)"
}

梯度检查点

# 激活检查点(Activation Checkpointing)
from torch.utils.checkpoint import checkpoint

class CheckpointedModel(nn.Module):
    def __init__(self, layers):
        super().__init__()
        self.layers = layers
    
    def forward(self, x):
        for layer in self.layers:
            # 使用检查点减少激活内存
            x = checkpoint(layer, x, use_reentrant=False)
        return x

性能监控

import time
import torch.distributed as dist

class DistributedProfiler:
    """分布式性能分析器"""
    
    def __init__(self):
        self.timers = {}
    
    def start(self, name):
        if name not in self.timers:
            self.timers[name] = []
        self.timers[name].append(time.time())
    
    def stop(self, name):
        if self.timers[name]:
            start_time = self.timers[name].pop()
            elapsed = time.time() - start_time
            return elapsed
        return 0
    
    def all_reduce_stats(self):
        """汇总所有GPU的统计信息"""
        stats = {}
        for name, times in self.timers.items():
            avg_time = sum(times) / len(times)
            stats[name] = avg_time
        return stats

最佳实践

  1. 选择策略:小模型用数据并行,大模型用混合并行
  2. 通信优化:启用overlap通信
  3. 内存管理:使用ZeRO和激活检查点
  4. 批量大小:平衡吞吐量和内存
  5. 监控性能:使用NVIDIA NSight分析瓶颈

分布式训练是大模型训练的基石,掌握各种并行策略是LLM开发的必备技能。