← 返回首页
🧠

LLM压力测试

📂 llm ⏱ 2 min 355 words

--- title: "LLM压力测试" description: "系统讲解LLM系统的压力测试方法,包括极限负载测试、资源耗尽测试、故障恢复测试、压力测试策略以及系统瓶颈识别" tags: ["压力测试", "极限测试", "系统瓶颈", "LLM稳定性"] category: "llm" icon: "🧠"

LLM压力测试

压力测试与负载测试的区别

负载测试验证系统在预期负载下的表现,而压力测试则逐步增加负载直到系统出现故障,目的是找到系统的极限容量和薄弱环节。压力测试回答的关键问题是:系统在什么条件下会崩溃?崩溃后能否恢复?

压力测试策略

阶梯式增压

逐步增加并发数,观察系统行为变化:

import asyncio
import aiohttp
import time

class StressTestRunner:
    def __init__(self, base_url):
        self.base_url = base_url
        self.results = []
    
    async def send_request(self, session, concurrency):
        start = time.time()
        try:
            async with session.post(
                f"{self.base_url}/chat",
                json={"message": "测试", "max_tokens": 50},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                await resp.json()
                return {
                    "status": resp.status,
                    "latency": time.time() - start,
                    "concurrency": concurrency,
                }
        except Exception as e:
            return {
                "status": "error",
                "latency": time.time() - start,
                "error": str(e),
                "concurrency": concurrency,
            }
    
    async def run_staircase(self, levels):
        """阶梯式压力测试"""
        for level in levels:
            print(f"\n=== 并发数: {level} ===")
            async with aiohttp.ClientSession() as session:
                tasks = [
                    self.send_request(session, level)
                    for _ in range(level)
                ]
                results = await asyncio.gather(*tasks)
                self._analyze_level(results, level)
                await asyncio.sleep(10)  # 冷却期
    
    def _analyze_level(self, results, level):
        success = sum(1 for r in results if r.get("status") == 200)
        errors = sum(1 for r in results if r.get("status") != 200)
        latencies = [r["latency"] for r in results if r.get("latency")]
        
        print(f"成功率: {success}/{level} ({success/level*100:.1f}%)")
        if latencies:
            print(f"平均延迟: {sum(latencies)/len(latencies):.2f}s")
            print(f"P95延迟: {sorted(latencies)[int(len(latencies)*0.95)]:.2f}s")

runner = StressTestRunner("http://localhost:8000")
asyncio.run(runner.run_staircase([10, 50, 100, 200, 500, 1000]))

脉冲式压力

短时间内突然增加大量请求,测试系统应对突发流量的能力。

async def pulse_stress_test(base_url, peak_concurrency=500, duration=30):
    """脉冲式压力测试"""
    async with aiohttp.ClientSession() as session:
        # 快速提升到峰值
        tasks = []
        for _ in range(peak_concurrency):
            task = asyncio.create_task(
                send_request(session, base_url)
            )
            tasks.append(task)
        
        # 持续一段时间
        await asyncio.sleep(duration)
        
        # 取消所有任务
        for task in tasks:
            task.cancel()
        
        await asyncio.gather(*tasks, return_exceptions=True)

资源耗尽测试

测试系统在资源接近耗尽时的行为:

class ResourceExhaustionTest:
    def __init__(self):
        self.monitor = ResourceMonitor()
    
    def test_gpu_memory_exhaustion(self, model_loader):
        """测试显存耗尽"""
        loaded_models = []
        try:
            while True:
                model = model_loader.load_new_instance()
                loaded_models.append(model)
                mem_usage = self.monitor.get_gpu_memory()
                print(f"GPU显存使用: {mem_usage:.1f}%")
                if mem_usage > 95:
                    print("接近显存极限")
                    break
        except Exception as e:
            print(f"显存耗尽错误: {e}")
        
        return len(loaded_models)
    
    def test_connection_pool_exhaustion(self, client):
        """测试连接池耗尽"""
        connections = []
        try:
            while True:
                conn = client.acquire_connection()
                connections.append(conn)
        except Exception as e:
            print(f"连接池耗尽: {e}")
        
        # 释放连接
        for conn in connections:
            client.release_connection(conn)

故障恢复测试

在压力测试过程中注入故障,测试系统的恢复能力:

class FaultInjectionTester:
    def __init__(self, base_url):
        self.base_url = base_url
    
    async def test_kill_worker(self):
        """杀死工作进程"""
        # 启动负载
        load_task = asyncio.create_task(self.sustained_load())
        
        # 注入故障
        await asyncio.sleep(5)
        os.system("kill -9 $(pgrep -f 'llm_worker')")
        
        # 观察恢复
        await asyncio.sleep(30)
        recovery_result = await self.check_health()
        
        load_task.cancel()
        return recovery_result
    
    async def test_network_partition(self):
        """模拟网络分区"""
        load_task = asyncio.create_task(self.sustained_load())
        
        await asyncio.sleep(5)
        os.system("iptables -A INPUT -s 10.0.0.0/8 -j DROP")
        
        await asyncio.sleep(10)
        os.system("iptables -D INPUT -s 10.0.0.0/8 -j DROP")
        
        await asyncio.sleep(30)
        recovery_result = await self.check_health()
        
        load_task.cancel()
        return recovery_result

瓶颈识别

压力测试的最终目的是找到系统瓶颈。常见瓶颈包括:

通过压力测试定位瓶颈后,针对性地优化,才能真正提升系统容量。