← 返回首页
🧠

PagedAttention:高效KV Cache管理

📂 llm ⏱ 4 min 656 words

--- title: "PagedAttention:高效KV Cache管理" description: "深入理解PagedAttention的原理和vLLM的实现,实现高吞吐量LLM服务" tags: ["PagedAttention", "vLLM", "KV Cache", "内存管理"] category: "llm" icon: "🧠"

PagedAttention:高效KV Cache管理

PagedAttention简介

PagedAttention是由UC Berkeley提出的创新KV Cache管理技术,借鉴操作系统的虚拟内存和分页机制,实现了高效的KV Cache内存管理。它是vLLM推理引擎的核心技术,显著提升了LLM服务的吞吐量。

PagedAttention的核心优势:

工作原理

传统KV Cache的问题

# 传统KV Cache的内存问题:
# 1. 预分配:为最大序列长度预分配,浪费内存
# 2. 内存碎片:不同序列长度导致碎片
# 3. 复制开销:beam search需要复制KV Cache

class TraditionalKVCache:
    def __init__(self, max_seq_len, num_layers, num_heads, head_dim):
        # 预分配固定大小
        self.k_cache = torch.zeros(1, num_layers, num_heads, max_seq_len, head_dim)
        self.v_cache = torch.zeros(1, num_layers, num_heads, max_seq_len, head_dim)
        self.current_len = 0
    
    def append(self, k, v):
        """追加KV"""
        seq_len = k.shape[2]
        self.k_cache[:, :, :, self.current_len:self.current_len+seq_len, :] = k
        self.v_cache[:, :, :, self.current_len:self.current_len+seq_len, :] = v
        self.current_len += seq_len

PagedAttention的分页机制

class PagedKVCache:
    """PagedAttention实现"""
    
    def __init__(self, block_size=16, num_blocks=1000, 
                 num_layers=32, num_heads=32, head_dim=128):
        self.block_size = block_size
        self.num_blocks = num_blocks
        self.num_layers = num_layers
        self.num_heads = num_heads
        self.head_dim = head_dim
        
        # 预分配物理块
        self.k_blocks = torch.zeros(
            num_blocks, num_layers, num_heads, block_size, head_dim,
            dtype=torch.float16
        )
        self.v_blocks = torch.zeros(
            num_blocks, num_layers, num_heads, block_size, head_dim,
            dtype=torch.float16
        )
        
        # 空闲块列表
        self.free_blocks = list(range(num_blocks))
        
        # 逻辑块到物理块的映射
        self.block_tables = {}  # seq_id -> [block_ids]
        
        # 每个序列的当前长度
        self.seq_lengths = {}
    
    def allocate_block(self, seq_id):
        """分配新块"""
        if not self.free_blocks:
            raise MemoryError("No free blocks")
        
        block_id = self.free_blocks.pop()
        
        if seq_id not in self.block_tables:
            self.block_tables[seq_id] = []
        self.block_tables[seq_id].append(block_id)
        
        return block_id
    
    def free_sequence(self, seq_id):
        """释放序列的所有块"""
        if seq_id in self.block_tables:
            for block_id in self.block_tables[seq_id]:
                self.free_blocks.append(block_id)
            del self.block_tables[seq_id]
            del self.seq_lengths[seq_id]
    
    def append_kv(self, seq_id, layer_idx, k, v):
        """追加KV到缓存"""
        if seq_id not in self.seq_lengths:
            self.seq_lengths[seq_id] = 0
        
        seq_len = self.seq_lengths[seq_id]
        block_idx = seq_len // self.block_size
        offset = seq_len % self.block_size
        
        # 检查是否需要新块
        if block_idx >= len(self.block_tables.get(seq_id, [])):
            self.allocate_block(seq_id)
        
        # 写入块
        physical_block = self.block_tables[seq_id][block_idx]
        self.k_blocks[physical_block, layer_idx, :, offset, :] = k
        self.v_blocks[physical_block, layer_idx, :, offset, :] = v
        
        self.seq_lengths[seq_id] += 1

使用vLLM

基本使用

from vllm import LLM, SamplingParams

# 初始化vLLM引擎
llm = LLM(
    model="meta-llama/Llama-2-7b-hf",
    tensor_parallel_size=1,
    max_model_len=4096,
    gpu_memory_utilization=0.9,
    block_size=16,
    swap_space=4  # CPU交换空间GB
)

# 单条推理
prompt = "什么是机器学习?"
sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
output = llm.generate(prompt, sampling_params)
print(output[0].outputs[0].text)

# 批量推理
prompts = ["问题1", "问题2", "问题3"]
outputs = llm.generate(prompts, sampling_params)
for output in outputs:
    print(output.outputs[0].text)

高级配置

from vllm import LLM, SamplingParams

# 高性能配置
llm = LLM(
    model="meta-llama/Llama-2-70b-hf",
    tensor_parallel_size=4,  # 4GPU张量并行
    max_model_len=8192,
    gpu_memory_utilization=0.95,
    block_size=16,
    max_num_batched_tokens=16384,
    max_num_seqs=512,
    swap_space=8,
    enforce_eager=False  # 启用CUDA Graph
)

# 采样参数
sampling_params = SamplingParams(
    temperature=0.7,
    top_p=0.9,
    top_k=50,
    max_tokens=512,
    presence_penalty=0.1,
    frequency_penalty=0.1
)

流式输出

from vllm import AsyncLLMEngine, AsyncEngineArgs

# 异步引擎配置
engine_args = AsyncEngineArgs(
    model="meta-llama/Llama-2-7b-hf",
    max_model_len=4096,
    gpu_memory_utilization=0.9
)

engine = AsyncLLMEngine.from_engine_args(engine_args)

# 流式生成
async def generate_stream(prompt):
    sampling_params = SamplingParams(temperature=0.7, max_tokens=256)
    
    async for output in engine.generate(prompt, sampling_params, request_id=1):
        yield output.outputs[0].text

# 使用
import asyncio
for text in asyncio.run(generate_stream("Hello!")):
    print(text, end="", flush=True)

内存管理

内存统计

def get_memory_stats(llm_engine):
    """获取vLLM内存统计"""
    stats = {
        "allocated_blocks": llm_engine.scheduler.block_manager.num_allocated_blocks,
        "free_blocks": llm_engine.scheduler.block_manager.num_free_blocks,
        "total_blocks": llm_engine.scheduler.block_manager.num_total_blocks,
        "gpu_memory_used": torch.cuda.memory_allocated() / 1024**3,
        "gpu_memory_reserved": torch.cuda.memory_reserved() / 1024**3
    }
    stats["utilization"] = stats["allocated_blocks"] / stats["total_blocks"]
    return stats

动态扩缩容

class AutoScaler:
    """自动扩缩容管理"""
    
    def __init__(self, min_instances=1, max_instances=10, target_utilization=0.8):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.target_utilization = target_utilization
        self.instances = []
    
    def check_scale(self, current_utilization, queue_size):
        """检查是否需要扩缩容"""
        if current_utilization > self.target_utilization and queue_size > 100:
            return self.scale_up()
        elif current_utilization < self.target_utilization * 0.5:
            return self.scale_down()
        return None
    
    def scale_up(self):
        """扩容"""
        if len(self.instances) < self.max_instances:
            new_instance = self.create_instance()
            self.instances.append(new_instance)
            return "scaled_up"
        return "max_reached"
    
    def scale_down(self):
        """缩容"""
        if len(self.instances) > self.min_instances:
            instance = self.instances.pop()
            self.terminate_instance(instance)
            return "scaled_down"
        return "min_reached"

高级特性

Beam Search支持

# vLLM原生支持beam search
sampling_params = SamplingParams(
    temperature=1.0,
    top_k=1,
    use_beam_search=True,
    best_of=5,  # beam size
    max_tokens=256
)

output = llm.generate(prompt, sampling_params)

序列间共享

# PagedAttention支持序列间KV Cache共享(如beam search)
# 无需复制,通过引用计数管理

def beam_search_with_sharing(llm, prompt, num_beams=4):
    """使用共享KV Cache的beam search"""
    sampling_params = SamplingParams(
        use_beam_search=True,
        best_of=num_beams,
        max_tokens=256
    )
    
    output = llm.generate(prompt, sampling_params)
    return output

多LoRA支持

from vllm import LLM, SamplingParams

# 支持多个LoRA适配器
llm = LLM(
    model="meta-llama/Llama-2-7b-hf",
    enable_lora=True,
    max_lora_rank=64
)

# 使用不同LoRA
sampling_params = SamplingParams(temperature=0.7, max_tokens=256)

# 指定LoRA
output = llm.generate(
    prompt,
    sampling_params,
    lora_request=LoRARequest("math_lora", 1, "./math_lora")
)

性能优化

批处理优化

# 最优批处理配置
optimal_config = {
    "max_num_batched_tokens": 8192,  # 最大批处理token数
    "max_num_seqs": 256,  # 最大并发序列数
    "block_size": 16,  # 块大小
    "gpu_memory_utilization": 0.9  # GPU内存利用率
}

# 根据硬件调整
def optimize_for_hardware(gpu_memory_gb, num_gpus):
    """根据硬件优化配置"""
    config = optimal_config.copy()
    
    if gpu_memory_gb >= 80:
        config["max_num_batched_tokens"] = 16384
        config["max_num_seqs"] = 512
    elif gpu_memory_gb >= 40:
        config["max_num_batched_tokens"] = 8192
        config["max_num_seqs"] = 256
    else:
        config["max_num_batched_tokens"] = 4096
        config["max_num_seqs"] = 128
    
    config["tensor_parallel_size"] = num_gpus
    
    return config

监控与调优

import time
from prometheus_client import Gauge, Histogram

# 监控指标
REQUEST_LATENCY = Histogram('vllm_request_latency_seconds', 'Request latency')
QUEUE_SIZE = Gauge('vllm_queue_size', 'Current queue size')
THROUGHPUT = Gauge('vllm_throughput', 'Requests per second')

def monitor_vllm(llm_engine):
    """监控vLLM性能"""
    start_time = time.time()
    
    while True:
        # 获取统计
        stats = get_memory_stats(llm_engine)
        
        # 更新指标
        QUEUE_SIZE.set(llm_engine.scheduler.waiting_queue_size)
        THROUGHPUT.set(stats.get("throughput", 0))
        
        time.sleep(1)

PagedAttention通过创新的内存管理机制,使vLLM成为目前最高效的LLM推理引擎之一。