← 返回首页
🧠

流水线并行:层间并行计算

📂 llm ⏱ 3 min 502 words

---
title: "流水线并行:层间并行计算"
description: "掌握流水线并行的原理和实现,实现多GPU间的层间并行"
tags: ["流水线并行", "层间并行", "GPipe", "Microbatch"]
category: "llm"
icon: "🧠"
---

流水线并行:层间并行计算

流水线并行简介

流水线并行(Pipeline Parallelism)是将模型的不同层分配到不同GPU上,通过流水线方式执行前向和反向传播的技术。与张量并行不同,流水线并行在层间进行分割,通信主要发生在层边界。

流水线并行的核心优势:

  - 降低单卡显存:每个GPU只需保存自己所负责阶段的层参数
  - 通信量小:只需在相邻阶段的边界以点对点方式传输激活值和梯度
  - 易于组合:可与张量并行、数据并行结合,构成3D并行

工作原理

基本流水线

import torch
import torch.nn as nn
import torch.distributed as dist

class PipelineStage(nn.Module):
    """One pipeline stage: a run of layers that is applied in order.

    Args:
        layers: Iterable of modules; registered as submodules through
            ``nn.ModuleList``.
        stage_id: Identifier kept on the instance as ``stage_id``.
    """

    def __init__(self, layers, stage_id):
        super().__init__()
        self.stage_id = stage_id
        self.layers = nn.ModuleList(layers)

    def forward(self, x):
        """Pass ``x`` through every layer of this stage and return the result."""
        out = x
        for module in self.layers:
            out = module(out)
        return out

class PipelineModel(nn.Module):
    """Model whose layers are partitioned into consecutive pipeline stages.

    Every layer is assigned to exactly one stage. When the number of layers
    is not divisible by ``num_stages``, the leading stages each receive one
    extra layer so that no layer is dropped.

    Args:
        all_layers: Sequence (or iterable) of layer modules, in execution
            order.
        num_stages: Number of stages to create; must be at least 1.

    Raises:
        ValueError: If ``num_stages`` is smaller than 1.
    """

    def __init__(self, all_layers, num_stages):
        super().__init__()

        if num_stages < 1:
            raise ValueError(f"num_stages must be >= 1, got {num_stages}")

        layers = list(all_layers)
        # base layers per stage; the first `extra` stages take one more each.
        base, extra = divmod(len(layers), num_stages)

        stages = []
        start = 0
        for stage_id in range(num_stages):
            stop = start + base + (1 if stage_id < extra else 0)
            stages.append(PipelineStage(layers[start:stop], stage_id=stage_id))
            start = stop
        self.stages = nn.ModuleList(stages)

    def forward(self, x):
        """Run ``x`` through all stages sequentially and return the output."""
        for stage in self.stages:
            x = stage(x)
        return x

GPipe调度

class GPipeScheduler:
    """GPipe-style scheduler: all microbatch forwards, then all backwards.

    Args:
        model: Object exposing ``stages``, a sized iterable of callables
            that are applied in order.
        num_microbatches: Requested number of microbatches per batch.
    """

    def __init__(self, model, num_microbatches):
        self.model = model
        self.num_microbatches = num_microbatches
        self.num_stages = len(model.stages)

    def forward_backward(self, input_batch, loss_fn):
        """Run forward on every microbatch, then backward in reverse order.

        Args:
            input_batch: Tensor split along dim 0 with ``torch.chunk``.
            loss_fn: Callable taking the last stage's output and returning
                a scalar loss tensor.

        Returns:
            The mean of the per-microbatch losses.

        Note:
            ``loss.backward()`` is called once per microbatch without
            rescaling, so gradients are the sum over microbatches.
        """
        # torch.chunk may return fewer chunks than requested (e.g. when the
        # batch is smaller than num_microbatches), so the loops below are
        # driven by what was actually produced, not by num_microbatches.
        microbatches = torch.chunk(input_batch, self.num_microbatches)

        # Forward pass: each microbatch goes through all stages in order.
        losses = []
        for mb in microbatches:
            x = mb
            for stage in self.model.stages:
                x = stage(x)
            losses.append(loss_fn(x))

        # Backward pass, last microbatch first.
        for loss in reversed(losses):
            loss.backward()

        return sum(losses) / len(losses)

1F1B调度

class OneFOneBScheduler:
    """1F1B (One Forward One Backward) scheduler.

    The per-step hooks ``_forward_step(microbatch)`` and
    ``_backward_step()`` are called by ``forward_backward`` but are not
    defined in this class as shown; they must be supplied (for example by
    a subclass).

    Args:
        model: Object exposing ``stages``, a sized collection.
        num_microbatches: Requested number of microbatches per batch.
    """

    def __init__(self, model, num_microbatches):
        self.model = model
        self.num_microbatches = num_microbatches
        self.num_stages = len(model.stages)

    def forward_backward(self, input_batch, loss_fn):
        """Issue forward/backward steps in 1F1B order.

        Three phases: warm-up (forward only), steady state (one forward
        followed by one backward), and cool-down (backward only). Every
        microbatch gets exactly one forward and one backward step.

        Args:
            input_batch: Tensor split along dim 0 with ``torch.chunk``.
            loss_fn: Accepted for interface parity; not used directly in
                this method.
        """
        microbatches = torch.chunk(input_batch, self.num_microbatches)

        # Warm-up cannot exceed the microbatches that actually exist:
        # torch.chunk may return fewer chunks than requested, and the batch
        # may hold fewer microbatches than num_stages - 1.
        warmup = max(0, min(self.num_stages - 1, len(microbatches)))

        # Warm-up phase: forward only.
        for mb in microbatches[:warmup]:
            self._forward_step(mb)

        # Steady phase: one forward, one backward.
        for mb in microbatches[warmup:]:
            self._forward_step(mb)
            self._backward_step()

        # Cool-down phase: drain the backwards owed to the warm-up forwards.
        for _ in range(warmup):
            self._backward_step()

使用Megatron-LM

from megatron.core import parallel_state
from megatron.core.pipeline_parallel import get_forward_backward_func

# Initialize model parallelism: tensor-parallel size 1, pipeline-parallel
# size 4, no virtual (interleaved) pipeline stages.
parallel_state.initialize_model_parallel(
    tensor_model_parallel_size=1,
    pipeline_model_parallel_size=4,  # 4 pipeline stages
    virtual_pipeline_model_parallel_size=None
)

# Obtain the forward/backward scheduling function.
# NOTE(review): `forward_step` and `backward_step` are not defined in this
# snippet and must be provided by the surrounding code.
# NOTE(review): released Megatron-Core versions appear to expose
# get_forward_backward_func() with no arguments and to take the step
# function, data iterator, model and microbatch count on the returned
# callable instead -- confirm against the installed version.
forward_backward_func = get_forward_backward_func(
    forward_step=forward_step,
    backward_step=backward_step,
    num_microbatches=8,
    pipeline_model_parallel_size=4
)

# Run one training step.
# NOTE(review): `model`, `input_batch` and `loss_fn` are expected to be
# defined by the caller; they do not appear in this snippet.
loss = forward_backward_func(model, input_batch, loss_fn)

通信优化

通信模式

# Communication primitives referenced in this article, mapped to a short
# description of what each one carries.
communication_patterns = dict(
    P2P="点对点通信(激活值传输)",
    AllReduce="梯度归约(数据并行)",
    AllGather="收集输出(张量并行)",
)

# Communication volume estimate
def calculate_pipeline_communication(activation_size, num_stages):
    """Return the activation volume sent across all stage boundaries.

    One transfer of ``activation_size`` is counted for each of the
    ``num_stages - 1`` boundaries between adjacent stages.
    """
    boundaries = num_stages - 1
    return activation_size * boundaries

气泡优化

def calculate_bubble_ratio(num_microbatches, num_stages):
    """Return the bubble (idle-time) fraction of the pipeline.

    Computed as ``(num_stages - 1) / (num_microbatches + num_stages - 1)``.
    """
    idle_slots = num_stages - 1
    total_slots = num_microbatches + num_stages - 1
    return idle_slots / total_slots

# Ways to shrink the bubble
def reduce_bubble(num_microbatches, num_stages):
    """Return a fresh mapping of bubble-reduction strategies to descriptions.

    Both arguments are accepted but do not influence the result.
    """
    return dict(
        (
            ("增加microbatch", "num_microbatches >> num_stages"),
            ("虚拟流水线", "将阶段进一步细分"),
            ("交错调度", "1F1B调度减少气泡"),
        )
    )

内存优化

# Activation checkpointing
class CheckpointedPipelineStage(nn.Module):
    """Pipeline stage that wraps each layer call in activation checkpointing.

    Args:
        layers: Iterable of modules; registered as submodules through
            ``nn.ModuleList``.
    """

    def __init__(self, layers):
        super().__init__()
        self.layers = nn.ModuleList(layers)

    def forward(self, x):
        """Apply the layers in order, each via ``torch.utils.checkpoint``."""
        hidden = x
        for module in self.layers:
            # Non-reentrant checkpointing, one call per layer.
            hidden = torch.utils.checkpoint.checkpoint(
                module, hidden, use_reentrant=False
            )
        return hidden

# Memory estimate
def estimate_pipeline_memory(model_size, num_stages, num_microbatches):
    """Return a rough per-stage memory estimate for pipeline parallelism.

    The model share is ``model_size / num_stages``. Activation memory is
    that share times ``num_microbatches`` times a fixed 0.1 factor (a
    heuristic constant; its derivation is not shown here).

    Returns:
        Dict with the model share, the activation estimate and their sum.
    """
    weights_per_stage = model_size / num_stages
    in_flight_activations = weights_per_stage * num_microbatches * 0.1
    return {
        "模型内存": weights_per_stage,
        "激活内存": in_flight_activations,
        "总内存": weights_per_stage + in_flight_activations,
    }

性能优化

# Trade-offs of the pipeline schedules, keyed by schedule name. Each entry
# lists the advantage, the drawback and the suggested use case.
performance = {
    schedule: {"优点": pros, "缺点": cons, "适用": best_for}
    for schedule, pros, cons, best_for in (
        ("GPipe", "简单易实现", "气泡大", "小规模训练"),
        ("1F1B", "气泡小", "实现复杂", "大规模训练"),
        ("Interleaved", "气泡最小", "通信复杂", "超大规模训练"),
    )
}

与其他并行结合

# 3D parallelism: tensor + pipeline + data
def setup_3d_parallel(world_size=None):
    """Initialize Megatron model parallelism for a 3D-parallel layout.

    Tensor-parallel size is fixed at 2 and pipeline-parallel size at 4;
    the data-parallel size is whatever remains of ``world_size``.

    Args:
        world_size: Total number of ranks. When ``None`` it is read from
            ``torch.distributed.get_world_size()``, which requires the
            default process group to be initialized.

    Returns:
        Tuple ``(tp_size, pp_size, dp_size)``.

    Raises:
        ValueError: If ``world_size`` is not a positive multiple of
            ``tp_size * pp_size``.
    """
    tp_size = 2  # tensor parallel
    pp_size = 4  # pipeline parallel

    if world_size is None:
        world_size = dist.get_world_size()

    model_parallel_size = tp_size * pp_size
    if world_size <= 0 or world_size % model_parallel_size != 0:
        raise ValueError(
            f"world_size ({world_size}) must be a positive multiple of "
            f"tp_size * pp_size ({model_parallel_size})"
        )
    dp_size = world_size // model_parallel_size  # data parallel

    # Imported after validation so an invalid layout fails fast.
    from megatron.core import parallel_state

    parallel_state.initialize_model_parallel(
        tensor_model_parallel_size=tp_size,
        pipeline_model_parallel_size=pp_size,
        virtual_pipeline_model_parallel_size=None
    )

    return tp_size, pp_size, dp_size

性能基准

# Example pipeline-parallel figures per GPU count: speedup, bubble ratio
# and communication overhead (measurement setup is not given here).
benchmark = {
    gpus: {"加速比": speedup, "气泡比例": bubble, "通信开销": comm}
    for gpus, speedup, bubble, comm in (
        ("4 GPU", "3.2x", "15%", "5%"),
        ("8 GPU", "6.1x", "12%", "8%"),
        ("16 GPU", "11.5x", "10%", "12%"),
    )
}

最佳实践

  1. microbatch数量:至少设置为流水线阶段数的2-4倍;microbatch数量 m 相对阶段数 p 越大,气泡比例 (p-1)/(m+p-1) 越小
  2. 调度策略:优先使用1F1B调度
  3. 通信优化:使用点对点通信
  4. 内存管理:启用激活检查点
  5. 负载均衡:确保各阶段计算量均衡

流水线并行是训练深层大模型的有效技术,与张量并行和数据并行结合可以实现超大规模训练。