张量并行:层内并行计算
---
title: "张量并行:层内并行计算"
description: "掌握张量并行的原理和实现,实现单层内的多GPU并行"
tags: ["张量并行", "层内并行", "矩阵分割", "通信优化"]
category: "llm"
icon: "🧠"
---
张量并行:层内并行计算
张量并行简介
张量并行(Tensor Parallelism)是一种将单个神经网络层的权重矩阵切分到多个GPU上并行计算的技术。数据并行在每个GPU上保存完整的模型副本、只切分输入数据;张量并行则在层内切分权重,每个GPU只保存和计算该层的 1/N(N 为张量并行度),因此可以显著减少单GPU的内存占用。
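张量并行的核心优势:
- 内存优化:每层的权重矩阵切分到多个GPU,单卡只保存其中一份
- 计算并行:各GPU同时计算同一层的不同部分
- 通信模式固定:列并行与行并行成对使用时,每个注意力块和MLP块在前向、反向中各只需一次AllReduce;但这种通信发生在每一层内部、非常频繁,因此需要NVLink等高带宽互联
- 适合大模型:当单层权重过大、单卡放不下时尤其有用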
工作原理
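列并行
按输出维度切分权重:$Y = XA = [XA_1, XA_2, \dots, XA_p]$,第 $i$ 个GPU只保存 $A_i$ 并计算 $XA_i$;需要完整输出时再用AllGather拼接。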
import torch
import torch.nn as nn
import torch.distributed as dist
class _CopyToTensorParallelRegion(torch.autograd.Function):
    """Identity in the forward pass; sum all-reduce of the gradient in backward.

    The input of a column-parallel layer is replicated on every rank, but each
    rank back-propagates only through its own output shard, so the gradient
    w.r.t. the input is a partial sum that has to be reduced across ranks.
    """

    @staticmethod
    def forward(ctx, x):
        return x.view_as(x)

    @staticmethod
    def backward(ctx, grad_output):
        # Reduce into a fresh buffer rather than mutating autograd's grad_output.
        grad = grad_output.clone(memory_format=torch.contiguous_format)
        dist.all_reduce(grad, op=dist.ReduceOp.SUM)
        return grad


class _GatherFromTensorParallelRegion(torch.autograd.Function):
    """All-gather shards along the last dim; backward keeps only the local slice.

    ``torch.distributed.all_gather`` fills pre-allocated buffers outside of
    autograd, so using it directly cuts the graph and the weight never gets a
    gradient. The backward assumes the computation after the gather is
    replicated on every rank (the usual tensor-parallel setup), so each rank's
    gradient for its shard is simply its slice of ``grad_output``.
    """

    @staticmethod
    def forward(ctx, local_output, world_size):
        ctx.world_size = world_size
        # all_gather orders the shards by rank in the default process group.
        ctx.rank = dist.get_rank()
        local_output = local_output.contiguous()
        shards = [torch.empty_like(local_output) for _ in range(world_size)]
        dist.all_gather(shards, local_output)
        return torch.cat(shards, dim=-1)

    @staticmethod
    def backward(ctx, grad_output):
        grad = grad_output.chunk(ctx.world_size, dim=-1)[ctx.rank].contiguous()
        return grad, None


class ColumnParallelLinear(nn.Module):
    """Column parallelism: shard the linear layer along the output dimension.

    Each rank stores ``out_features // world_size`` rows of the full
    ``(out_features, in_features)`` weight plus the matching bias slice, and
    computes that slice of the output from the full (replicated) input.
    Communication uses the default process group.

    Args:
        in_features: size of the last input dimension.
        out_features: total output size across all ranks; must be divisible
            by ``world_size``.
        world_size: number of ranks sharing this layer.
        rank: this process's rank; expected to equal ``dist.get_rank()``.
        gather_output: if True (default), all-gather the shards so every rank
            returns the full ``(..., out_features)`` output. If False, return
            only the local ``(..., out_features // world_size)`` shard, which
            can feed a ``RowParallelLinear`` directly without communication
            (the equivalent of Megatron-LM's ``gather_output=False``).
        init_std: standard deviation of the normal weight initialisation.

    Raises:
        ValueError: if ``world_size < 1`` or ``out_features`` is not divisible
            by ``world_size``.
    """

    def __init__(self, in_features, out_features, world_size, rank,
                 gather_output=True, init_std=0.02):
        super().__init__()
        if world_size < 1 or out_features % world_size != 0:
            raise ValueError(
                f"out_features ({out_features}) must be divisible by "
                f"world_size ({world_size})"
            )
        self.world_size = world_size
        self.rank = rank
        self.gather_output = gather_output
        self.local_out_features = out_features // world_size
        # Each GPU stores only 1/world_size of the weight. A small-std normal
        # init (0.02, Megatron-LM's default) replaces torch.randn's std=1,
        # which makes activations grow with in_features.
        self.weight = nn.Parameter(
            torch.empty(self.local_out_features, in_features)
        )
        nn.init.normal_(self.weight, mean=0.0, std=init_std)
        self.bias = nn.Parameter(torch.zeros(self.local_out_features))

    def forward(self, x):
        """Return the full output (``gather_output=True``) or this rank's shard."""
        if self.world_size > 1:
            # Input is replicated: identity forward, all-reduce its gradient.
            x = _CopyToTensorParallelRegion.apply(x)
        # Local matrix multiply on this rank's weight shard.
        local_output = torch.nn.functional.linear(x, self.weight, self.bias)
        if self.world_size > 1 and self.gather_output:
            # Differentiable AllGather of every rank's shard.
            return _GatherFromTensorParallelRegion.apply(
                local_output, self.world_size
            )
        return local_output
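行并行
按输入维度切分权重:$Y = XA = \sum_{i=1}^{p} X_i A_i$,其中 $X = [X_1, \dots, X_p]$ 沿最后一维切分、$A$ 按行对应切分;第 $i$ 个GPU计算部分和 $X_i A_i$,再用AllReduce求和。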
class _ReduceFromTensorParallelRegion(torch.autograd.Function):
    """Sum all-reduce in the forward pass; identity in backward.

    The output is the sum of every rank's partial product, so d(out)/d(partial)
    is the identity; the downstream computation is replicated, so each rank
    already holds the full ``grad_output``. Wrapping the collective in a
    Function gives it an explicit backward instead of relying on an in-place
    ``dist.all_reduce`` that autograd does not know about.
    """

    @staticmethod
    def forward(ctx, partial):
        total = partial.clone(memory_format=torch.contiguous_format)
        dist.all_reduce(total, op=dist.ReduceOp.SUM)
        return total

    @staticmethod
    def backward(ctx, grad_output):
        return grad_output


class RowParallelLinear(nn.Module):
    """Row parallelism: shard the linear layer along the input dimension.

    Each rank stores ``in_features // world_size`` input columns of the full
    ``(out_features, in_features)`` weight and expects the matching input
    slice ``(..., in_features // world_size)``, e.g. the un-gathered output
    of a column-parallel layer. The partial products are summed with an
    AllReduce over the default process group; the (unsharded) bias is added
    once, after the reduction.

    Args:
        in_features: total input size across all ranks; must be divisible by
            ``world_size``.
        out_features: output size (every rank gets the full output).
        world_size: number of ranks sharing this layer.
        rank: this process's rank.
        init_std: standard deviation of the normal weight initialisation.

    Raises:
        ValueError: if ``world_size < 1`` or ``in_features`` is not divisible
            by ``world_size``.
    """

    def __init__(self, in_features, out_features, world_size, rank,
                 init_std=0.02):
        super().__init__()
        if world_size < 1 or in_features % world_size != 0:
            raise ValueError(
                f"in_features ({in_features}) must be divisible by "
                f"world_size ({world_size})"
            )
        self.world_size = world_size
        self.rank = rank
        self.local_in_features = in_features // world_size
        # Each GPU stores only 1/world_size of the weight; small-std normal
        # init (0.02, Megatron-LM's default) instead of torch.randn's std=1.
        self.weight = nn.Parameter(
            torch.empty(out_features, self.local_in_features)
        )
        nn.init.normal_(self.weight, mean=0.0, std=init_std)
        self.bias = nn.Parameter(torch.zeros(out_features))

    def forward(self, x):
        """Return the full ``(..., out_features)`` output on every rank."""
        # Local partial product (no bias: it must be added exactly once).
        partial = torch.nn.functional.linear(x, self.weight)
        if self.world_size > 1:
            # Sum the partial products of all GPUs (AllReduce).
            partial = _ReduceFromTensorParallelRegion.apply(partial)
        return partial + self.bias
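Transformer层并行
关键在于列并行层的输出保持切分状态(不做AllGather),直接作为紧随其后的行并行层的输入:注意力块(QKV列并行 → 输出投影行并行)和MLP块(gate/up列并行 → down行并行)在前向中各只需一次AllReduce。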
class TensorParallelTransformerLayer(nn.Module):
    """Tensor-parallel Transformer layer: attention + SwiGLU FFN with residuals.

    Megatron-style layout: the Q/K/V and FFN gate/up projections are
    column-parallel, while the attention output and FFN down projections are
    row-parallel. The column-parallel outputs are deliberately kept
    *sharded*: this rank works on ``num_heads // world_size`` heads and
    ``4 * hidden_size // world_size`` FFN channels, which is exactly the input
    slice the following row-parallel layer expects. Each sub-block therefore
    communicates once in the forward pass (the all-reduce inside the
    row-parallel layer).

    NOTE(review): no normalisation layers are included; a real model would
    normally add them (e.g. pre-norm).

    Args:
        hidden_size: model width; must be divisible by ``num_heads``.
        num_heads: total number of attention heads; must be divisible by
            ``world_size``.
        world_size: tensor-parallel degree.
        rank: this process's rank.

    Raises:
        ValueError: if the sizes above are not divisible.
    """

    class _CopyToTensorParallelRegion(torch.autograd.Function):
        """Identity forward; sum all-reduce of the gradient in backward.

        The residual stream is replicated on every rank, but each rank's local
        column-parallel shard contributes only part of d(loss)/dx.
        """

        @staticmethod
        def forward(ctx, x):
            return x.view_as(x)

        @staticmethod
        def backward(ctx, grad_output):
            grad = grad_output.clone(memory_format=torch.contiguous_format)
            dist.all_reduce(grad, op=dist.ReduceOp.SUM)
            return grad

    def __init__(self, hidden_size, num_heads, world_size, rank):
        super().__init__()
        if hidden_size % num_heads != 0:
            raise ValueError(
                f"hidden_size ({hidden_size}) must be divisible by "
                f"num_heads ({num_heads})"
            )
        if world_size < 1 or num_heads % world_size != 0:
            raise ValueError(
                f"num_heads ({num_heads}) must be divisible by "
                f"world_size ({world_size})"
            )
        self.world_size = world_size
        self.rank = rank
        self.num_heads = num_heads
        self.head_dim = hidden_size // num_heads
        self.local_num_heads = num_heads // world_size

        # Attention: QKV column-parallel, output projection row-parallel.
        self.qkv_proj = ColumnParallelLinear(
            hidden_size, 3 * hidden_size, world_size, rank
        )
        self.o_proj = RowParallelLinear(
            hidden_size, hidden_size, world_size, rank
        )
        # FFN (SwiGLU): gate/up column-parallel, down row-parallel.
        self.gate_proj = ColumnParallelLinear(
            hidden_size, 4 * hidden_size, world_size, rank
        )
        self.up_proj = ColumnParallelLinear(
            hidden_size, 4 * hidden_size, world_size, rank
        )
        self.down_proj = RowParallelLinear(
            4 * hidden_size, hidden_size, world_size, rank
        )

    def _enter_tp_region(self, x):
        """Route replicated ``x`` into the column-parallel shards.

        Identity for one rank; otherwise identity forward and all-reduced
        gradient in backward.
        """
        if self.world_size > 1:
            return self._CopyToTensorParallelRegion.apply(x)
        return x

    @staticmethod
    def _local_linear(layer, x):
        """Apply a column-parallel layer's local shard without gathering."""
        return torch.nn.functional.linear(x, layer.weight, layer.bias)

    def _attention(self, q, k, v, mask=None):
        """Scaled dot-product attention over this rank's heads.

        Args:
            q, k, v: ``(batch, seq, local_num_heads * head_dim)`` tensors.
            mask: optional mask broadcastable to
                ``(batch, local_num_heads, seq, seq)``. A boolean mask marks
                positions that may be attended (True); any other dtype is
                added to the scores. Fully masked rows produce NaN.

        Returns:
            ``(batch, seq, local_num_heads * head_dim)`` tensor.
        """
        batch, seq_len, _ = q.shape

        def split_heads(t):
            # (b, s, heads * d) -> (b, heads, s, d); reshape because chunk()
            # along the last dim yields non-contiguous views.
            return t.reshape(
                batch, seq_len, self.local_num_heads, self.head_dim
            ).transpose(1, 2)

        q, k, v = split_heads(q), split_heads(k), split_heads(v)
        scores = torch.matmul(q, k.transpose(-2, -1)) * self.head_dim ** -0.5
        if mask is not None:
            if mask.dtype == torch.bool:
                scores = scores.masked_fill(~mask, float("-inf"))
            else:
                scores = scores + mask
        probs = torch.softmax(scores, dim=-1)
        out = torch.matmul(probs, v)
        return out.transpose(1, 2).reshape(
            batch, seq_len, self.local_num_heads * self.head_dim
        )

    def forward(self, x, mask=None):
        """Apply the attention and FFN sub-blocks with residual connections.

        Args:
            x: replicated input of shape ``(batch, seq, hidden_size)``.
            mask: optional attention mask (see ``_attention``).

        Returns:
            Tensor with the same shape as ``x``.
        """
        # Attention: local QKV shard -> local heads -> row-parallel o_proj.
        h = self._enter_tp_region(x)
        qkv = self._local_linear(self.qkv_proj, h)
        q, k, v = qkv.chunk(3, dim=-1)
        attn_output = self._attention(q, k, v, mask)
        x = x + self.o_proj(attn_output)

        # FFN: local gate/up shards -> row-parallel down projection.
        h = self._enter_tp_region(x)
        gate = torch.nn.functional.silu(self._local_linear(self.gate_proj, h))
        up = self._local_linear(self.up_proj, h)
        x = x + self.down_proj(gate * up)
        return x
通信模式
# Collective communication primitives (name -> short description).
# AllReduce is used on activations by row-parallel layers (and on gradients
# by data parallelism), not only on gradients.
communication_primitives = {
    "AllGather": "收集所有GPU的输出",
    "AllReduce": "对所有GPU的张量求和并把结果分发给每个GPU(行并行层的输出、数据并行的梯度)",
    "ReduceScatter": "归约后按GPU切分,每个GPU只得到结果的一部分",
    "P2P": "点对点通信(流水线并行)",
}
# Communication volume estimate
def calculate_communication_volume(tensor_size, world_size):
    """Estimate the per-rank communication volume of ring collectives.

    Every volume is expressed in terms of the same quantity, ``tensor_size``:
    the size of the *full* tensor each rank holds after (AllGather, AllReduce)
    or before (ReduceScatter) the collective. With ring algorithms each rank
    sends ``(world_size - 1) / world_size`` of that tensor per pass; AllReduce
    is a ReduceScatter followed by an AllGather, i.e. two passes.

    Args:
        tensor_size: size of the full tensor (elements or bytes).
        world_size: number of participating ranks.

    Returns:
        dict mapping collective name to the volume sent per rank, in the unit
        of ``tensor_size``.

    Raises:
        ValueError: if ``world_size < 1``.
    """
    if world_size < 1:
        raise ValueError(f"world_size must be >= 1, got {world_size}")
    # One ring pass: each rank forwards world_size - 1 chunks of size 1/world_size.
    one_pass = tensor_size * (world_size - 1) / world_size
    return {
        "AllGather": one_pass,
        "AllReduce": 2 * one_pass,
        "ReduceScatter": one_pass,
    }
使用Megatron-LM
# Megatron-LM tensor-parallel setup.
# Launch with e.g. `torchrun --nproc_per_node=4 script.py`; torchrun sets
# RANK / WORLD_SIZE / LOCAL_RANK for every process.
import os

from megatron.core import parallel_state, tensor_parallel
from megatron.core.model_parallel_config import ModelParallelConfig

# initialize_model_parallel splits an existing default process group into
# sub-groups, so torch.distributed has to be initialised first.
if not dist.is_initialized():
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    dist.init_process_group(backend="nccl")

parallel_state.initialize_model_parallel(
    tensor_model_parallel_size=4,  # 4 GPUs per tensor-parallel group
    pipeline_model_parallel_size=1,
    virtual_pipeline_model_parallel_size=None,
)
# Megatron's tensor-parallel layers initialise their weights inside the
# model-parallel CUDA RNG tracker, which must be seeded first.
tensor_parallel.model_parallel_cuda_manual_seed(1234)

# Query this rank's tensor-parallel group.
tp_group = parallel_state.get_tensor_model_parallel_group()
tp_rank = parallel_state.get_tensor_model_parallel_rank()
tp_world_size = parallel_state.get_tensor_model_parallel_world_size()

# Module-qualified names avoid shadowing the hand-written ColumnParallelLinear /
# RowParallelLinear classes defined earlier. config and init_method are
# required keyword arguments; forward() returns an (output, bias) tuple.
config = ModelParallelConfig(tensor_model_parallel_size=tp_world_size)
layer = tensor_parallel.ColumnParallelLinear(
    4096,
    11008,
    config=config,
    init_method=torch.nn.init.xavier_normal_,
    gather_output=False,
)
性能优化
通信优化
# Communication optimisation strategies (name -> short description).
optimization_strategies = dict(
    Overlap="计算与通信重叠",  # overlap computation with communication
    Fusion="融合小通信操作",  # fuse small communication ops
    Compression="梯度压缩",  # gradient compression
    Quantization="通信量化",  # quantised communication
)
# Overlap example
class OverlapParallel(nn.Module):
    """Overlap an asynchronous all-reduce with computation that does not depend on it.

    The collective is launched with ``dist.all_reduce(..., async_op=True)``
    (``torch.distributed`` has no ``all_reduce_async``). The returned work
    handle is waited on only after the independent computation has been
    issued, so the two can run concurrently on backends that execute
    collectives asynchronously.

    Args:
        compute: module or callable run while the all-reduce is in flight.
            Defaults to ``nn.Identity()``.
    """

    def __init__(self, compute=None):
        super().__init__()
        self.compute = compute if compute is not None else nn.Identity()

    def forward(self, x, independent=None):
        """Sum-all-reduce ``x`` in place while computing ``self.compute(independent)``.

        Args:
            x: tensor to reduce across ranks; it is reduced in place, with
                completion ensured by ``handle.wait()`` before returning.
            independent: input for ``self.compute``. It must not alias ``x``,
                otherwise the computation races with the in-place reduction.
                Defaults to a snapshot of ``x`` taken before the reduction.

        Returns:
            ``self.compute(independent)``.

        Without an initialised process group nothing is reduced and only the
        computation runs.
        """
        if independent is None:
            # Snapshot first: all_reduce writes into x while compute runs.
            independent = x.clone()
        handle = None
        if dist.is_available() and dist.is_initialized():
            # Launch the communication without blocking.
            handle = dist.all_reduce(x, op=dist.ReduceOp.SUM, async_op=True)
        # Meanwhile, issue the independent computation.
        output = self.compute(independent)
        if handle is not None:
            # Wait for the communication to finish.
            handle.wait()
        return output
负载均衡
def ensure_load_balance(model, world_size, assign_fn=None):
    """Greedily place whole parameters on GPUs so parameter counts are balanced.

    Uses the LPT heuristic: parameters are visited from largest to smallest
    (ties in registration order) and each goes to the currently least-loaded
    GPU (ties to the lowest index). Load is measured in parameter elements,
    not FLOPs.

    Note this is parameter *placement*, not tensor parallelism: tensor
    parallelism splits every weight evenly, so it is balanced by construction.

    Args:
        model: object with ``named_parameters()`` (e.g. an ``nn.Module``).
        world_size: number of GPUs.
        assign_fn: optional callback ``assign_fn(param, gpu_index)`` that
            performs the actual placement; if None only the plan is computed.

    Returns:
        list of per-GPU loads (element counts), indexed by GPU.

    Raises:
        ValueError: if ``world_size < 1``.
    """
    import heapq

    if world_size < 1:
        raise ValueError(f"world_size must be >= 1, got {world_size}")
    gpu_loads = [0] * world_size
    # (load, gpu) min-heap; the initial list is already a valid heap.
    heap = [(0, gpu) for gpu in range(world_size)]
    params = sorted(
        model.named_parameters(), key=lambda item: item[1].numel(), reverse=True
    )
    for _name, param in params:
        load, gpu = heapq.heappop(heap)
        if assign_fn is not None:
            assign_fn(param, gpu)
        load += param.numel()
        gpu_loads[gpu] = load
        heapq.heappush(heap, (load, gpu))
    return gpu_loads
与其他并行结合
# Tensor parallelism + data parallelism
def setup_tensor_data_parallel(tp_size=2, pp_size=1):
    """Initialise Megatron-LM tensor-, pipeline- and data-parallel groups.

    Requires ``torch.distributed`` to be initialised already (e.g. under
    torchrun). The data-parallel size is implied by the others:
    ``world_size // (tp_size * pp_size)``.

    Args:
        tp_size: tensor-parallel degree.
        pp_size: pipeline-parallel degree.

    Returns:
        ``(tp_group, dp_group)``: this rank's tensor- and data-parallel
        process groups.

    Raises:
        ValueError: if a size is < 1 or the world size is not divisible by
            ``tp_size * pp_size``.
    """
    if tp_size < 1 or pp_size < 1:
        raise ValueError(f"tp_size and pp_size must be >= 1, got {tp_size}, {pp_size}")
    world_size = dist.get_world_size()
    model_parallel_size = tp_size * pp_size
    if world_size % model_parallel_size != 0:
        raise ValueError(
            f"world_size ({world_size}) must be divisible by "
            f"tp_size * pp_size ({model_parallel_size})"
        )

    from megatron.core import parallel_state

    parallel_state.initialize_model_parallel(
        tensor_model_parallel_size=tp_size,
        pipeline_model_parallel_size=pp_size,
        virtual_pipeline_model_parallel_size=None,
    )
    # Data-parallel groups (world_size // model_parallel_size ranks each)
    # are created automatically from the remaining ranks.
    tp_group = parallel_state.get_tensor_model_parallel_group()
    dp_group = parallel_state.get_data_parallel_group()
    return tp_group, dp_group
性能基准
# Tensor-parallel performance: speedup and communication overhead per GPU count.
# NOTE(review): no hardware, model or source is given for these figures;
# treat them as illustrative.
performance = {
    gpus: {"加速比": speedup, "通信开销": overhead}
    for gpus, speedup, overhead in (
        ("2 GPU", "1.8x", "10%"),
        ("4 GPU", "3.2x", "15%"),
        ("8 GPU", "5.8x", "25%"),
    )
}
最佳实践
- GPU选择:使用NVLink连接的GPU
- 通信组:合理划分张量并行和数据并行
- 负载均衡:确保各GPU计算量均衡
- 通信优化:启用overlap和fusion
- 监控性能:使用NVIDIA Nsight Systems分析计算与通信的时间线
张量并行是训练超大模型的关键技术,可以显著减少单GPU的内存和计算负担。