分布式训练:并行化策略详解
--- title: "分布式训练:并行化策略详解" description: "掌握分布式训练的核心概念、并行策略和实现方法" tags: ["分布式训练", "并行计算", "数据并行", "模型并行"] category: "llm" icon: "🧠"
分布式训练:并行化策略详解
分布式训练概述
分布式训练是将训练任务分布到多个计算设备上的技术。对于大语言模型,分布式训练几乎是必不可少的,因为单个GPU的显存通常无法同时容纳完整的模型参数、梯度、优化器状态和激活值。
分布式训练的核心目标:
- 加速训练:利用多GPU并行计算
- 扩展规模:训练更大的模型
- 优化内存:将模型参数、梯度和优化器状态的显存占用分摊到多个设备
- 提高效率:最大化硬件利用率
并行策略
数据并行(Data Parallelism)
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
def setup_data_parallel(rank, world_size, local_rank=None):
    """Initialise the NCCL process group and return a DDP-wrapped model.

    Args:
        rank: Global rank of this process, passed to ``init_process_group``.
        world_size: Total number of processes in the job.
        local_rank: Index of the CUDA device on this node. When omitted, the
            ``LOCAL_RANK`` environment variable is used if it is set (the
            ``torchrun`` launcher exports it); otherwise ``rank`` is used,
            which matches the previous single-node behaviour.

    Returns:
        The model built by ``create_model()``, moved to the selected device
        and wrapped in ``DistributedDataParallel``.
    """
    import os  # local import: only needed to read LOCAL_RANK

    dist.init_process_group(
        backend="nccl",
        rank=rank,
        world_size=world_size,
    )

    # The global rank is only a valid device index on a single node. On a
    # multi-node job it can exceed the number of GPUs visible to the process,
    # so the device is selected by the per-node index instead.
    if local_rank is None:
        local_rank = int(os.environ.get("LOCAL_RANK", rank))
    torch.cuda.set_device(local_rank)

    # Build the model on this process's device before wrapping it.
    model = create_model()
    model = model.to(local_rank)

    return DDP(model, device_ids=[local_rank])
def train_epoch(model, dataloader, optimizer, rank):
    """Run one training pass over ``dataloader``.

    Args:
        model: Callable module; invoked as ``model(**batch)`` and expected to
            return an object exposing a ``loss`` attribute.
        dataloader: Iterable of mapping-style batches (``.items()`` is used).
        optimizer: Optimizer stepped and cleared once per batch.
        rank: Device specifier handed to ``.to()`` for every batch value.

    Returns:
        None.
    """
    model.train()

    for raw_batch in dataloader:
        # Each process receives its own batches; move every value to the
        # device that belongs to this rank.
        on_device = {}
        for key, value in raw_batch.items():
            on_device[key] = value.to(rank)

        result = model(**on_device)

        # When ``model`` is DDP-wrapped, gradient synchronisation across
        # processes happens as part of this backward call.
        result.loss.backward()

        optimizer.step()
        optimizer.zero_grad()
张量并行(Tensor Parallelism)
# 张量并行:将单个层的权重矩阵分割到多个GPU
from megatron.core.tensor_parallel import ColumnParallelLinear, RowParallelLinear
class TensorParallelMLP(nn.Module):
    """Feed-forward block whose two projections are tensor-parallel linears.

    The up-projection is column-parallel (output dimension split) and keeps
    its output sharded; the down-projection is row-parallel (input dimension
    split) and consumes that sharded activation directly.

    Args:
        hidden_size: Width of the block's input and output.
        intermediate_size: Width of the hidden activation.
        **parallel_kwargs: Extra keyword arguments forwarded unchanged to both
            ``ColumnParallelLinear`` and ``RowParallelLinear``.
            NOTE(review): recent Megatron-Core releases require keyword-only
            ``config`` and ``init_method`` here -- confirm against the
            installed version.
    """

    def __init__(self, hidden_size, intermediate_size, **parallel_kwargs):
        super().__init__()
        # Column-parallel: split the output dimension and leave the result
        # sharded (no gather) because the next layer consumes it as-is.
        self.dense_h_to_4h = ColumnParallelLinear(
            hidden_size,
            intermediate_size,
            gather_output=False,
            **parallel_kwargs,
        )
        # Row-parallel: split the input dimension; the incoming activation is
        # already sharded by the column-parallel layer above.
        self.dense_4h_to_h = RowParallelLinear(
            intermediate_size,
            hidden_size,
            input_is_parallel=True,
            **parallel_kwargs,
        )

    @staticmethod
    def _primary_output(result):
        """Return the activation from a layer result that may be a tuple."""
        # Megatron's parallel linears return ``(output, bias)``; plain
        # modules return the tensor itself. Accept both.
        if isinstance(result, tuple):
            return result[0]
        return result

    def forward(self, x):
        """Apply up-projection, SiLU, then down-projection; return a tensor."""
        x = self._primary_output(self.dense_h_to_4h(x))
        x = F.silu(x)
        # Cross-rank reduction of the partial sums is expected to happen
        # inside the row-parallel layer; nothing is communicated here.
        return self._primary_output(self.dense_4h_to_h(x))
流水线并行(Pipeline Parallelism)
# 流水线并行:将模型层分割到多个GPU
class PipelineParallelModel(nn.Module):
    """Model assembled from consecutive stages that are applied in order.

    NOTE(review): ``layers_per_stage`` is accepted but never read by this
    constructor; the stage count comes from a ``num_stages`` name and the
    layers from ``create_layers``, both resolved from the enclosing scope.
    Confirm how these are meant to be wired together.
    """

    def __init__(self, layers_per_stage):
        super().__init__()
        # One ``nn.Sequential`` per stage, intended to live on separate GPUs.
        stage_modules = nn.ModuleList()
        for stage_idx in range(num_stages):
            stage_modules.append(nn.Sequential(*create_layers(stage_idx)))
        self.stages = stage_modules

    def forward(self, x):
        """Feed ``x`` through every stage in sequence and return the result."""
        hidden = x
        for stage in self.stages:
            # NOTE: activations would have to be moved between GPUs here;
            # this sketch performs no device transfer.
            hidden = stage(hidden)
        return hidden
# GPipe调度
def gpipe_schedule(model, microbatches):
    """Skeleton of a GPipe schedule: every forward pass, then every backward.

    The loop bodies are placeholders, so nothing is computed.

    Args:
        model: Object exposing a sized ``stages`` attribute.
        microbatches: Sized collection of microbatches.

    Returns:
        None.
    """
    microbatch_count = len(microbatches)
    stage_count = len(model.stages)

    # Forward phase: microbatches in order, stages first to last.
    for _microbatch in range(microbatch_count):
        for _stage in range(stage_count):
            pass  # placeholder for the per-microbatch forward step

    # Backward phase: microbatches and stages both walked in reverse.
    for _microbatch in reversed(range(microbatch_count)):
        for _stage in reversed(range(stage_count)):
            pass  # placeholder for the per-microbatch backward step
序列并行(Sequence Parallelism)
# 序列并行:将序列维度分割到多个GPU
def sequence_parallel_forward(x, local_rank):
    """Run ``local_model`` on one slice of the sequence and rebuild the output.

    The input is split along dimension 1 into one slice per process of the
    default process group, this process's slice is run through
    ``local_model``, and the per-process outputs are gathered and
    concatenated along dimension 1.

    Args:
        x: 3-D input tensor, split along dimension 1 (the sequence axis).
        local_rank: Index of the slice this process handles.
            NOTE(review): ``all_gather`` orders results by rank in the
            default group, so this should equal that rank for the
            concatenated output to be in sequence order -- confirm at the
            call site.

    Returns:
        Concatenation along dimension 1 of every process's output.

    Raises:
        ValueError: If ``x`` is not 3-D, or its sequence length is not
            divisible by the number of processes.
    """
    if x.dim() != 3:
        raise ValueError(
            f"expected a 3-D (batch, seq, hidden) tensor, got {x.dim()} dims"
        )
    seq_len = x.shape[1]

    # The gather below runs on the default process group, so the number of
    # slices must equal that group's size.
    world_size = dist.get_world_size()

    # ``all_gather`` needs identically shaped tensors on every rank, which
    # uneven slices would violate; fail early with a clear message.
    if seq_len % world_size != 0:
        raise ValueError(
            f"sequence length {seq_len} is not divisible by "
            f"world size {world_size}"
        )

    local_x = torch.chunk(x, world_size, dim=1)[local_rank]

    # Local computation on this process's slice.
    output = local_model(local_x)

    # Collect every process's output.
    # NOTE(review): ``dist.all_gather`` is presumably not autograd-aware, so
    # gradients would not flow through the gathered copies -- confirm if this
    # is used in training.
    gathered = [torch.zeros_like(output) for _ in range(world_size)]
    dist.all_gather(gathered, output)
    return torch.cat(gathered, dim=1)
混合并行
# 3D并行:数据并行 + 张量并行 + 流水线并行
def setup_hybrid_parallel():
    """Initialise Megatron model-parallel state and return its process groups.

    Tensor parallelism is set to 2 and pipeline parallelism to 4, with no
    virtual pipeline stages.

    Returns:
        Tuple ``(tp_group, pp_group, dp_group)``: the tensor-parallel,
        pipeline-parallel and data-parallel groups, in that order.
    """
    from megatron.core import parallel_state as mpu

    # Create the parallel groups.
    mpu.initialize_model_parallel(
        tensor_model_parallel_size=2,
        pipeline_model_parallel_size=4,
        virtual_pipeline_model_parallel_size=None,
    )

    # Look up each group this process belongs to.
    return (
        mpu.get_tensor_model_parallel_group(),
        mpu.get_pipeline_model_parallel_group(),
        mpu.get_data_parallel_group(),
    )
内存优化
ZeRO优化
# DeepSpeed ZeRO settings: stage 3 with both optimizer state and parameters
# offloaded to the CPU.
zero_config = dict(
    stage=3,
    offload_optimizer=dict(device="cpu"),
    offload_param=dict(device="cpu"),
    overlap_comm=True,
    contiguous_gradients=True,
    reduce_bucket_size=5e8,
    stage3_prefetch_bucket_size=5e8,
    stage3_param_persistence_threshold=1e6,
)

# Memory reduction quoted for each ZeRO stage (values are display strings).
memory_savings = {
    "ZeRO-1": "4x (优化器状态分片)",
    "ZeRO-2": "8x (优化器+梯度分片)",
    "ZeRO-3": "线性 (全分片)",
}
梯度检查点
# 激活检查点(Activation Checkpointing)
from torch.utils.checkpoint import checkpoint
class CheckpointedModel(nn.Module):
    """Apply layers in order, checkpointing each one to save activation memory.

    Args:
        layers: An ``nn.Module`` container (used as-is) or any iterable of
            layers. An iterable made up entirely of ``nn.Module`` objects is
            wrapped in ``nn.ModuleList`` so the layers are registered as
            submodules; otherwise it is stored as a plain list.
    """

    def __init__(self, layers):
        super().__init__()
        if not isinstance(layers, nn.Module):
            layers = list(layers)
            # A bare Python list is invisible to ``parameters()``, ``.to()``
            # and ``state_dict()``; registering the layers fixes that.
            if all(isinstance(layer, nn.Module) for layer in layers):
                layers = nn.ModuleList(layers)
        self.layers = layers

    def forward(self, x):
        """Run ``x`` through every layer and return the final output."""
        for layer in self.layers:
            # Checkpointing trades compute for memory: activations inside
            # ``layer`` are recomputed during backward instead of stored.
            x = checkpoint(layer, x, use_reentrant=False)
        return x
性能监控
import time
import torch.distributed as dist
class DistributedProfiler:
    """Collect named wall-clock timings for the current process.

    ``start``/``stop`` calls with the same name bracket one interval; nested
    intervals of the same name are matched last-started, first-stopped.
    """

    def __init__(self):
        # name -> stack of start timestamps for intervals still running
        self.timers = {}
        # name -> completed interval lengths, in seconds
        self.durations = {}

    def start(self, name):
        """Begin a new interval called ``name``."""
        # perf_counter is monotonic, so system clock adjustments cannot
        # produce negative or skewed intervals.
        self.timers.setdefault(name, []).append(time.perf_counter())

    def stop(self, name):
        """End the most recent open interval called ``name``.

        Returns:
            The elapsed seconds, or ``0`` when no interval with that name is
            open (including names that were never started).
        """
        pending = self.timers.get(name)
        if not pending:
            return 0
        elapsed = time.perf_counter() - pending.pop()
        # Keep the measurement so it can be summarised later.
        self.durations.setdefault(name, []).append(elapsed)
        return elapsed

    def all_reduce_stats(self):
        """Return the mean completed interval length per name.

        Names with no completed interval are omitted. Despite the method
        name, only this process's timings are summarised; no communication
        with other ranks takes place.
        """
        return {
            name: sum(samples) / len(samples)
            for name, samples in self.durations.items()
            if samples
        }
最佳实践
- 选择策略:小模型用数据并行,大模型用混合并行
- 通信优化:让通信与计算重叠执行(例如启用ZeRO的overlap_comm)
- 内存管理:使用ZeRO和激活检查点
- 批量大小:平衡吞吐量和内存
- 监控性能:使用NVIDIA Nsight Systems等工具分析计算与通信瓶颈
分布式训练是大模型训练的基石,掌握各种并行策略是LLM开发的必备技能。