← 返回首页
🧠

数据并行:样本级并行计算

📂 llm ⏱ 3 min 470 words

---
title: "数据并行:样本级并行计算"
description: "掌握数据并行的原理和实现,包括DDP、FSDP和ZeRO等技术"
tags: ["数据并行", "DDP", "FSDP", "ZeRO"]
category: "llm"
icon: "🧠"
---

数据并行:样本级并行计算

数据并行简介

数据并行(Data Parallelism)是最简单的分布式训练策略,每个GPU持有完整的模型副本,但处理不同的数据批次。数据并行通过在GPU间同步梯度来保持模型一致性。

数据并行的核心优势:实现简单(模型代码几乎无需改动)、训练吞吐量随GPU数量近似线性扩展,并且适用于大多数训练场景。

基本实现

DDP(DistributedDataParallel)

import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def setup(rank, world_size):
    """Join the default process group and select this process's GPU.

    Args:
        rank: Index of this process in the group; also passed to
            ``torch.cuda.set_device`` as the CUDA device index.
        world_size: Total number of processes in the group.
    """
    group_options = {
        "backend": "nccl",
        "rank": rank,
        "world_size": world_size,
    }
    dist.init_process_group(**group_options)

    # Make ``rank`` the current CUDA device for this process.
    torch.cuda.set_device(rank)

def train(rank, world_size):
    """Train a DDP-wrapped model on the GPU indexed by ``rank``.

    Initialises the process group via ``setup``, wraps the model in
    ``DistributedDataParallel``, shards the dataset across ranks with a
    ``DistributedSampler`` and runs the optimisation loop. The process
    group is always destroyed on exit, including when training raises.

    NOTE(review): ``MyModel``, ``dataset`` and ``num_epochs`` are not
    defined in this block and are expected to be provided by the
    surrounding module. Batches are assumed to be dicts of tensors and the
    model output is assumed to expose ``.loss`` -- confirm against the
    actual model and dataset.

    Args:
        rank: Index of this process; also used as the CUDA device index.
        world_size: Total number of participating processes.
    """
    setup(rank, world_size)

    # Everything after a successful setup runs under try/finally so the
    # process group is released even if training fails part-way through.
    try:
        # Build the model on this rank's device.
        model = MyModel().to(rank)

        # Wrap with DDP so gradients are synchronised across ranks.
        model = DDP(model, device_ids=[rank])

        # Give each rank its own shard of the dataset.
        sampler = DistributedSampler(
            dataset,
            num_replicas=world_size,
            rank=rank,
            shuffle=True
        )

        dataloader = DataLoader(
            dataset,
            batch_size=32,
            sampler=sampler,
            num_workers=4
        )

        optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

        # Training loop
        for epoch in range(num_epochs):
            # Re-seed the sampler so every epoch uses a different shuffle.
            sampler.set_epoch(epoch)

            for batch in dataloader:
                batch = {k: v.to(rank) for k, v in batch.items()}

                outputs = model(**batch)
                loss = outputs.loss

                loss.backward()  # DDP all-reduces gradients during backward

                optimizer.step()
                optimizer.zero_grad()
    finally:
        dist.destroy_process_group()

FSDP(FullyShardedDataParallel)

import functools

from torch.distributed.fsdp import (
    FullyShardedDataParallel as FSDP,
    MixedPrecision,
    ShardingStrategy
)
from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy

# FSDP configuration
fsdp_config = {
    "sharding_strategy": ShardingStrategy.FULL_SHARD,
    "mixed_precision": MixedPrecision(
        param_dtype=torch.float16,
        reduce_dtype=torch.float16,
        buffer_dtype=torch.float16
    ),
    # transformer_auto_wrap_policy cannot be passed bare: FSDP calls the
    # policy without ``transformer_layer_cls``, so it must be bound up front.
    # NOTE(review): replace the classes below with the transformer block
    # class(es) actually used by ``model``.
    "auto_wrap_policy": functools.partial(
        transformer_auto_wrap_policy,
        transformer_layer_cls={
            torch.nn.TransformerEncoderLayer,
            torch.nn.TransformerDecoderLayer,
        },
    ),
}

# Wrap the model with FSDP
model = FSDP(
    model,
    **fsdp_config
)

ZeRO优化

ZeRO Stage 1

import deepspeed

# ZeRO-1 configuration
zero_1_config = {
    "zero_optimization": {
        "stage": 1
    },
    "fp16": {"enabled": True},
    # Without an optimizer here (or one passed to deepspeed.initialize) the
    # engine returns ``optimizer=None`` and there is nothing to step.
    # AdamW with lr=1e-4 mirrors the DDP example.
    "optimizer": {
        "type": "AdamW",
        "params": {"lr": 1e-4}
    },
    # train_batch_size must equal
    # micro_batch_size * gradient_accumulation_steps * world_size.
    "train_batch_size": 32,
    "train_micro_batch_size_per_gpu": 4
}

# Initialise the engine; model_parameters tells DeepSpeed which tensors the
# config-defined optimizer should update.
model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config=zero_1_config
)

ZeRO Stage 2

# ZeRO-2 configuration: stage 2 with communication overlap, contiguous
# gradients and 5e8-element reduce/all-gather buckets, under FP16.
zero_2_config = {
    "zero_optimization": {
        "stage": 2,
        **dict.fromkeys(("overlap_comm", "contiguous_gradients"), True),
        **dict.fromkeys(("reduce_bucket_size", "allgather_bucket_size"), 5e8),
    },
    "fp16": {"enabled": True},
}

ZeRO Stage 3

# ZeRO-3 configuration: stage 3 with optimizer state and parameters
# offloaded to CPU, communication overlap and FP16 enabled.
zero_3_config = {
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu"},
        "offload_param": {"device": "cpu"},
        **dict.fromkeys(("overlap_comm", "contiguous_gradients"), True),
        **dict.fromkeys(
            ("reduce_bucket_size", "stage3_prefetch_bucket_size"), 5e8
        ),
        "stage3_param_persistence_threshold": 1e6,
    },
    "fp16": {"enabled": True},
}

内存分析

def analyze_data_parallel_memory(model_size, batch_size, world_size):
    """Estimate per-GPU memory for plain data-parallel FP16 training with Adam.

    Every quantity is computed in bytes and reported in GiB. Per parameter
    the estimate uses 2 bytes of FP16 weights, 2 bytes of FP16 gradients and
    12 bytes of optimizer state (FP32 master weights plus Adam's first and
    second moments), i.e. 16 bytes per parameter before activations.

    Args:
        model_size: Number of model parameters.
        batch_size: Per-GPU batch size, used for the activation estimate.
        world_size: Number of data-parallel replicas. Kept for interface
            compatibility; it does not affect the result because plain data
            parallelism stores a full replica on every GPU.

    Returns:
        dict mapping each memory category (and the per-GPU total) to GiB.
    """
    gib = 1024**3

    # FP16 weights: a full copy lives on every GPU.
    model_memory = model_size * 2

    # Adam optimizer state: FP32 master weights + first moment + second
    # moment, 4 bytes each = 12 bytes per parameter.
    optimizer_memory = model_size * 12

    # FP16 gradients.
    gradient_memory = model_size * 2

    # Rough placeholder of 0.1 GiB of activations per sample, converted to
    # bytes so it is summed in the same unit as the other terms.
    activation_memory = batch_size * 0.1 * gib

    total_per_gpu = (
        model_memory + optimizer_memory + gradient_memory + activation_memory
    )

    return {
        "模型内存": model_memory / gib,
        "优化器内存": optimizer_memory / gib,
        "梯度内存": gradient_memory / gib,
        "激活内存": activation_memory / gib,
        "总内存/GPU": total_per_gpu / gib
    }

# ZeRO memory savings: which state each stage divides by world_size
zero_savings = dict(
    zip(
        ("ZeRO-1", "ZeRO-2", "ZeRO-3"),
        (
            "优化器状态 / world_size",
            "(优化器状态 + 梯度) / world_size",
            "(模型 + 优化器 + 梯度) / world_size",
        ),
    )
)

通信优化

# Gradient synchronisation strategies, keyed by collective name
sync_strategies = dict(
    [
        ("AllReduce", "所有GPU归约梯度"),
        ("ReduceScatter", "归约并分发"),
        ("AllGather", "收集参数更新"),
    ]
)

# Overlapping communication with computation
class OverlapDDP(DDP):
    """DDP subclass with overridable hooks around the forward pass.

    ``DistributedDataParallel`` already overlaps gradient all-reduce with
    the backward pass through its bucketed reducer, so the hooks below do
    nothing by default. They exist so that ``forward`` has well-defined
    extension points; previously it called methods that were defined
    neither here nor on ``DistributedDataParallel`` and raised
    ``AttributeError`` on the first call.
    """

    def _async_all_reduce_start(self):
        """Hook run before the wrapped forward pass; a no-op by default.

        Override to launch extra asynchronous communication that should
        overlap with the forward computation.
        """

    def _async_all_reduce_wait(self):
        """Hook run after the wrapped forward pass; a no-op by default.

        Override to wait on whatever ``_async_all_reduce_start`` launched.
        """

    def forward(self, *args, **kwargs):
        """Run the DDP forward pass between the start and wait hooks.

        Args:
            *args: Positional arguments forwarded to ``DDP.forward``.
            **kwargs: Keyword arguments forwarded to ``DDP.forward``.

        Returns:
            Whatever ``DDP.forward`` returns for the given inputs.
        """
        # Give subclasses a chance to start asynchronous communication.
        self._async_all_reduce_start()

        # Regular DDP forward pass.
        output = super().forward(*args, **kwargs)

        # Block until any communication started above has finished.
        self._async_all_reduce_wait()

        return output

性能对比

# Comparison of data-parallel methods: each row lists memory efficiency,
# communication pattern and the scenario it suits, in that order.
comparison = {
    method: dict(zip(("内存效率", "通信量", "适用"), traits))
    for method, traits in (
        ("DDP", ("低(每GPU完整模型)", "AllReduce梯度", "模型能放入单GPU")),
        ("FSDP", ("高(全分片)", "AllGather + ReduceScatter", "大模型训练")),
        ("ZeRO-1", ("中", "与DDP相同", "优化器状态大")),
        ("ZeRO-2", ("较高", "略高于DDP", "中等规模训练")),
        ("ZeRO-3", ("最高", "较高", "超大模型训练")),
    )
}

性能基准

# Data-parallel speedups for DDP and FSDP at each GPU count
benchmark = {
    gpu_count: {"DDP加速比": ddp_speedup, "FSDP加速比": fsdp_speedup}
    for gpu_count, ddp_speedup, fsdp_speedup in (
        ("2 GPU", "1.85x", "1.7x"),
        ("4 GPU", "3.5x", "3.2x"),
        ("8 GPU", "6.8x", "6.3x"),
    )
}

最佳实践

  1. 小模型:使用DDP
  2. 大模型:使用FSDP或ZeRO
  3. 超大模型:使用ZeRO-3 + CPU Offload
  4. 通信优化:启用overlap和gradient bucketing
  5. 批量大小:平衡吞吐量和内存

数据并行是最基础的分布式训练策略,适用于大多数场景。