流水线并行:层间并行计算
---
title: "流水线并行:层间并行计算"
description: "掌握流水线并行的原理和实现,实现多GPU间的层间并行"
tags: ["流水线并行", "层间并行", "GPipe", "Microbatch"]
category: "llm"
icon: "🧠"
---
流水线并行:层间并行计算
流水线并行简介
流水线并行(Pipeline Parallelism)是将模型的不同层分配到不同GPU上,通过流水线方式执行前向和反向传播的技术。与张量并行不同,流水线并行在层间进行分割,通信主要发生在层边界。
流水线并行的核心优势:
- 内存优化:每个GPU只存储部分层
- 通信高效:只需在相邻阶段间点对点传输激活值(前向)及其梯度(反向)
- 易于实现:相对简单的分割方式
- 适合深层模型:特别适合层数很多的模型
工作原理
基本流水线
import torch
import torch.nn as nn
import torch.distributed as dist
class PipelineStage(nn.Module):
    """A single pipeline stage: a contiguous run of layers applied in order.

    Args:
        layers: Iterable of ``nn.Module`` objects; registered through an
            ``nn.ModuleList`` so their parameters belong to this stage.
        stage_id: Position of this stage in the pipeline (stored unchanged).
    """

    def __init__(self, layers, stage_id):
        super().__init__()
        self.layers = nn.ModuleList(layers)
        self.stage_id = stage_id

    def forward(self, x):
        """Feed ``x`` through every layer of the stage and return the result."""
        hidden = x
        for module in self.layers:
            hidden = module(hidden)
        return hidden
class PipelineModel(nn.Module):
    """Split an ordered list of layers into ``num_stages`` consecutive stages.

    Layers are assigned contiguously. When ``len(all_layers)`` is not a
    multiple of ``num_stages``, the first ``len(all_layers) % num_stages``
    stages each receive one extra layer, so no layer is ever dropped.

    Args:
        all_layers: Ordered sequence of ``nn.Module`` layers.
        num_stages: Number of pipeline stages.

    Raises:
        ValueError: If ``num_stages`` < 1, or if it exceeds the number of
            layers (which would leave a stage with nothing to compute).
    """

    def __init__(self, all_layers, num_stages):
        super().__init__()
        layers = list(all_layers)
        if num_stages < 1:
            raise ValueError(f"num_stages must be >= 1, got {num_stages}")
        if num_stages > len(layers):
            raise ValueError(
                f"num_stages ({num_stages}) exceeds the number of layers "
                f"({len(layers)})"
            )

        # Balanced contiguous split: every stage gets `base` layers and the
        # first `extra` stages absorb the remainder.
        base, extra = divmod(len(layers), num_stages)
        stages = []
        start = 0
        for stage_id in range(num_stages):
            size = base + (1 if stage_id < extra else 0)
            stages.append(
                PipelineStage(layers[start:start + size], stage_id=stage_id)
            )
            start += size
        self.stages = nn.ModuleList(stages)

    def forward(self, x):
        """Run ``x`` through every stage in order and return the output."""
        for stage in self.stages:
            x = stage(x)
        return x
GPipe调度
class GPipeScheduler:
    """GPipe schedule (all forwards, then all backwards), simulated in one process.

    The batch is split into micro-batches with ``torch.chunk``. Every
    micro-batch is pushed through all stages first. Then the backward passes
    run in reverse micro-batch order. Each micro-batch loss is scaled by
    ``1 / num_chunks`` before ``backward()``, so the accumulated gradients are
    the gradients of the mean loss that is returned.

    Args:
        model: Object exposing an ordered ``stages`` sequence of callables.
        num_microbatches: Requested number of micro-batches (>= 1).
    """

    def __init__(self, model, num_microbatches):
        if num_microbatches < 1:
            raise ValueError(
                f"num_microbatches must be >= 1, got {num_microbatches}"
            )
        self.model = model
        self.num_microbatches = num_microbatches
        self.num_stages = len(model.stages)

    def forward_backward(self, input_batch, loss_fn):
        """Run forward and backward for one batch and accumulate gradients.

        Args:
            input_batch: Tensor split along dim 0 into micro-batches.
            loss_fn: Maps the last stage's output to a scalar loss tensor.

        Returns:
            Detached scalar tensor: the mean of the per-micro-batch losses.

        Raises:
            ValueError: If ``input_batch`` has no samples along dim 0.
        """
        if input_batch.size(0) == 0:
            raise ValueError("input_batch must contain at least one sample")
        # torch.chunk may return fewer chunks than requested (6 rows into 4
        # chunks gives 3 chunks of 2), so size everything from the result.
        microbatches = torch.chunk(input_batch, self.num_microbatches)
        num_chunks = len(microbatches)

        # Forward phase: each micro-batch traverses every stage; its autograd
        # graph keeps the activations alive until its backward runs.
        losses = []
        for mb in microbatches:
            x = mb
            for stage in self.model.stages:
                x = stage(x)
            losses.append(loss_fn(x))

        # Backward phase in reverse micro-batch order. Scaling makes the
        # accumulated gradient match the mean loss returned below.
        for loss in reversed(losses):
            (loss / num_chunks).backward()

        # The graphs are freed after backward(), so return a detached value.
        return torch.stack([loss.detach() for loss in losses]).mean()
1F1B调度
class OneFOneBScheduler:
    """1F1B (one-forward-one-backward) schedule, simulated in one process.

    The schedule is modeled from the first stage's point of view:
    ``num_stages - 1`` warm-up forwards, then alternating forward/backward,
    then draining the remaining backwards. In a single process this does not
    overlap compute. Its effect is that at most ``num_stages`` micro-batch
    graphs (and their activations) are alive at any time, unlike GPipe, which
    keeps all of them.

    Each micro-batch loss is scaled by ``1 / num_chunks`` before backward, so
    the accumulated gradients are those of the mean loss that is returned.

    Args:
        model: Object exposing an ordered ``stages`` sequence of callables.
        num_microbatches: Requested number of micro-batches (>= 1).
    """

    def __init__(self, model, num_microbatches):
        if num_microbatches < 1:
            raise ValueError(
                f"num_microbatches must be >= 1, got {num_microbatches}"
            )
        self.model = model
        self.num_microbatches = num_microbatches
        self.num_stages = len(model.stages)
        # Per-call state shared by _forward_step / _backward_step.
        self._loss_fn = None
        self._pending = None
        self._losses = None
        self._loss_scale = 1.0

    def forward_backward(self, input_batch, loss_fn):
        """Run one batch under the 1F1B ordering and accumulate gradients.

        Args:
            input_batch: Tensor split along dim 0 into micro-batches.
            loss_fn: Maps the last stage's output to a scalar loss tensor.

        Returns:
            Detached scalar tensor: the mean of the per-micro-batch losses.

        Raises:
            ValueError: If ``input_batch`` has no samples along dim 0.
        """
        from collections import deque

        if input_batch.size(0) == 0:
            raise ValueError("input_batch must contain at least one sample")
        # torch.chunk may return fewer chunks than requested.
        microbatches = torch.chunk(input_batch, self.num_microbatches)
        num_chunks = len(microbatches)
        # Cap warm-up so fewer micro-batches than stages still works.
        num_warmup = max(0, min(self.num_stages - 1, num_chunks))

        self._loss_fn = loss_fn
        self._pending = deque()
        self._losses = []
        self._loss_scale = 1.0 / num_chunks

        # Warm-up: forwards only.
        for mb in microbatches[:num_warmup]:
            self._forward_step(mb)
        # Steady state: one forward, then one backward (oldest first).
        for mb in microbatches[num_warmup:]:
            self._forward_step(mb)
            self._backward_step()
        # Cool-down: drain the remaining backwards.
        while self._pending:
            self._backward_step()

        return torch.stack(self._losses).mean()

    def _forward_step(self, microbatch):
        """Forward one micro-batch through all stages and queue its loss."""
        x = microbatch
        for stage in self.model.stages:
            x = stage(x)
        loss = self._loss_fn(x)
        self._pending.append(loss)
        self._losses.append(loss.detach())

    def _backward_step(self):
        """Backpropagate the oldest queued micro-batch loss (FIFO)."""
        loss = self._pending.popleft()
        (loss * self._loss_scale).backward()
使用Megatron-LM
# Megatron-Core pipeline-parallel example.
# NOTE(review): torch.distributed must already be initialized (e.g. launched
# via torchrun) before initialize_model_parallel is called.
from megatron.core import parallel_state
from megatron.core.pipeline_parallel import get_forward_backward_func

# Initialize pipeline parallelism
parallel_state.initialize_model_parallel(
    tensor_model_parallel_size=1,
    pipeline_model_parallel_size=4,  # 4 pipeline stages
    virtual_pipeline_model_parallel_size=None,
)

# get_forward_backward_func() takes no arguments: it picks the schedule
# (no pipelining / 1F1B / interleaved 1F1B) from the parallel state above.
forward_backward_func = get_forward_backward_func()

# Illustrative sizes; set these to match your data.
seq_length = 2048
micro_batch_size = 1

# Run one training step. forward_step(data_iterator, model) is user-defined
# and must return (output_tensor, loss_func); Megatron performs the backward
# pass itself, so no backward_step / loss_fn is passed. data_iterator and
# model are also user-provided.
# NOTE(review): the return value is presumably the list of per-micro-batch
# loss outputs (populated on the last pipeline stage) — confirm for your
# Megatron version.
loss = forward_backward_func(
    forward_step_func=forward_step,
    data_iterator=data_iterator,
    model=model,
    num_microbatches=8,
    seq_length=seq_length,
    micro_batch_size=micro_batch_size,
    forward_only=False,
)
通信优化
通信模式
# Communication primitives that appear alongside pipeline parallelism,
# mapped to a short description of what each one carries.
communication_patterns = dict(
    P2P="点对点通信(激活值传输)",
    AllReduce="梯度归约(数据并行)",
    AllGather="收集输出(张量并行)",
)
# Communication volume estimate
def calculate_pipeline_communication(activation_size, num_stages,
                                     num_microbatches=1,
                                     include_backward=False):
    """Return the point-to-point traffic between adjacent pipeline stages.

    A pipeline of ``num_stages`` stages has ``num_stages - 1`` boundaries.
    Each boundary carries one activation tensor per micro-batch in the
    forward pass. When ``include_backward`` is true, it also carries the
    gradient of that activation back in the backward pass (same size).

    Args:
        activation_size: Size of one boundary activation (any unit).
        num_stages: Number of pipeline stages (>= 1).
        num_microbatches: Micro-batches per step (>= 1); default 1 gives the
            per-micro-batch forward volume.
        include_backward: Also count the backward activation-gradient traffic.

    Returns:
        Total volume in the same unit as ``activation_size``.

    Raises:
        ValueError: If ``num_stages`` or ``num_microbatches`` is < 1.
    """
    if num_stages < 1:
        raise ValueError(f"num_stages must be >= 1, got {num_stages}")
    if num_microbatches < 1:
        raise ValueError(
            f"num_microbatches must be >= 1, got {num_microbatches}"
        )
    boundaries = num_stages - 1
    volume = activation_size * boundaries * num_microbatches
    if include_backward:
        # Gradients w.r.t. the boundary activations travel back the same links.
        volume *= 2
    return volume
气泡优化
def calculate_bubble_ratio(num_microbatches, num_stages, num_model_chunks=1):
    """Return the fraction of a training step that the pipeline sits idle.

    With ``p`` stages, ``m`` micro-batches and ``v`` model chunks per device,
    the bubble fraction is ``(p - 1) / (v * m + p - 1)``. Here ``v > 1`` is
    the interleaved (virtual pipeline) schedule. With the default ``v = 1``
    this is the GPipe / 1F1B value ``(p - 1) / (m + p - 1)``.

    Args:
        num_microbatches: Micro-batches per step (m, >= 1).
        num_stages: Pipeline stages (p, >= 1).
        num_model_chunks: Virtual stages per device (v, >= 1).

    Returns:
        A float in ``[0, 1)``.

    Raises:
        ValueError: If any argument is < 1.
    """
    if num_microbatches < 1 or num_stages < 1 or num_model_chunks < 1:
        raise ValueError(
            "num_microbatches, num_stages and num_model_chunks must all be >= 1"
        )
    # Idle slots at pipeline fill + drain; interleaving shrinks them by 1/v.
    idle = num_stages - 1
    return idle / (num_model_chunks * num_microbatches + idle)
# Strategies for shrinking the pipeline bubble
def reduce_bubble(num_microbatches, num_stages):
    """Return a mapping of bubble-reduction strategy -> short description.

    Plain (non-interleaved) 1F1B has the same bubble as GPipe,
    ``(p - 1) / (m + p - 1)``. It lowers peak activation memory instead. The
    bubble only shrinks by raising the micro-batch count or by interleaving
    several virtual stages per device.

    Args:
        num_microbatches: Current micro-batch count (not used to tailor the
            advice; kept for interface compatibility).
        num_stages: Current number of pipeline stages (likewise unused).

    Returns:
        A new dict on every call.
    """
    return {
        "增加microbatch": "num_microbatches >> num_stages",
        "虚拟流水线": "将阶段进一步细分",
        "交错调度": "交错1F1B调度(配合虚拟流水线)减少气泡",
    }
内存优化
# Activation checkpointing
class CheckpointedPipelineStage(nn.Module):
    """Pipeline stage that recomputes each layer's activations in backward.

    During training, each layer is wrapped in a non-reentrant
    ``torch.utils.checkpoint.checkpoint`` call. Only layer inputs are kept,
    and intermediate activations are recomputed when gradients are needed.
    In eval mode, or with autograd disabled, no backward will follow, so the
    layers run directly and the recomputation bookkeeping is skipped.

    Args:
        layers: Iterable of ``nn.Module`` layers applied in order.
    """

    def __init__(self, layers):
        super().__init__()
        self.layers = nn.ModuleList(layers)

    def forward(self, x):
        """Apply every layer to ``x`` in order, checkpointing when training."""
        # Import the submodule explicitly instead of relying on
        # `import torch` having already loaded torch.utils.checkpoint.
        from torch.utils.checkpoint import checkpoint

        use_checkpoint = self.training and torch.is_grad_enabled()
        for layer in self.layers:
            if use_checkpoint:
                x = checkpoint(layer, x, use_reentrant=False)
            else:
                x = layer(x)
        return x
# Memory estimate
def estimate_pipeline_memory(model_size, num_stages, num_microbatches,
                             activation_ratio=0.1, schedule="gpipe"):
    """Roughly estimate per-GPU memory under pipeline parallelism.

    Model memory is split evenly across stages. Activation memory is modeled
    as ``per_stage_memory * in_flight * activation_ratio``. ``in_flight`` is
    the number of micro-batches whose activations are held at once:
    - GPipe keeps all ``num_microbatches``.
    - 1F1B keeps at most ``num_stages``.

    ``activation_ratio`` (default 0.1) is a rough heuristic, not a derived
    constant; tune it for the actual model.

    Args:
        model_size: Total model memory (any unit).
        num_stages: Pipeline stages (>= 1).
        num_microbatches: Micro-batches per step.
        activation_ratio: Activation memory per in-flight micro-batch, as a
            fraction of per-stage model memory.
        schedule: ``"gpipe"`` (default) or ``"1f1b"``, case-insensitive.

    Returns:
        Dict with keys "模型内存" (model memory), "激活内存" (activation
        memory) and "总内存" (total), in the unit of ``model_size``.

    Raises:
        ValueError: If ``num_stages`` < 1 or ``schedule`` is unknown.
    """
    if num_stages < 1:
        raise ValueError(f"num_stages must be >= 1, got {num_stages}")
    schedule_key = schedule.lower()
    if schedule_key == "gpipe":
        in_flight = num_microbatches
    elif schedule_key == "1f1b":
        in_flight = min(num_microbatches, num_stages)
    else:
        raise ValueError(f"unknown schedule {schedule!r}; use 'gpipe' or '1f1b'")

    per_stage_memory = model_size / num_stages
    activation_memory = per_stage_memory * in_flight * activation_ratio
    total_memory = per_stage_memory + activation_memory
    return {
        "模型内存": per_stage_memory,
        "激活内存": activation_memory,
        "总内存": total_memory
    }
性能优化
# Comparison of pipeline schedules.
# NOTE: plain 1F1B has the same bubble as GPipe, (p - 1) / (m + p - 1). Its
# advantage is that at most p micro-batches are in flight, so peak activation
# memory no longer grows with m. Only the interleaved schedule shrinks the bubble.
performance = {
    "GPipe": {
        "优点": "简单易实现",
        "缺点": "气泡大,激活内存随microbatch数线性增长",
        "适用": "小规模训练"
    },
    "1F1B": {
        "优点": "峰值激活内存低(在途microbatch数不超过流水线阶段数)",
        "缺点": "实现复杂,气泡与GPipe相同",
        "适用": "大规模训练"
    },
    "Interleaved": {
        "优点": "气泡最小",
        "缺点": "通信复杂",
        "适用": "超大规模训练"
    }
}
与其他并行结合
# 3D parallelism: tensor + pipeline + data
def setup_3d_parallel(tp_size=2, pp_size=4, world_size=None):
    """Initialize Megatron 3D parallelism (tensor x pipeline x data).

    Args:
        tp_size: Tensor-parallel group size (default 2).
        pp_size: Pipeline-parallel group size (default 4).
        world_size: Total number of ranks. When None it is read from
            ``torch.distributed``, which must already be initialized.

    Returns:
        Tuple ``(tp_size, pp_size, dp_size)``, where ``dp_size`` is the
        data-parallel degree ``world_size // (tp_size * pp_size)``.

    Raises:
        RuntimeError: ``world_size`` is None and torch.distributed is not
            initialized.
        ValueError: ``world_size`` is not a positive multiple of
            ``tp_size * pp_size``.
    """
    from megatron.core import parallel_state
    import torch.distributed as dist

    if world_size is None:
        if not dist.is_initialized():
            raise RuntimeError(
                "torch.distributed is not initialized; pass world_size or "
                "call torch.distributed.init_process_group first"
            )
        world_size = dist.get_world_size()

    model_parallel_size = tp_size * pp_size
    if model_parallel_size < 1 or world_size < model_parallel_size \
            or world_size % model_parallel_size != 0:
        raise ValueError(
            f"world_size ({world_size}) must be a positive multiple of "
            f"tp_size * pp_size ({model_parallel_size})"
        )
    dp_size = world_size // model_parallel_size  # data parallel

    parallel_state.initialize_model_parallel(
        tensor_model_parallel_size=tp_size,
        pipeline_model_parallel_size=pp_size,
        virtual_pipeline_model_parallel_size=None
    )
    return tp_size, pp_size, dp_size
性能基准
# Example pipeline-parallel performance figures, one row per GPU count.
# NOTE(review): no hardware, model or measurement setup is given for these
# numbers; treat them as illustrative only.
benchmark = {
    gpus: {"加速比": speedup, "气泡比例": bubble, "通信开销": comm}
    for gpus, speedup, bubble, comm in (
        ("4 GPU", "3.2x", "15%", "5%"),
        ("8 GPU", "6.1x", "12%", "8%"),
        ("16 GPU", "11.5x", "10%", "12%"),
    )
}
最佳实践
- microbatch数量:设置为流水线阶段数的2-4倍或更多;气泡比例约为 (p-1)/(m+p-1)(p为阶段数,m为microbatch数),m越大气泡越小
- 调度策略:优先使用1F1B调度——其气泡与GPipe相同,但同时在途的microbatch数不超过阶段数,峰值激活内存更低;需要进一步压缩气泡时使用交错(虚拟流水线)调度
- 通信优化:使用点对点通信
- 内存管理:启用激活检查点
- 负载均衡:确保各阶段计算量均衡
流水线并行是训练深层大模型的有效技术,与张量并行和数据并行结合可以实现超大规模训练。