← 返回首页
🧠

张量并行:层内并行计算

📂 llm ⏱ 3 min 477 words

---
title: "张量并行:层内并行计算"
description: "掌握张量并行的原理和实现,实现单层内的多GPU并行"
tags: ["张量并行", "层内并行", "矩阵分割", "通信优化"]
category: "llm"
icon: "🧠"
---

张量并行:层内并行计算

张量并行简介

张量并行(Tensor Parallelism)是一种将单个神经网络层的权重矩阵分割到多个GPU上进行并行计算的技术。与数据并行(每个GPU保存一份完整的模型副本)不同,张量并行在层内进行分割,每个GPU只保存每层权重的一部分,因此可以显著减少单GPU的内存占用。

张量并行的核心优势:

  1. 降低显存占用:每个GPU只存储每层权重的1/world_size
  2. 层内并行计算:单层的矩阵乘法由多个GPU同时完成
  3. 可组合:可以与数据并行、流水线并行结合使用

工作原理

列并行

import torch
import torch.nn as nn
import torch.distributed as dist

class _CopyToTensorParallelRegion(torch.autograd.Function):
    """Identity in forward; SUM all-reduce of the gradient in backward.

    The input of a column-parallel layer is replicated on every rank, but each
    rank only back-propagates through its own weight shard, so the per-rank
    input gradients are partial and have to be summed across ranks.
    """

    @staticmethod
    def forward(ctx, x):
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        grad = grad_output.clone()
        dist.all_reduce(grad, op=dist.ReduceOp.SUM)
        return grad


class _GatherFromTensorParallelRegion(torch.autograd.Function):
    """All-gather along the last dim in forward; keep this rank's slice in backward.

    A plain ``dist.all_gather`` is not tracked by autograd, so gradients would
    never reach the local shard. The "split" backward assumes every rank
    computes the same loss from the gathered (replicated) output.
    """

    @staticmethod
    def forward(ctx, x, world_size, rank):
        ctx.world_size = world_size
        ctx.rank = rank
        x = x.contiguous()
        parts = [torch.empty_like(x) for _ in range(world_size)]
        dist.all_gather(parts, x)
        return torch.cat(parts, dim=-1)

    @staticmethod
    def backward(ctx, grad_output):
        local_grad = grad_output.chunk(ctx.world_size, dim=-1)[ctx.rank]
        return local_grad.contiguous(), None, None


class ColumnParallelLinear(nn.Module):
    """Column parallelism: each rank owns a slice of the output features.

    The weight of shape (out_features, in_features) is split along dim 0; this
    rank stores ``out_features // world_size`` rows plus the matching bias slice.

    Args:
        in_features: size of the (replicated) input's last dimension.
        out_features: total output size across all ranks.
        world_size: number of tensor-parallel ranks.
        rank: this rank's index; assumed to equal this process's rank in the
            default process group used by the collectives.
        gather_output: if True (default) the shards are all-gathered so every
            rank gets the full ``out_features`` output. Set it to False when the
            result feeds a RowParallelLinear, which consumes the local shard.
    """

    def __init__(self, in_features, out_features, world_size, rank, gather_output=True):
        super().__init__()
        assert out_features % world_size == 0, "out_features must be divisible by world_size"
        assert 0 <= rank < world_size, "rank must be in [0, world_size)"

        self.world_size = world_size
        self.rank = rank
        self.gather_output = gather_output
        self.local_out_features = out_features // world_size

        # Each GPU stores only 1/world_size of the weight. Scale by 1/sqrt(fan_in)
        # so output variance does not grow with in_features (unit-variance randn
        # would blow activations up by ~in_features).
        self.weight = nn.Parameter(
            torch.randn(self.local_out_features, in_features) * in_features ** -0.5
        )
        self.bias = nn.Parameter(torch.zeros(self.local_out_features))

    def forward(self, x):
        """Return the full output (gather_output=True) or this rank's shard."""
        if self.world_size > 1:
            x = _CopyToTensorParallelRegion.apply(x)

        # Local matmul against this rank's rows of the weight.
        local_output = torch.nn.functional.linear(x, self.weight, self.bias)

        if self.world_size > 1 and self.gather_output:
            return _GatherFromTensorParallelRegion.apply(local_output, self.world_size, self.rank)
        return local_output

行并行

class _ReduceFromTensorParallelRegion(torch.autograd.Function):
    """SUM all-reduce in forward; identity in backward.

    Wrapping the collective in an autograd Function avoids mutating an
    autograd-tracked tensor in place with an op that has no autograd formula.
    """

    @staticmethod
    def forward(ctx, x):
        reduced = x.clone()
        dist.all_reduce(reduced, op=dist.ReduceOp.SUM)
        return reduced

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output


class RowParallelLinear(nn.Module):
    """Row parallelism: each rank owns a slice of the input features.

    The weight of shape (out_features, in_features) is split along dim 1; the
    input ``x`` is expected to already be this rank's shard, with last
    dimension ``in_features // world_size`` (e.g. the local output of a
    ColumnParallelLinear). Partial results are summed across ranks, then the
    replicated bias is added once.

    Args:
        in_features: total input size across all ranks.
        out_features: output size (replicated on every rank).
        world_size: number of tensor-parallel ranks.
        rank: this rank's index.
    """

    def __init__(self, in_features, out_features, world_size, rank):
        super().__init__()
        assert in_features % world_size == 0, "in_features must be divisible by world_size"

        self.world_size = world_size
        self.rank = rank
        self.local_in_features = in_features // world_size

        # Each GPU stores only 1/world_size of the weight; scale by 1/sqrt of the
        # full fan-in so the summed output keeps unit-order variance.
        self.weight = nn.Parameter(
            torch.randn(out_features, self.local_in_features) * in_features ** -0.5
        )
        self.bias = nn.Parameter(torch.zeros(out_features))

    def forward(self, x):
        """Return the full (all-reduced) output plus bias."""
        # Partial product using only this rank's input slice.
        partial = torch.nn.functional.linear(x, self.weight)

        # Sum the partial products of all ranks (AllReduce).
        if self.world_size > 1:
            partial = _ReduceFromTensorParallelRegion.apply(partial)

        # Bias is added after the reduction so it is counted exactly once.
        return partial + self.bias

Transformer层并行

class TensorParallelTransformerLayer(nn.Module):
    """Tensor-parallel Transformer layer with Megatron-style partitioning.

    Each column-parallel projection feeds a row-parallel projection directly,
    so intermediate activations stay sharded. The only forward communication
    is the all-reduce inside each RowParallelLinear:

    * attention: QKV column-parallel (this rank owns ``num_heads // world_size``
      heads), output projection row-parallel;
    * FFN (SwiGLU): gate/up column-parallel, down projection row-parallel.

    Normalization layers are omitted to keep the example short.
    """

    class _ReplicatedInputGrad(torch.autograd.Function):
        """Identity in forward; SUM all-reduce of the gradient in backward.

        The input of a column-parallel projection is replicated, but each rank
        only back-propagates through its own weight shard, so the per-rank
        input gradients are partial and must be summed.
        """

        @staticmethod
        def forward(ctx, x):
            return x.view_as(x)

        @staticmethod
        def backward(ctx, grad_output):
            grad = grad_output.clone()
            dist.all_reduce(grad, op=dist.ReduceOp.SUM)
            return grad

    def __init__(self, hidden_size, num_heads, world_size, rank):
        super().__init__()
        assert hidden_size % num_heads == 0, "hidden_size must be divisible by num_heads"
        assert num_heads % world_size == 0, "num_heads must be divisible by world_size"
        self.world_size = world_size
        self.rank = rank
        self.num_heads = num_heads
        self.local_num_heads = num_heads // world_size
        self.head_dim = hidden_size // num_heads

        # Attention: QKV column-parallel, output projection row-parallel.
        # The local QKV shard is laid out as [q | k | v] for this rank's heads.
        self.qkv_proj = ColumnParallelLinear(
            hidden_size, 3 * hidden_size, world_size, rank
        )
        self.o_proj = RowParallelLinear(
            hidden_size, hidden_size, world_size, rank
        )

        # FFN: first linear layers column-parallel, second row-parallel.
        self.gate_proj = ColumnParallelLinear(
            hidden_size, 4 * hidden_size, world_size, rank
        )
        self.up_proj = ColumnParallelLinear(
            hidden_size, 4 * hidden_size, world_size, rank
        )
        self.down_proj = RowParallelLinear(
            4 * hidden_size, hidden_size, world_size, rank
        )

    def _enter_parallel_region(self, x):
        """Start a column->row pair: identity forward, all-reduced input grad."""
        if self.world_size > 1:
            return self._ReplicatedInputGrad.apply(x)
        return x

    def _column_local(self, proj, x):
        """Apply only ``proj``'s local weight shard, skipping its output all-gather.

        The following row-parallel layer expects exactly this local shard.
        """
        return torch.nn.functional.linear(x, proj.weight, proj.bias)

    def _attention(self, q, k, v, mask=None):
        """Scaled dot-product attention over this rank's heads.

        q, k, v have shape (..., seq, local_num_heads * head_dim). ``mask`` is
        either boolean (True = may attend) or an additive float mask,
        broadcastable to (..., local_num_heads, seq_q, seq_k). This is the
        convention chosen by this implementation.
        """
        *lead, seq_len, width = q.shape

        def split_heads(t):
            # (..., seq, heads * dim) -> (..., heads, seq, dim)
            return t.reshape(*lead, t.shape[-2], self.local_num_heads, self.head_dim).transpose(-3, -2)

        qh, kh, vh = split_heads(q), split_heads(k), split_heads(v)
        scores = qh @ kh.transpose(-2, -1) * self.head_dim ** -0.5
        if mask is not None:
            if mask.dtype == torch.bool:
                scores = scores.masked_fill(~mask, float("-inf"))
            else:
                scores = scores + mask
        context = scores.softmax(dim=-1) @ vh
        # (..., heads, seq, dim) -> (..., seq, heads * dim)
        return context.transpose(-3, -2).reshape(*lead, seq_len, width)

    def forward(self, x, mask=None):
        """Apply attention and FFN sub-layers with residual connections.

        ``x`` is replicated on every rank with shape (..., seq, hidden_size);
        the returned tensor has the same shape and is replicated as well.
        """
        # Attention: local heads only; o_proj all-reduces the partial outputs.
        h = self._enter_parallel_region(x)
        q, k, v = self._column_local(self.qkv_proj, h).chunk(3, dim=-1)
        x = x + self.o_proj(self._attention(q, k, v, mask))

        # FFN: local slice of the SwiGLU hidden dim; down_proj all-reduces.
        h = self._enter_parallel_region(x)
        gate = torch.nn.functional.silu(self._column_local(self.gate_proj, h))
        up = self._column_local(self.up_proj, h)
        x = x + self.down_proj(gate * up)

        return x

通信模式

# Collective communication primitives used in parallel training (name -> description).
# In tensor parallelism, AllReduce sums partial forward outputs (RowParallelLinear)
# as well as gradients, so its description covers both.
communication_primitives = {
    "AllGather": "收集所有GPU的输出",
    "AllReduce": "归约所有GPU的部分结果(前向输出或梯度)",
    "ReduceScatter": "归约并分发",
    "P2P": "点对点通信(流水线并行)"
}

# Communication volume estimate
def calculate_communication_volume(tensor_size, world_size):
    """Estimate the per-GPU communication volume of ring collectives.

    Both formulas use the same base: ``tensor_size`` is the size of the FULL
    tensor (the gathered result for AllGather, the reduced tensor for
    AllReduce), in elements or bytes.

    Args:
        tensor_size: size S of the full tensor.
        world_size: number of participating GPUs N (>= 1).

    Returns:
        dict mapping collective name to the amount each GPU sends (and
        receives), in the unit of ``tensor_size``:
        AllGather = S*(N-1)/N, AllReduce = 2*S*(N-1)/N.

    Raises:
        ValueError: if world_size < 1.
    """
    if world_size < 1:
        raise ValueError(f"world_size must be >= 1, got {world_size}")

    # Ring AllGather: each GPU forwards the N-1 shards (size S/N) it does not own.
    allgather_volume = tensor_size * (world_size - 1) / world_size

    # Ring AllReduce = ReduceScatter + AllGather, each moving S*(N-1)/N.
    allreduce_volume = 2 * allgather_volume

    return {
        "AllGather": allgather_volume,
        "AllReduce": allreduce_volume
    }

使用Megatron-LM

# Megatron-Core tensor-parallel setup (launch with torchrun, 4 processes).
import os

import torch
import torch.distributed as dist
from megatron.core import parallel_state, tensor_parallel
from megatron.core.model_parallel_config import ModelParallelConfig

# parallel_state builds its groups on top of the default process group,
# so the default group has to exist first.
if not dist.is_initialized():
    torch.cuda.set_device(int(os.environ.get("LOCAL_RANK", 0)))
    dist.init_process_group(backend="nccl")

# Initialization: one tensor-parallel group of 4 GPUs.
parallel_state.initialize_model_parallel(
    tensor_model_parallel_size=4,  # use 4 GPUs
    pipeline_model_parallel_size=1,
    virtual_pipeline_model_parallel_size=None
)

# Parallel layers initialize their weight shards under Megatron's
# model-parallel CUDA RNG tracker, which must be seeded first.
tensor_parallel.model_parallel_cuda_manual_seed(1234)

# Query the parallel groups.
tp_group = parallel_state.get_tensor_model_parallel_group()
tp_rank = parallel_state.get_tensor_model_parallel_rank()
tp_world_size = parallel_state.get_tensor_model_parallel_world_size()

# Use the parallel layer through its module namespace, so the name does not
# shadow the ColumnParallelLinear defined earlier in this article.
# Megatron-Core's layer takes keyword-only `config` and `init_method`.
config = ModelParallelConfig(tensor_model_parallel_size=4)
layer = tensor_parallel.ColumnParallelLinear(
    4096,
    11008,
    config=config,
    init_method=lambda w: torch.nn.init.normal_(w, mean=0.0, std=0.02),
    gather_output=False,
)
# Note: layer(x) returns an (output, bias) tuple; with gather_output=False the
# output holds this rank's 11008 // 4 features.

性能优化

通信优化

# Communication-optimization techniques (strategy name -> short description).
optimization_strategies = dict(
    Overlap="计算与通信重叠",
    Fusion="融合小通信操作",
    Compression="梯度压缩",
    Quantization="通信量化",
)

# Overlap example
class OverlapParallel(nn.Module):
    """Sketch: overlap an asynchronous all-reduce with computation.

    Subclasses implement ``compute(tensor)``; it runs while the all-reduce is
    in flight, so it must not depend on the reduced values.
    """

    def forward(self, x):
        """SUM-all-reduce ``x`` in place while running ``self.compute``.

        Returns ``self.compute`` applied to the local (pre-reduction) values
        of ``x``. When a process group is initialized, ``x`` holds the
        all-reduced values after this call. Without one, the communication
        step is skipped.
        """
        if not (dist.is_available() and dist.is_initialized()):
            return self.compute(x)

        # all_reduce writes its result into x in place, so reading x while the
        # collective is in flight would race; compute on a snapshot instead.
        local = x.clone()

        # Launch the asynchronous communication (returns a Work handle).
        handle = dist.all_reduce(x, op=dist.ReduceOp.SUM, async_op=True)

        # Overlapping computation.
        output = self.compute(local)

        # Wait for the communication to finish.
        handle.wait()

        return output

负载均衡

def ensure_load_balance(model, world_size, assign_fn=None):
    """Greedily spread a model's parameters over ``world_size`` GPUs.

    Uses the largest-first (LPT) heuristic. Parameters are visited in
    decreasing ``numel()`` order, and each goes to the GPU with the smallest
    running total (ties go to the lowest GPU index). Parameter element count
    is the load proxy.

    Args:
        model: nn.Module whose parameters are distributed.
        world_size: number of GPUs (>= 1).
        assign_fn: optional callable ``assign_fn(param, gpu_index)`` invoked
            once per placement; when None only the loads are computed.

    Returns:
        list[int]: total number of elements assigned to each GPU.

    Raises:
        ValueError: if world_size < 1.
    """
    import heapq

    if world_size < 1:
        raise ValueError(f"world_size must be >= 1, got {world_size}")

    gpu_loads = [0] * world_size
    # (load, gpu) pairs; an ascending list is already a valid min-heap.
    heap = [(0, gpu) for gpu in range(world_size)]

    # Largest first gives a much tighter balance than registration order.
    for param in sorted(model.parameters(), key=lambda p: p.numel(), reverse=True):
        load, gpu = heapq.heappop(heap)
        if assign_fn is not None:
            assign_fn(param, gpu)
        load += param.numel()
        gpu_loads[gpu] = load
        heapq.heappush(heap, (load, gpu))

    return gpu_loads

与其他并行结合

# Tensor parallelism + data parallelism
def setup_tensor_data_parallel(tp_size=2, pp_size=1):
    """Create Megatron tensor-parallel and data-parallel process groups.

    The data-parallel size is implied: world_size // (tp_size * pp_size).

    Args:
        tp_size: tensor-parallel group size.
        pp_size: pipeline-parallel group size.

    Returns:
        (tp_group, dp_group) process groups for this rank.

    Raises:
        RuntimeError: if torch.distributed has not been initialized.
        ValueError: if the world size is not divisible by tp_size * pp_size.
    """
    import torch.distributed as dist

    if not dist.is_initialized():
        raise RuntimeError(
            "torch.distributed must be initialized before creating parallel groups"
        )
    world_size = dist.get_world_size()
    model_parallel_size = tp_size * pp_size
    if world_size % model_parallel_size != 0:
        raise ValueError(
            f"world_size {world_size} is not divisible by tp_size*pp_size={model_parallel_size}"
        )

    from megatron.core import parallel_state

    parallel_state.initialize_model_parallel(
        tensor_model_parallel_size=tp_size,
        pipeline_model_parallel_size=pp_size,
        virtual_pipeline_model_parallel_size=None
    )

    # Megatron derives the data-parallel groups from the remaining ranks.
    tp_group = parallel_state.get_tensor_model_parallel_group()
    dp_group = parallel_state.get_data_parallel_group()

    return tp_group, dp_group

性能基准

# Tensor-parallel speedup and communication overhead per GPU count.
# NOTE(review): no hardware, model size or measurement method is given for
# these figures; treat them as illustrative.
performance = {
    gpus: {"加速比": speedup, "通信开销": overhead}
    for gpus, speedup, overhead in (
        ("2 GPU", "1.8x", "10%"),
        ("4 GPU", "3.2x", "15%"),
        ("8 GPU", "5.8x", "25%"),
    )
}

最佳实践

  1. GPU选择:使用NVLink连接的GPU
  2. 通信组:合理划分张量并行和数据并行
  3. 负载均衡:确保各GPU计算量均衡
  4. 通信优化:启用overlap和fusion
  5. 监控性能:使用NVIDIA Nsight Systems分析通信与计算的耗时和重叠情况

张量并行是训练超大模型的关键技术,可以显著减少单GPU的内存和计算负担。但由于每一层都需要集合通信,它对GPU间带宽要求很高,通常只在同一节点内(NVLink互联)使用。