GPU利用率优化
---
title: "GPU利用率优化"
description: "深入探讨GPU利用率优化技术,包括CUDA编程、显存管理和计算优化策略,帮助提升大模型推理和训练效率。"
tags: ["GPU利用率", "CUDA", "显存优化", "计算优化"]
category: "llm"
icon: "🧠"
---
GPU利用率优化
什么是GPU利用率
GPU利用率是衡量GPU计算资源被有效使用程度的指标。在大模型场景下,低利用率意味着算力浪费和成本增加。优化GPU利用率是降低推理延迟、提升吞吐量的关键手段。
GPU利用率的核心指标
SM利用率
SM(Streaming Multiprocessor)利用率反映GPU上活跃计算单元的比例:
import torch
def measure_sm_utilization(size=4096, warmup=3):
    """Time one square matmul on the GPU and report the achieved throughput.

    CUDA events only yield elapsed time, not hardware SM occupancy, so the
    achieved TFLOPS is printed as a proxy: comparing it with the device's
    peak indicates how busy the compute units were.

    Args:
        size: Side length of the square matrices being multiplied.
        warmup: Number of untimed matmuls executed first, so that one-time
            start-up cost is kept out of the measurement.

    Returns:
        Elapsed time of the timed matmul, in milliseconds.
    """
    torch.cuda.reset_peak_memory_stats()

    # Create the operand before the timed region so that allocation and
    # random-number generation are not counted as matmul time.
    x = torch.randn(size, size, device='cuda')
    for _ in range(warmup):
        torch.mm(x, x.t())
    torch.cuda.synchronize()

    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    # The compute-intensive task being measured.
    torch.mm(x, x.t())
    end.record()
    # elapsed_time() is only valid once both events have completed.
    torch.cuda.synchronize()

    elapsed_ms = start.elapsed_time(end)
    print(f"执行时间: {elapsed_ms:.2f}ms")
    if elapsed_ms > 0:
        # An n x n by n x n matmul performs about 2 * n^3 floating-point ops.
        tflops = 2 * size ** 3 / (elapsed_ms * 1e-3) / 1e12
        print(f"实际算力: {tflops:.2f} TFLOPS")
    return elapsed_ms
measure_sm_utilization()
显存带宽利用率
大模型推理(尤其是小batch的解码阶段)通常受显存带宽限制(memory-bound),而非受计算能力限制(compute-bound):
def calculate_memory_bandwidth(bandwidth=2000, model_size=70,
                               batch_sizes=(1, 8, 32, 128), model_name="70B"):
    """Estimate the bandwidth-bound latency floor and ideal batch throughput.

    The model assumes every forward pass must read all weights from device
    memory once, so the minimum latency is ``model_size / bandwidth``.
    Throughput is then scaled linearly with the batch size, which is an
    upper bound that only holds while the workload stays memory-bound.

    Args:
        bandwidth: Memory bandwidth in GB/s. The default of 2000 matches the
            roughly 2 TB/s of an A100 80GB.
        model_size: Weight footprint in GB. The default of 70 GB for a
            70B-parameter model implies about 1 byte per parameter; FP16
            weights would need about 140 GB.
        batch_sizes: Batch sizes to report throughput for.
        model_name: Label used in the printed latency line.

    Returns:
        Dict mapping each batch size to its estimated tokens per second.

    Raises:
        ValueError: If ``bandwidth`` or ``model_size`` is not positive.
    """
    if bandwidth <= 0:
        raise ValueError("bandwidth must be positive")
    if model_size <= 0:
        raise ValueError("model_size must be positive")

    # One forward pass has to load at least all parameters once.
    min_latency = model_size / bandwidth  # seconds
    print(f"{model_name}模型理论最小延迟: {min_latency*1000:.1f}ms")

    throughputs = {}
    for bs in batch_sizes:
        # Each pass yields one token per sequence in the batch.
        throughput = bs / min_latency
        print(f"Batch {bs}: 吞吐量 {throughput:.0f} tokens/s")
        throughputs[bs] = throughput
    return throughputs
calculate_memory_bandwidth()
显存优化技术
显存池化
避免频繁的cudaMalloc/cudaFree操作:
import torch
from torch.cuda import memory
class MemoryPool:
    """Cache of released tensors, keyed by (shape, dtype), for later reuse.

    Handing back a cached tensor avoids a fresh allocation for every
    request of the same shape and dtype.

    Args:
        max_pool_size: Upper bound, in bytes, on the tensors kept in the
            cache. Tensors released beyond this budget are not cached.
        device: Device on which new tensors are allocated.
    """

    def __init__(self, max_pool_size=1024 * 1024 * 1024, device='cuda'):
        self.pool = {}
        self.max_pool_size = max_pool_size
        self.device = device
        # Bytes currently held by cached tensors; compared to max_pool_size.
        self._pooled_bytes = 0

    @staticmethod
    def _key(shape, dtype):
        """Build a hashable cache key; accepts an int, list, tuple or Size."""
        if isinstance(shape, int):
            shape = (shape,)
        return (tuple(shape), dtype)

    @staticmethod
    def _nbytes(tensor):
        """Return the number of bytes addressed by the tensor's elements."""
        return tensor.element_size() * tensor.nelement()

    def allocate(self, shape, dtype=torch.float16):
        """Return a cached tensor of this shape/dtype, or allocate a new one.

        The contents of the returned tensor are uninitialized either way.
        """
        bucket = self.pool.get(self._key(shape, dtype))
        if bucket:
            tensor = bucket.pop()
            self._pooled_bytes -= self._nbytes(tensor)
            return tensor
        return torch.empty(shape, dtype=dtype, device=self.device)

    def release(self, tensor):
        """Hand a tensor back to the pool so a later allocate() can reuse it.

        The tensor is dropped instead of cached when it would push the pool
        past ``max_pool_size``. Releasing the same tensor twice is ignored;
        otherwise two later allocations would alias the same memory.
        """
        size = self._nbytes(tensor)
        if self._pooled_bytes + size > self.max_pool_size:
            return
        bucket = self.pool.setdefault(self._key(tensor.shape, tensor.dtype), [])
        # Identity check: `tensor in bucket` would compare element-wise.
        if any(cached is tensor for cached in bucket):
            return
        bucket.append(tensor)
        self._pooled_bytes += size

    def clear(self):
        """Drop every cached tensor and ask PyTorch to release cached blocks."""
        self.pool.clear()
        self._pooled_bytes = 0
        torch.cuda.empty_cache()
pool = MemoryPool()
# Reuse released tensors instead of allocating new ones, to reduce fragmentation.
tensor1 = pool.allocate((1024, 1024))
pool.release(tensor1)
tensor2 = pool.allocate((1024, 1024)) # served from the pool: same tensor as tensor1
显存碎片整理
def defragment_memory(num_chunks=1024, chunk_size=1024,
                      dtype=torch.float16, device='cuda'):
    """Reduce fragmentation by carving chunks out of one large allocation.

    A single contiguous block is allocated up front and split into
    equally sized views, so the chunks never trigger separate allocations.

    Args:
        num_chunks: Number of chunks to carve out of the block.
        chunk_size: Number of elements per chunk.
        dtype: Element type of the block.
        device: Device on which the block is allocated.

    Raises:
        ValueError: If ``num_chunks`` or ``chunk_size`` is not positive.
    """
    if num_chunks <= 0 or chunk_size <= 0:
        raise ValueError("num_chunks and chunk_size must be positive")

    # Pre-allocate one contiguous block.
    large_block = torch.empty(num_chunks * chunk_size, dtype=dtype, device=device)
    # Slice it logically; every chunk is a view sharing the block's storage.
    chunks = list(large_block.split(chunk_size))

    # Release everything at once when done. The views must be dropped too:
    # while any of them is alive the storage stays referenced and
    # empty_cache() cannot hand the block back.
    del chunks
    del large_block
    torch.cuda.empty_cache()
计算优化策略
Kernel融合
将多个小kernel合并为一个大kernel,减少kernel launch开销:
import torch
from torch import fx
@torch.compile
def fused_attention(Q, K, V, scale=None):
    """Scaled dot-product attention written so torch.compile can fuse it.

    Computes ``softmax(Q @ K^T * scale) @ V`` over the last two dimensions.

    Args:
        Q: Query tensor whose last dimension is the head dimension.
        K: Key tensor with the same head dimension as ``Q``.
        V: Value tensor with one row per key.
        scale: Factor applied to the raw scores. When omitted it defaults
            to ``1 / sqrt(head_dim)``.

    Returns:
        The attention output, one row per query.
    """
    if scale is None:
        scale = Q.size(-1) ** -0.5
    scores = torch.matmul(Q, K.transpose(-2, -1)) * scale
    # Normalize over the key dimension so each query's weights sum to 1.
    weights = torch.softmax(scores, dim=-1)
    return torch.matmul(weights, V)
# Let torch.compile fuse kernels automatically.
# NOTE(review): `model` is not defined in this snippet; presumably an nn.Module created elsewhere. Confirm.
model = torch.compile(model, mode="reduce-overhead")
流水线重叠
利用CUDA Stream实现计算与数据传输重叠:
def overlapped_execution():
    """Run two layers and a device-to-host copy on separate CUDA streams.

    Work queued on different streams is not ordered automatically, so each
    side stream explicitly waits for the stream that produced its input
    before consuming it. Without those waits a stream could read a tensor
    that is still being written.

    NOTE(review): ``model_layer1``, ``model_layer2`` and ``input`` are not
    defined in this block and are expected to exist in the enclosing scope.
    Confirm before running.
    """
    stream1 = torch.cuda.Stream()
    stream2 = torch.cuda.Stream()
    main_stream = torch.cuda.current_stream()

    # Current stream: forward pass of the first layer.
    output1 = model_layer1(input)

    # stream1: second layer. It consumes output1, so it must wait for the
    # work already queued on the producing stream.
    stream1.wait_stream(main_stream)
    with torch.cuda.stream(stream1):
        output2 = model_layer2(output1)

    # stream2: asynchronous copy of the layer-2 result to host memory.
    # It reads output2, so it must wait for stream1.
    stream2.wait_stream(stream1)
    with torch.cuda.stream(stream2):
        next_input = output2.to('cpu', non_blocking=True)

    # Wait for all streams. The tensors above remain referenced until this
    # point, so their memory cannot be recycled while still in use.
    torch.cuda.synchronize()
Mixed Precision推理
from torch.cuda.amp import autocast
def optimized_inference(model, input_ids):
    """Run the model under FP16 autocast and return FP32 probabilities.

    Args:
        model: Callable that maps ``input_ids`` to a logits tensor.
        input_ids: Input passed to ``model`` unchanged.

    Returns:
        Softmax of the logits over the last dimension, as float32. The
        result carries no autograd history.
    """
    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        # Autocast runs eligible ops in FP16 and keeps the rest in FP32.
        with torch.autocast(device_type='cuda', dtype=torch.float16):
            logits = model(input_ids)
            # Keep the numerically sensitive softmax in FP32.
            with torch.autocast(device_type='cuda', enabled=False):
                probs = torch.softmax(logits.float(), dim=-1)
    return probs
利用率监控
import subprocess
def get_gpu_utilization():
    """Query nvidia-smi and print one utilization line per GPU.

    NOTE(review): per the nvidia-smi documentation, ``utilization.memory``
    reports memory-controller activity rather than the fraction of memory
    in use. Confirm that this is the figure wanted here.

    Returns:
        A list with one dict per GPU, holding the raw string fields
        ``gpu_util``, ``mem_util``, ``mem_used`` and ``mem_total``.

    Raises:
        FileNotFoundError: If the nvidia-smi executable cannot be found.
        subprocess.CalledProcessError: If nvidia-smi exits with an error.
    """
    result = subprocess.run(
        ['nvidia-smi',
         '--query-gpu=utilization.gpu,utilization.memory,memory.used,memory.total',
         '--format=csv,noheader,nounits'],
        capture_output=True, text=True,
        # Fail loudly instead of trying to parse the output of a failed call.
        check=True,
    )

    stats = []
    for line in result.stdout.splitlines():
        # Split on the bare comma and strip, so the exact spacing of the
        # CSV output does not matter.
        fields = [field.strip() for field in line.split(',')]
        if len(fields) != 4:
            continue  # blank or unexpected line
        gpu_util, mem_util, mem_used, mem_total = fields
        print(f"GPU计算利用: {gpu_util}% | 显存利用: {mem_util}% | "
              f"显存使用: {mem_used}/{mem_total}MB")
        stats.append({
            'gpu_util': gpu_util,
            'mem_util': mem_util,
            'mem_used': mem_used,
            'mem_total': mem_total,
        })
    return stats
get_gpu_utilization()
最佳实践总结
- 监控SM利用率和显存带宽利用率,找到瓶颈类型
- 对显存带宽受限(memory-bound)的任务使用更大的batch size,以提升带宽利用率
- 利用torch.compile和CUDA Graph减少kernel launch开销
- 使用CUDA Stream实现计算与通信重叠
- 定期清理显存碎片,避免OOM