数据并行:样本级并行计算
---
title: "数据并行:样本级并行计算"
description: "掌握数据并行的原理和实现,包括DDP、FSDP和ZeRO等技术"
tags: ["数据并行", "DDP", "FSDP", "ZeRO"]
category: "llm"
icon: "🧠"
---
数据并行:样本级并行计算
数据并行简介
数据并行(Data Parallelism)是最简单的分布式训练策略,每个GPU持有完整的模型副本,但处理不同的数据批次。数据并行通过在GPU间同步梯度来保持模型一致性。
数据并行的核心优势:
- 实现简单:易于理解和实现
- 线性加速:接近线性的多GPU加速
- 通信高效:只需同步梯度
- 适合大数据:数据量大时效果好
基本实现
DDP(DistributedDataParallel)
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
def setup(rank, world_size):
    """Initialize the default process group and bind this process to a GPU.

    Args:
        rank: Rank of the calling process; also used as the CUDA device index.
        world_size: Total number of processes taking part in the job.
    """
    import os

    # The default env:// rendezvous reads MASTER_ADDR / MASTER_PORT from the
    # environment. Launchers such as torchrun export them; fall back to
    # single-node defaults so a plain multiprocessing spawn works too.
    # setdefault never overrides a value the launcher already provided.
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")

    dist.init_process_group(
        backend="nccl",
        rank=rank,
        world_size=world_size,
    )
    torch.cuda.set_device(rank)
def train(rank, world_size):
    """Run the DDP training loop on one process.

    `MyModel`, `dataset` and `num_epochs` are not defined in this snippet and
    must be provided by the surrounding module.

    Args:
        rank: Rank of this process; also used as the device index for the
            model and for every batch tensor.
        world_size: Total number of processes; passed to the sampler as the
            number of replicas.
    """
    setup(rank, world_size)
    try:
        # Build the model on this rank's device, then wrap it in DDP
        model = MyModel().to(rank)
        model = DDP(model, device_ids=[rank])

        # The sampler gives each rank its own slice of the dataset
        sampler = DistributedSampler(
            dataset,
            num_replicas=world_size,
            rank=rank,
            shuffle=True,
        )
        dataloader = DataLoader(
            dataset,
            batch_size=32,
            sampler=sampler,
            num_workers=4,
        )

        optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

        # Training loop
        for epoch in range(num_epochs):
            # Re-seed the sampler so every epoch shuffles differently
            sampler.set_epoch(epoch)

            for batch in dataloader:
                batch = {k: v.to(rank) for k, v in batch.items()}

                outputs = model(**batch)
                loss = outputs.loss

                loss.backward()  # DDP synchronizes gradients automatically
                optimizer.step()
                optimizer.zero_grad()
    finally:
        # Tear the process group down even when training raises, so a failed
        # rank does not leave the group's resources behind.
        dist.destroy_process_group()
FSDP(FullyShardedDataParallel)
from torch.distributed.fsdp import (
FullyShardedDataParallel as FSDP,
MixedPrecision,
ShardingStrategy
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
import functools

# FSDP configuration
fsdp_config = {
    "sharding_strategy": ShardingStrategy.FULL_SHARD,
    "mixed_precision": MixedPrecision(
        param_dtype=torch.float16,
        reduce_dtype=torch.float16,
        buffer_dtype=torch.float16,
    ),
    # transformer_auto_wrap_policy cannot be handed to FSDP bare: FSDP calls
    # the policy without transformer_layer_cls, so that argument has to be
    # bound up front. nn.TransformerEncoderLayer is a stand-in here; replace
    # it with the block class(es) your model is actually built from.
    "auto_wrap_policy": functools.partial(
        transformer_auto_wrap_policy,
        transformer_layer_cls={nn.TransformerEncoderLayer},
    ),
}

# Wrap the model with FSDP (`model` must already exist at this point)
model = FSDP(
    model,
    **fsdp_config,
)
ZeRO优化
ZeRO Stage 1
import deepspeed
# ZeRO-1 configuration
zero_1_config = {
    "zero_optimization": {
        "stage": 1,
    },
    "fp16": {"enabled": True},
    "train_batch_size": 32,
    "train_micro_batch_size_per_gpu": 4,
    # No optimizer object is passed to deepspeed.initialize() below, so it is
    # declared here; otherwise the engine has no real optimizer to step and
    # the `optimizer` it returns is unusable for training.
    "optimizer": {
        "type": "AdamW",
        "params": {"lr": 1e-4},
    },
}

# Build the DeepSpeed engine (`model` must already exist at this point).
# model_parameters tells DeepSpeed which tensors the optimizer should update.
model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config=zero_1_config,
)
ZeRO Stage 2
# ZeRO-2 configuration: stage 2 with communication overlap, contiguous
# gradients and 5e8-sized reduce / all-gather buckets, plus fp16 enabled.
zero_2_config = dict(
    zero_optimization=dict(
        stage=2,
        overlap_comm=True,
        contiguous_gradients=True,
        reduce_bucket_size=5e8,
        allgather_bucket_size=5e8,
    ),
    fp16=dict(enabled=True),
)
ZeRO Stage 3
# ZeRO-3 configuration: stage 3 with optimizer state and parameters both
# offloaded to CPU, communication overlap, contiguous gradients, and the
# stage-3 prefetch / persistence thresholds, plus fp16 enabled.
zero_3_config = dict(
    zero_optimization=dict(
        stage=3,
        offload_optimizer=dict(device="cpu"),
        offload_param=dict(device="cpu"),
        overlap_comm=True,
        contiguous_gradients=True,
        reduce_bucket_size=5e8,
        stage3_prefetch_bucket_size=5e8,
        stage3_param_persistence_threshold=1e6,
    ),
    fp16=dict(enabled=True),
)
内存分析
def analyze_data_parallel_memory(model_size, batch_size, world_size):
    """Estimate per-GPU memory, in GiB, for plain data-parallel training.

    The estimate assumes mixed-precision Adam: FP16 weights and gradients
    (2 bytes per parameter each) plus FP32 optimizer state (12 bytes per
    parameter), i.e. 16 bytes per parameter before activations.

    Args:
        model_size: Number of model parameters.
        batch_size: Batch size fed into the rough activation estimate.
        world_size: Number of data-parallel ranks. Accepted for interface
            compatibility but unused: plain data parallelism replicates
            weights, gradients and optimizer state on every GPU.

    Returns:
        A dict mapping each memory category (and the per-GPU total) to its
        size in GiB.
    """
    bytes_per_gib = 1024**3

    # Every GPU holds a full copy of the FP16 weights (2 bytes/param).
    # Counting them at 1 byte/param would mix units with the terms below.
    model_memory = model_size * 2

    # Adam optimizer state, 12 bytes/param:
    # FP32 master weights (4) + first moment (4) + second moment (4)
    optimizer_memory = model_size * 12

    # FP16 gradients (2 bytes/param)
    gradient_memory = model_size * 2

    # Activation memory grows with batch_size. The 0.1 factor is the
    # article's rough placeholder; its unit is not established - TODO confirm.
    activation_memory = batch_size * 0.1

    total_per_gpu = (
        model_memory + optimizer_memory + gradient_memory + activation_memory
    )

    return {
        "模型内存": model_memory / bytes_per_gib,
        "优化器内存": optimizer_memory / bytes_per_gib,
        "梯度内存": gradient_memory / bytes_per_gib,
        "激活内存": activation_memory / bytes_per_gib,
        "总内存/GPU": total_per_gpu / bytes_per_gib,
    }
# ZeRO memory savings: for each stage, the state that is divided by
# world_size (stage numbers start at 1).
zero_savings = {
    f"ZeRO-{stage}": f"{sharded_state} / world_size"
    for stage, sharded_state in enumerate(
        (
            "优化器状态",
            "(优化器状态 + 梯度)",
            "(模型 + 优化器 + 梯度)",
        ),
        start=1,
    )
}
通信优化
# Gradient synchronization strategies: collective name -> short description
sync_strategies = dict(
    AllReduce="所有GPU归约梯度",
    ReduceScatter="归约并分发",
    AllGather="收集参数更新",
)
# Overlap communication with computation
class OverlapDDP(DDP):
    """DDP subclass whose forward pass is bracketed by an asynchronous all-reduce.

    NOTE(review): `_async_all_reduce_start` and `_async_all_reduce_wait` are
    not defined in this snippet, and as far as I know they are not part of
    the DDP API - confirm they are provided somewhere before using this
    class, otherwise `forward` will raise AttributeError.
    """

    def forward(self, *args, **kwargs):
        # Kick off the asynchronous all-reduce
        self._async_all_reduce_start()
        # Run the regular DDP forward pass
        output = super().forward(*args, **kwargs)
        # Wait for the all-reduce to finish before returning
        self._async_all_reduce_wait()
        return output
性能对比
# Comparison of data-parallel methods. Each row below is
# (method, memory efficiency, communication volume, suitable scenario)
# and is expanded into a per-method dict with the three labelled fields.
comparison = {
    method: {
        "内存效率": memory_efficiency,
        "通信量": communication,
        "适用": suited_for,
    }
    for method, memory_efficiency, communication, suited_for in (
        ("DDP", "低(每GPU完整模型)", "AllReduce梯度", "模型能放入单GPU"),
        ("FSDP", "高(全分片)", "AllGather + ReduceScatter", "大模型训练"),
        ("ZeRO-1", "中", "与DDP相同", "优化器状态大"),
        ("ZeRO-2", "较高", "略高于DDP", "中等规模训练"),
        ("ZeRO-3", "最高", "较高", "超大模型训练"),
    )
}
性能基准
# Data-parallel speedups quoted by the article, keyed by GPU count.
# Each row is (GPU count, DDP speedup, FSDP speedup); the measurement setup
# behind these figures is not given here.
benchmark = {
    f"{gpu_count} GPU": {
        "DDP加速比": ddp_speedup,
        "FSDP加速比": fsdp_speedup,
    }
    for gpu_count, ddp_speedup, fsdp_speedup in (
        (2, "1.85x", "1.7x"),
        (4, "3.5x", "3.2x"),
        (8, "6.8x", "6.3x"),
    )
}
最佳实践
- 小模型:使用DDP
- 大模型:使用FSDP或ZeRO
- 超大模型:使用ZeRO-3 + CPU Offload
- 通信优化:启用overlap和gradient bucketing
- 批量大小:平衡吞吐量和内存
数据并行是最基础的分布式训练策略,适用于大多数场景。