← 返回首页
🧠

压力测试模型:验证极限性能

📂 llm ⏱ 4 min 786 words

---
title: "压力测试模型:验证极限性能"
description: "对LLM进行压力测试,验证模型在极端条件下的性能表现"
tags: ["压力测试", "性能测试", "极限测试", "LLM", "负载测试"]
category: "llm"
icon: "💪"
---

压力测试模型:验证极限性能

压力测试概述

压力测试是验证LLM在高负载、大输入、长时间运行等极端条件下性能表现的系统化方法。

测试维度

1. 输入压力测试

import time
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

import numpy as np
import torch

@dataclass
class StressTestResult:
    """Outcome of one stress-test case (one sequence length or one batch size)."""

    # Identifier of the case, e.g. "token_length_1024" or "batch_size_8".
    test_name: str
    # Nominal size of the input for this case, as chosen by the tester that
    # produced the record.
    input_size: int
    # Wall-clock seconds from just before tokenization until the case
    # finished or raised.
    processing_time: float
    # Memory reading taken right after the case; InputStressTester stores the
    # process RSS in MB here.
    memory_usage: float
    # False when the case raised an exception.
    success: bool
    # Text of the exception for a failed case; stays None on success.
    error_message: Optional[str] = None

class InputStressTester:
    """Stress a model with growing sequence lengths and growing batch sizes.

    ``model`` is invoked as ``model(**inputs)`` and ``tokenizer`` is invoked
    with keyword arguments such as ``return_tensors``, ``truncation``,
    ``padding`` and ``max_length``; its return value is unpacked into the
    model call.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def test_token_length(self, max_tokens: int = 8192, step: int = 512,
                          min_tokens: int = 512) -> List[StressTestResult]:
        """Run one forward pass per sequence length and stop at the first failure.

        Args:
            max_tokens: Largest sequence length to try (inclusive).
            step: Increment between consecutive sequence lengths.
            min_tokens: First sequence length to try.

        Returns:
            One ``StressTestResult`` per attempted length, in increasing
            order. The last entry is the failing case if any case failed.
        """
        results = []

        for n_tokens in range(min_tokens, max_tokens + 1, step):
            # Synthetic input; the tokenizer is asked to truncate it to
            # ``n_tokens``, so ``n_tokens`` is an upper bound on the real length.
            test_input = "测试 " * (n_tokens // 2)

            outcome = self._run_case(
                f"token_length_{n_tokens}",
                n_tokens,
                test_input,
                truncation=True,
                max_length=n_tokens,
            )
            results.append(outcome)
            if not outcome.success:
                break  # Larger inputs are not attempted after a failure.

        return results

    def test_batch_size(self, max_batch: int = 64, input_length: int = 512) -> List[StressTestResult]:
        """Run one forward pass per batch size (1, 2, 4, ...) up to ``max_batch``.

        Batch sizes double until they exceed ``max_batch``; the sweep stops
        early at the first failing batch size.

        Args:
            max_batch: Largest batch size to try (inclusive).
            input_length: Per-sample length cap passed to the tokenizer as
                ``max_length``; also used to compute ``input_size``.

        Returns:
            One ``StressTestResult`` per attempted batch size, whose
            ``input_size`` is ``batch_size * input_length``.
        """
        results = []

        batch_size = 1
        while batch_size <= max_batch:
            test_inputs = ["测试输入 " * (input_length // 4)] * batch_size

            outcome = self._run_case(
                f"batch_size_{batch_size}",
                batch_size * input_length,
                test_inputs,
                padding=True,
                truncation=True,
                # Cap each sample so the reported input_size is a true upper bound.
                max_length=input_length,
            )
            results.append(outcome)
            if not outcome.success:
                break

            batch_size *= 2

        return results

    def _run_case(self, test_name: str, input_size: int, text, **tokenizer_kwargs) -> StressTestResult:
        """Tokenize ``text``, run one forward pass and record the outcome."""
        # perf_counter is monotonic, so the interval cannot be skewed by
        # system clock adjustments.
        start = time.perf_counter()
        try:
            inputs = self.tokenizer(text, return_tensors="pt", **tokenizer_kwargs)
            with torch.no_grad():
                self.model(**inputs)
        except Exception as exc:
            # Deliberately broad: any failure is recorded as a failed case
            # instead of aborting the whole sweep.
            return StressTestResult(
                test_name=test_name,
                input_size=input_size,
                processing_time=time.perf_counter() - start,
                memory_usage=self._get_memory_usage(),
                success=False,
                error_message=str(exc),
            )

        return StressTestResult(
            test_name=test_name,
            input_size=input_size,
            processing_time=time.perf_counter() - start,
            memory_usage=self._get_memory_usage(),
            success=True,
        )

    def _get_memory_usage(self) -> float:
        """Return the resident set size (RSS) of the current process in MB."""
        import os

        import psutil

        # NOTE(review): RSS is a process-level reading; if the model runs on an
        # accelerator, device memory is presumably not included -- confirm.
        process = psutil.Process(os.getpid())
        return process.memory_info().rss / 1024 / 1024  # bytes -> MB

2. 并发压力测试

import concurrent.futures
from queue import Queue
from threading import Lock

class ConcurrentStressTester:
    """Measure latency and throughput under concurrent and sustained load.

    ``tokenizer`` is called as ``tokenizer(text, return_tensors="pt")`` and
    its result is unpacked into ``model.generate(..., max_new_tokens=50)``.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        # Guards the shared counters updated by worker threads.
        self.results_lock = Lock()
        # Not used by the methods of this class; kept so existing callers that
        # read the attribute keep working.
        self.results = []

    def single_inference(self, text: str) -> Dict:
        """Run one generate call and report its latency.

        Returns:
            ``{"success": True, "latency": seconds, "output_length": n}`` on
            success, where ``n`` is ``outputs.shape[1]``; otherwise
            ``{"success": False, "latency": seconds, "error": message}``.
            Exceptions are never propagated.
        """
        start = time.perf_counter()

        try:
            inputs = self.tokenizer(text, return_tensors="pt")
            with torch.no_grad():
                outputs = self.model.generate(**inputs, max_new_tokens=50)

            return {
                "success": True,
                "latency": time.perf_counter() - start,
                "output_length": outputs.shape[1],
            }
        except Exception as exc:
            # Deliberately broad: a failed request is a data point for the
            # load test, not a reason to abort it.
            return {
                "success": False,
                "latency": time.perf_counter() - start,
                "error": str(exc),
            }

    def test_concurrent_load(self, n_concurrent: int = 10, duration: int = 30) -> Dict:
        """Issue requests from ``n_concurrent`` threads for ``duration`` seconds.

        Each worker starts a new request whenever its previous one finishes,
        until the deadline has passed.

        Returns:
            Dict with ``total_requests``, ``successful_requests``,
            ``failed_requests``, ``latencies`` (successful requests only),
            ``throughput`` (successful requests per elapsed second) and
            ``mean_latency`` / ``p95_latency`` / ``p99_latency`` (0 when no
            request succeeded).
        """
        results = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "latencies": [],
            "throughput": 0,
        }

        start_time = time.perf_counter()

        def worker():
            while time.perf_counter() - start_time < duration:
                text = "测试输入 " * 50
                result = self.single_inference(text)

                with self.results_lock:
                    results["total_requests"] += 1
                    if result["success"]:
                        results["successful_requests"] += 1
                        results["latencies"].append(result["latency"])
                    else:
                        results["failed_requests"] += 1

        # NOTE(review): the model is called from several threads at once;
        # whether that is safe depends on the model object -- confirm.
        with concurrent.futures.ThreadPoolExecutor(max_workers=n_concurrent) as executor:
            futures = [executor.submit(worker) for _ in range(n_concurrent)]
            for future in concurrent.futures.as_completed(futures):
                # Re-raise unexpected worker errors instead of silently
                # returning incomplete counters.
                future.result()

        elapsed_time = time.perf_counter() - start_time
        latencies = results["latencies"]
        results["throughput"] = (
            results["successful_requests"] / elapsed_time if elapsed_time > 0 else 0.0
        )
        results["mean_latency"] = float(np.mean(latencies)) if latencies else 0
        results["p95_latency"] = float(np.percentile(latencies, 95)) if latencies else 0
        results["p99_latency"] = float(np.percentile(latencies, 99)) if latencies else 0

        return results

    def test_sustained_load(self, requests_per_second: float = 10, duration: int = 60) -> Dict:
        """Send sequential requests at a target rate for ``duration`` seconds.

        Requests are issued one at a time from the calling thread; after each
        request the loop sleeps for whatever is left of the ``1 / rate``
        interval, so the achieved rate is lower when a request takes longer
        than the interval.

        Args:
            requests_per_second: Target request rate; must be positive.
            duration: How long to keep issuing requests, in seconds.

        Returns:
            Dict with ``timeline`` (one entry per request), ``latencies``
            (all requests, including failed ones), ``errors``, ``throughput``
            (requests per elapsed second) and ``error_rate``. The last two are
            0.0 when no request was issued.

        Raises:
            ValueError: If ``requests_per_second`` is not positive.
        """
        if requests_per_second <= 0:
            raise ValueError("requests_per_second must be positive")

        results = {
            "timeline": [],
            "latencies": [],
            "errors": [],
        }

        interval = 1.0 / requests_per_second
        start_time = time.perf_counter()

        while time.perf_counter() - start_time < duration:
            request_start = time.perf_counter()

            text = "持续负载测试 " * 30
            result = self.single_inference(text)

            results["latencies"].append(result["latency"])
            results["timeline"].append({
                "timestamp": time.perf_counter() - start_time,
                "success": result["success"],
                "latency": result["latency"],
            })

            if not result["success"]:
                results["errors"].append(result.get("error", "Unknown"))

            # Throttle: sleep off the remainder of this request's time slot.
            spent = time.perf_counter() - request_start
            if spent < interval:
                time.sleep(interval - spent)

        # Use the measured run time: the last request (and its sleep) may end
        # after the nominal duration, and the nominal duration may be zero.
        elapsed_time = time.perf_counter() - start_time
        n_requests = len(results["latencies"])
        results["throughput"] = (
            n_requests / elapsed_time if n_requests and elapsed_time > 0 else 0.0
        )
        results["error_rate"] = len(results["errors"]) / n_requests if n_requests else 0.0

        return results

3. 长时间运行测试

class EnduranceTester:
    """Long-running checks for memory growth and latency drift.

    ``tokenizer`` is called as ``tokenizer(text, return_tensors="pt")`` and
    its result is unpacked into ``model.generate(...)``.
    """

    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer

    def test_memory_leak(self, n_iterations: int = 1000, snapshot_interval: int = 100,
                         leak_threshold_mb: float = 0.1) -> Dict:
        """Run repeated inference and fit a line through process RSS readings.

        RSS is sampled every ``snapshot_interval`` iterations and once more
        after the final iteration. The slope is fitted against the iteration
        number, so it is expressed in MB per iteration.

        Args:
            n_iterations: Number of generate calls to perform.
            snapshot_interval: Iterations between two memory readings (>= 1).
            leak_threshold_mb: Slope in MB per iteration above which a leak
                is reported.

        Returns:
            Dict with ``iterations``, ``initial_memory``, ``final_memory``,
            ``memory_trend_mb_per_iter``, ``memory_leak_detected`` and the raw
            ``memory_snapshots`` list.

        Raises:
            ValueError: If ``snapshot_interval`` is smaller than 1.
        """
        import os

        import psutil

        if snapshot_interval < 1:
            raise ValueError("snapshot_interval must be at least 1")

        process = psutil.Process(os.getpid())

        def take_snapshot(iteration: int) -> Dict:
            rss_mb = process.memory_info().rss / 1024 / 1024  # bytes -> MB
            return {"iteration": iteration, "memory_mb": rss_mb}

        memory_snapshots = []

        for i in range(n_iterations):
            if i % snapshot_interval == 0:
                memory_snapshots.append(take_snapshot(i))

            text = f"内存泄漏测试 {i}"
            inputs = self.tokenizer(text, return_tensors="pt")
            with torch.no_grad():
                self.model.generate(**inputs, max_new_tokens=10)

        # Closing reading, so final_memory reflects the state after the last
        # iteration and at least one snapshot always exists.
        memory_snapshots.append(take_snapshot(n_iterations))

        iterations = [s["iteration"] for s in memory_snapshots]
        memories = [s["memory_mb"] for s in memory_snapshots]
        memory_trend = self._slope(iterations, memories)

        return {
            "iterations": n_iterations,
            "initial_memory": memories[0],
            "final_memory": memories[-1],
            "memory_trend_mb_per_iter": memory_trend,
            "memory_leak_detected": bool(memory_trend > leak_threshold_mb),
            "memory_snapshots": memory_snapshots,
        }

    def test_performance_degradation(self, n_iterations: int = 100,
                                     degradation_threshold_s: float = 0.01) -> Dict:
        """Run repeated inference and fit a line through the per-call latency.

        Args:
            n_iterations: Number of generate calls to time.
            degradation_threshold_s: Latency increase in seconds per iteration
                above which degradation is reported.

        Returns:
            Dict with ``iterations``, ``mean_latency``, ``latency_trend``
            (seconds per iteration) and ``performance_degradation``. With no
            iterations the mean is 0.0; with fewer than two iterations the
            trend is 0.0.
        """
        latencies = []

        for i in range(n_iterations):
            text = f"性能退化测试 {i}" * 10

            start = time.perf_counter()
            inputs = self.tokenizer(text, return_tensors="pt")
            with torch.no_grad():
                self.model.generate(**inputs, max_new_tokens=50)
            latencies.append(time.perf_counter() - start)

        latency_trend = self._slope(list(range(len(latencies))), latencies)

        return {
            "iterations": n_iterations,
            "mean_latency": float(np.mean(latencies)) if latencies else 0.0,
            "latency_trend": latency_trend,
            "performance_degradation": bool(latency_trend > degradation_threshold_s),
        }

    @staticmethod
    def _slope(xs, ys) -> float:
        """Return the least-squares slope of ``ys`` over ``xs`` (0.0 for < 2 points)."""
        if len(xs) < 2:
            # A line cannot be fitted through fewer than two points.
            return 0.0
        return float(np.polyfit(xs, ys, 1)[0])

结果分析

import matplotlib.pyplot as plt

def plot_stress_test_results(results: Dict):
    """Draw a 2x2 summary figure for a stress-test result dict and show it.

    Each panel is drawn only when its data is present and non-empty; panels
    without data are hidden instead of being left as blank axes.

    Args:
        results: Mapping that may contain any of:
            ``latencies`` (list of seconds) for the histogram,
            ``timeline`` (list of dicts with ``timestamp`` and ``latency``),
            ``memory_snapshots`` (list of dicts with ``iteration`` and
            ``memory_mb``), and ``successful_requests`` together with
            ``total_requests`` for the success-rate pie chart.
    """
    _, axes = plt.subplots(2, 2, figsize=(12, 10))
    latency_ax, timeline_ax = axes[0, 0], axes[0, 1]
    memory_ax, success_ax = axes[1, 0], axes[1, 1]

    # Latency distribution
    latencies = results.get("latencies")
    if latencies:
        latency_ax.hist(latencies, bins=30, edgecolor="black")
        latency_ax.set_title("Latency Distribution")
        latency_ax.set_xlabel("Latency (seconds)")
    else:
        latency_ax.set_visible(False)

    # Latency over time
    timeline = results.get("timeline")
    if timeline:
        timestamps = [point["timestamp"] for point in timeline]
        timeline_latencies = [point["latency"] for point in timeline]
        timeline_ax.plot(timestamps, timeline_latencies, "o-")
        timeline_ax.set_title("Latency Over Time")
        timeline_ax.set_xlabel("Time (seconds)")
        timeline_ax.set_ylabel("Latency")
    else:
        timeline_ax.set_visible(False)

    # Memory usage
    snapshots = results.get("memory_snapshots")
    if snapshots:
        iterations = [snap["iteration"] for snap in snapshots]
        memory = [snap["memory_mb"] for snap in snapshots]
        memory_ax.plot(iterations, memory, "o-")
        memory_ax.set_title("Memory Usage Over Time")
        memory_ax.set_xlabel("Iteration")
        memory_ax.set_ylabel("Memory (MB)")
    else:
        memory_ax.set_visible(False)

    # Success rate -- skipped when no request was made, since the rate would
    # otherwise require a division by zero.
    total_requests = results.get("total_requests", 0)
    if "successful_requests" in results and total_requests > 0:
        success_rate = results["successful_requests"] / total_requests * 100
        success_ax.pie([success_rate, 100 - success_rate],
                       labels=["Success", "Failure"], autopct="%1.1f%%")
        success_ax.set_title(f"Success Rate: {success_rate:.1f}%")
    else:
        success_ax.set_visible(False)

    plt.tight_layout()
    plt.show()

最佳实践

  1. 渐进式加压:从低负载开始,逐步增加压力
  2. 监控指标:实时监控延迟、吞吐量、错误率
  3. 资源监控:监控CPU、内存、GPU使用情况
  4. 结果可视化:使用图表直观展示测试结果

总结

压力测试是验证LLM极限性能的重要环节。通过系统化的压力测试,可以发现性能瓶颈,优化系统配置。