PagedAttention:高效KV Cache管理
---
title: "PagedAttention:高效KV Cache管理"
description: "深入理解PagedAttention的原理和vLLM的实现,实现高吞吐量LLM服务"
tags: ["PagedAttention", "vLLM", "KV Cache", "内存管理"]
category: "llm"
icon: "🧠"
---
PagedAttention:高效KV Cache管理
PagedAttention简介
PagedAttention是由UC Berkeley提出的创新KV Cache管理技术,借鉴操作系统的虚拟内存和分页机制,实现了高效的KV Cache内存管理。它是vLLM推理引擎的核心技术,显著提升了LLM服务的吞吐量。
PagedAttention的核心优势:
- 内存高效:消除内存碎片,利用率接近100%
- 高吞吐量:支持更多并发请求
- 灵活调度:支持beam search、采样等多种生成策略
- 零拷贝共享:多个序列可通过引用计数共享同一批KV Cache块,仅在需要写入时才复制(copy-on-write)
工作原理
传统KV Cache的问题
# Memory problems with a traditional KV cache:
# 1. Pre-allocation: space is reserved for the maximum sequence length, wasting memory
# 2. Fragmentation: differing sequence lengths leave fragmented memory
# 3. Copy overhead: beam search has to copy the KV cache
class TraditionalKVCache:
    """Contiguous KV cache pre-allocated for ``max_seq_len`` positions.

    Both buffers have shape
    ``(1, num_layers, num_heads, max_seq_len, head_dim)`` and are filled
    front to back along the sequence axis (the second-to-last one).
    """

    def __init__(self, max_seq_len, num_layers, num_heads, head_dim):
        # Pre-allocate fixed-size buffers up front
        self.k_cache = torch.zeros(1, num_layers, num_heads, max_seq_len, head_dim)
        self.v_cache = torch.zeros(1, num_layers, num_heads, max_seq_len, head_dim)
        # Number of sequence positions already written
        self.current_len = 0

    def append(self, k, v):
        """Append new keys/values at the end of the cached sequence.

        Args:
            k: Key tensor whose second-to-last axis is the number of new
                positions and which broadcasts against the cache slice,
                e.g. ``(1, num_layers, num_heads, seq_len, head_dim)``.
            v: Value tensor laid out like ``k``.

        Raises:
            ValueError: If the new positions do not fit in the
                pre-allocated buffer.
        """
        # The sequence axis is second-to-last in the cache layout; reading it
        # from the end works with or without a leading batch axis on ``k``.
        seq_len = k.shape[-2]
        end = self.current_len + seq_len
        capacity = self.k_cache.shape[-2]
        if end > capacity:
            # Without this check the slice below is silently truncated and a
            # single-position append can broadcast into an empty slice.
            raise ValueError(
                f"KV cache overflow: {self.current_len} + {seq_len} "
                f"exceeds max_seq_len={capacity}"
            )
        self.k_cache[:, :, :, self.current_len:end, :] = k
        self.v_cache[:, :, :, self.current_len:end, :] = v
        self.current_len = end
PagedAttention的分页机制
class PagedKVCache:
    """Toy paged KV cache: fixed-size physical blocks plus per-sequence block tables.

    Keys and values live in two pools of shape
    ``(num_blocks, num_layers, num_heads, block_size, head_dim)``. Each
    sequence owns a list of physical block ids (its block table); token
    position ``p`` is stored in logical block ``p // block_size`` at offset
    ``p % block_size``.
    """

    def __init__(self, block_size=16, num_blocks=1000,
                 num_layers=32, num_heads=32, head_dim=128):
        self.block_size = block_size
        self.num_blocks = num_blocks
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.head_dim = head_dim
        # Pre-allocate the physical block pools
        self.k_blocks = torch.zeros(
            num_blocks, num_layers, num_heads, block_size, head_dim,
            dtype=torch.float16
        )
        self.v_blocks = torch.zeros(
            num_blocks, num_layers, num_heads, block_size, head_dim,
            dtype=torch.float16
        )
        # Ids of physical blocks not owned by any sequence
        self.free_blocks = list(range(num_blocks))
        # Logical-to-physical mapping: seq_id -> [block_ids]
        self.block_tables = {}
        # Current length (in tokens) of every sequence
        self.seq_lengths = {}
        # Per-layer write position: seq_id -> {layer_idx: tokens written}.
        # A block stores all layers, so each layer needs its own cursor.
        self._layer_lengths = {}

    def allocate_block(self, seq_id):
        """Take one free physical block and append it to ``seq_id``'s table.

        Returns:
            The id of the newly assigned physical block.

        Raises:
            MemoryError: If no free block is left.
        """
        if not self.free_blocks:
            raise MemoryError("No free blocks")
        block_id = self.free_blocks.pop()
        self.block_tables.setdefault(seq_id, []).append(block_id)
        return block_id

    def free_sequence(self, seq_id):
        """Return all blocks of ``seq_id`` to the free list and forget the sequence.

        Safe to call for an unknown sequence, or for one that owns blocks
        but never had a token appended.
        """
        self.free_blocks.extend(self.block_tables.pop(seq_id, []))
        self.seq_lengths.pop(seq_id, None)
        self._layer_lengths.pop(seq_id, None)

    def append_kv(self, seq_id, layer_idx, k, v):
        """Write one token's key/value for one layer of ``seq_id``.

        Args:
            seq_id: Hashable sequence identifier.
            layer_idx: Layer whose slot is written.
            k: Key for the new token; must broadcast to
                ``(num_heads, head_dim)``, the shape of the target slot.
            v: Value for the new token, laid out like ``k``.

        Raises:
            MemoryError: If a new block is needed and none is free.
        """
        # Position is tracked per layer: advancing one shared counter on
        # every call would burn one slot per layer for a single token.
        position = self._layer_lengths.get(seq_id, {}).get(layer_idx, 0)
        block_idx, offset = divmod(position, self.block_size)
        # Allocate lazily, only when this token starts a new logical block
        if block_idx >= len(self.block_tables.get(seq_id, [])):
            self.allocate_block(seq_id)
        # Write into the physical block
        physical_block = self.block_tables[seq_id][block_idx]
        self.k_blocks[physical_block, layer_idx, :, offset, :] = k
        self.v_blocks[physical_block, layer_idx, :, offset, :] = v
        self._layer_lengths.setdefault(seq_id, {})[layer_idx] = position + 1
        # The sequence length is the furthest position any layer has reached
        self.seq_lengths[seq_id] = max(self.seq_lengths.get(seq_id, 0), position + 1)
使用vLLM
基本使用
from vllm import LLM, SamplingParams
# Initialize the vLLM engine
llm = LLM(
model="meta-llama/Llama-2-7b-hf",
tensor_parallel_size=1,
max_model_len=4096,
gpu_memory_utilization=0.9,
block_size=16,
swap_space=4 # CPU swap space, in GB
)
# Single-prompt inference
prompt = "什么是机器学习?"
sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
output = llm.generate(prompt, sampling_params)
# The result is indexed per request first, then per completion
print(output[0].outputs[0].text)
# Batched inference: submit several prompts in a single generate() call
prompts = ["问题1", "问题2", "问题3"]
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
    # Print the first completion of each request
    first_completion = output.outputs[0]
    print(first_completion.text)
高级配置
from vllm import LLM, SamplingParams
# High-performance configuration
llm = LLM(
model="meta-llama/Llama-2-70b-hf",
tensor_parallel_size=4, # tensor parallelism across 4 GPUs
max_model_len=8192,
gpu_memory_utilization=0.95,
block_size=16,
max_num_batched_tokens=16384,
max_num_seqs=512,
swap_space=8,
enforce_eager=False # keep CUDA Graph enabled
)
# Sampling parameters
sampling_params = SamplingParams(
temperature=0.7,
top_p=0.9,
top_k=50,
max_tokens=512,
presence_penalty=0.1,
frequency_penalty=0.1
)
流式输出
from vllm import AsyncLLMEngine, AsyncEngineArgs
# Async engine configuration
engine_args = AsyncEngineArgs(
model="meta-llama/Llama-2-7b-hf",
max_model_len=4096,
gpu_memory_utilization=0.9
)
# Build the async engine from the argument object above
engine = AsyncLLMEngine.from_engine_args(engine_args)
# Streaming generation
async def generate_stream(prompt, request_id=None):
    """Yield the text generated so far for ``prompt`` as the engine produces it.

    Args:
        prompt: Prompt text to complete.
        request_id: Optional identifier for this request. When omitted a
            fresh UUID is used so concurrent streams do not share an id.

    Yields:
        ``output.outputs[0].text`` for every intermediate result the engine
        emits.
    """
    import uuid

    if request_id is None:
        request_id = uuid.uuid4()
    sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
    # The engine expects a string request id
    async for output in engine.generate(prompt, sampling_params, request_id=str(request_id)):
        yield output.outputs[0].text


# Usage
import asyncio


async def _print_stream(prompt):
    """Consume ``generate_stream`` and print only the newly generated text."""
    printed = 0
    # An async generator must be consumed with ``async for`` inside a
    # coroutine; it cannot be passed to asyncio.run() directly.
    async for text in generate_stream(prompt):
        # NOTE(review): assumes each yielded text is cumulative (vLLM's
        # default output kind), so only the unseen suffix is printed.
        print(text[printed:], end="", flush=True)
        printed = len(text)


asyncio.run(_print_stream("Hello!"))
内存管理
内存统计
def get_memory_stats(llm_engine):
    """Collect KV-cache block counts and GPU memory figures for an engine.

    Args:
        llm_engine: Object exposing ``scheduler.block_manager`` with
            ``num_allocated_blocks``, ``num_free_blocks`` and
            ``num_total_blocks`` attributes.
            NOTE(review): these attribute names are taken from the original
            snippet; confirm them against the vLLM version in use.

    Returns:
        dict with ``allocated_blocks``, ``free_blocks``, ``total_blocks``,
        ``gpu_memory_used`` and ``gpu_memory_reserved`` (bytes divided by
        1024**3), and ``utilization`` (allocated / total, ``0.0`` when the
        engine reports no blocks).
    """
    block_manager = llm_engine.scheduler.block_manager
    bytes_per_gib = 1024**3
    stats = {
        "allocated_blocks": block_manager.num_allocated_blocks,
        "free_blocks": block_manager.num_free_blocks,
        "total_blocks": block_manager.num_total_blocks,
        "gpu_memory_used": torch.cuda.memory_allocated() / bytes_per_gib,
        "gpu_memory_reserved": torch.cuda.memory_reserved() / bytes_per_gib,
    }
    total_blocks = stats["total_blocks"]
    # Guard against an engine that reports zero blocks
    stats["utilization"] = (
        stats["allocated_blocks"] / total_blocks if total_blocks else 0.0
    )
    return stats
动态扩缩容
class AutoScaler:
    """Grow or shrink a pool of serving instances based on load.

    Subclasses supply the actual provisioning by overriding
    ``create_instance`` and ``terminate_instance``.
    """

    def __init__(self, min_instances=1, max_instances=10, target_utilization=0.8):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.target_utilization = target_utilization
        # Handles of the instances currently managed by this scaler
        self.instances = []

    def check_scale(self, current_utilization, queue_size):
        """Decide whether to scale and perform the action.

        Scales up when utilization is above target and more than 100
        requests are queued; scales down when utilization falls below half
        the target.

        Returns:
            The result string of ``scale_up``/``scale_down``, or ``None``
            when no action is taken.
        """
        if current_utilization > self.target_utilization and queue_size > 100:
            return self.scale_up()
        elif current_utilization < self.target_utilization * 0.5:
            return self.scale_down()
        return None

    def scale_up(self):
        """Add one instance; return ``"scaled_up"`` or ``"max_reached"``."""
        if len(self.instances) < self.max_instances:
            new_instance = self.create_instance()
            self.instances.append(new_instance)
            return "scaled_up"
        return "max_reached"

    def scale_down(self):
        """Remove the newest instance; return ``"scaled_down"`` or ``"min_reached"``."""
        if len(self.instances) > self.min_instances:
            # Terminate before dropping the handle so a failed termination
            # does not leave an untracked instance behind.
            instance = self.instances[-1]
            self.terminate_instance(instance)
            self.instances.pop()
            return "scaled_down"
        return "min_reached"

    def create_instance(self):
        """Provision one serving instance and return its handle (override me)."""
        raise NotImplementedError(
            f"{type(self).__name__} must implement create_instance()"
        )

    def terminate_instance(self, instance):
        """Shut down ``instance`` (override me)."""
        raise NotImplementedError(
            f"{type(self).__name__} must implement terminate_instance()"
        )
高级特性
Beam Search支持
# vLLM supports beam search natively.
# NOTE(review): `use_beam_search` exists on SamplingParams only in older vLLM
# releases; newer ones expose beam search through a separate API — confirm
# against the installed version.
sampling_params = SamplingParams(
    temperature=0.0,  # beam search is deterministic; a positive temperature is rejected
    top_k=-1,  # top-k filtering must stay disabled (-1) under beam search
    use_beam_search=True,
    best_of=5,  # beam size
    max_tokens=256
)
output = llm.generate(prompt, sampling_params)
序列间共享
# PagedAttention lets sequences share KV cache blocks (e.g. beam search):
# nothing is copied up front; shared blocks are managed by reference counting
def beam_search_with_sharing(llm, prompt, num_beams=4):
    """Run beam search for ``prompt`` with ``num_beams`` beams.

    Args:
        llm: Engine object exposing ``generate(prompt, sampling_params)``.
        prompt: Prompt text.
        num_beams: Beam width, passed as ``best_of``; must be at least 2.

    Returns:
        Whatever ``llm.generate`` returns.

    Raises:
        ValueError: If ``num_beams`` is smaller than 2.
    """
    if num_beams < 2:
        raise ValueError("num_beams must be at least 2 for beam search")
    sampling_params = SamplingParams(
        # Beam search is deterministic; the default temperature (1.0) is
        # rejected when use_beam_search=True.
        temperature=0.0,
        use_beam_search=True,
        best_of=num_beams,
        max_tokens=256
    )
    output = llm.generate(prompt, sampling_params)
    return output
多LoRA支持
from vllm import LLM, SamplingParams
# LoRARequest is not exported from the top-level package
from vllm.lora.request import LoRARequest

# Enable support for multiple LoRA adapters
llm = LLM(
    model="meta-llama/Llama-2-7b-hf",
    enable_lora=True,
    max_lora_rank=64
)
# Sampling parameters shared by requests, whichever adapter they use
sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
# Select the adapter per request: (name, integer id, adapter path)
output = llm.generate(
    prompt,
    sampling_params,
    lora_request=LoRARequest("math_lora", 1, "./math_lora")
)
性能优化
批处理优化
# Baseline batching configuration
optimal_config = dict(
    max_num_batched_tokens=8192,  # maximum number of tokens per batch
    max_num_seqs=256,  # maximum number of concurrent sequences
    block_size=16,  # KV cache block size
    gpu_memory_utilization=0.9,  # fraction of GPU memory to use
)


# Adjust the baseline to the available hardware
def optimize_for_hardware(gpu_memory_gb, num_gpus):
    """Return a copy of ``optimal_config`` tuned for the given hardware.

    Args:
        gpu_memory_gb: Memory per GPU in GB; selects the batching tier.
        num_gpus: Number of GPUs; stored as ``tensor_parallel_size``.

    Returns:
        A new dict; ``optimal_config`` itself is not modified.
    """
    # (minimum GPU memory in GB, max_num_batched_tokens, max_num_seqs),
    # checked from the largest tier down
    tiers = ((80, 16384, 512), (40, 8192, 256))
    # Values used when no tier matches
    batched_tokens, num_seqs = 4096, 128
    for min_memory_gb, tier_tokens, tier_seqs in tiers:
        if gpu_memory_gb >= min_memory_gb:
            batched_tokens, num_seqs = tier_tokens, tier_seqs
            break

    tuned = optimal_config.copy()
    tuned["max_num_batched_tokens"] = batched_tokens
    tuned["max_num_seqs"] = num_seqs
    tuned["tensor_parallel_size"] = num_gpus
    return tuned
监控与调优
import time
from prometheus_client import Gauge, Histogram
# Monitoring metrics
REQUEST_LATENCY = Histogram('vllm_request_latency_seconds', 'Request latency')
QUEUE_SIZE = Gauge('vllm_queue_size', 'Current queue size')
THROUGHPUT = Gauge('vllm_throughput', 'Requests per second')


def monitor_vllm(llm_engine, interval=1, stop_event=None):
    """Poll the engine and publish queue size and throughput gauges.

    Args:
        llm_engine: Engine passed to ``get_memory_stats``; its
            ``scheduler.waiting_queue_size`` feeds ``QUEUE_SIZE``.
        interval: Seconds to wait between polls.
        stop_event: Optional object with ``is_set()`` and ``wait(timeout)``
            (e.g. ``threading.Event``). When given, the loop exits once it
            is set; when omitted, the loop runs until interrupted.
    """
    while stop_event is None or not stop_event.is_set():
        # Collect current statistics
        stats = get_memory_stats(llm_engine)
        # Update the gauges
        QUEUE_SIZE.set(llm_engine.scheduler.waiting_queue_size)
        # Falls back to 0 when the stats dict carries no "throughput" entry
        THROUGHPUT.set(stats.get("throughput", 0))
        if stop_event is None:
            time.sleep(interval)
        else:
            # Wake up early if a stop is requested during the pause
            stop_event.wait(interval)
PagedAttention通过创新的内存管理机制,使vLLM成为目前最高效的LLM推理引擎之一。