← 返回首页
🧠

LLM负载测试

📂 llm ⏱ 2 min 384 words

--- title: "LLM负载测试" description: "详细介绍LLM系统的负载测试方法,包括并发测试、吞吐量测试、延迟测试、资源监控、负载测试工具使用以及性能基线建立" tags: ["负载测试", "性能测试", "并发测试", "LLM性能"] category: "llm" icon: "🧠"

LLM负载测试

负载测试的目的

负载测试评估LLM系统在预期负载下的性能表现。通过模拟真实或预期的并发请求量,验证系统的吞吐量、响应延迟和资源利用率是否满足业务需求。

关键性能指标

在负载测试中需要监控以下指标:

使用Locust进行负载测试

from locust import HttpUser, task, between

class LLMUser(HttpUser):
    wait_time = between(1, 3)
    
    def on_start(self):
        self.conversation_id = None
    
    @task(5)
    def simple_chat(self):
        """简单对话(权重5)"""
        payload = {
            "message": "你好,请介绍一下自己",
            "max_tokens": 200,
        }
        with self.client.post(
            "/chat",
            json=payload,
            catch_response=True,
        ) as response:
            if response.status_code == 200:
                data = response.json()
                if "response" in data:
                    response.success()
                else:
                    response.failure("Missing response field")
            else:
                response.failure(f"Status code: {response.status_code}")
    
    @task(3)
    def rag_query(self):
        """RAG查询(权重3)"""
        payload = {
            "message": "如何使用Python进行数据分析?",
            "use_rag": True,
            "top_k": 5,
        }
        self.client.post("/chat", json=payload)
    
    @task(1)
    def long_generation(self):
        """长文本生成(权重1)"""
        payload = {
            "message": "写一篇关于人工智能发展的文章",
            "max_tokens": 2000,
        }
        self.client.post("/chat", json=payload)

自定义负载测试脚本

import asyncio
import aiohttp
import statistics
import time

class LLMLoadTester:
    def __init__(self, base_url, concurrent_users=50, duration=300):
        self.base_url = base_url
        self.concurrent_users = concurrent_users
        self.duration = duration
        self.results = []
    
    async def send_request(self, session, payload):
        start_time = time.time()
        async with session.post(
            f"{self.base_url}/chat",
            json=payload,
        ) as response:
            data = await response.json()
            ttft = data.get("first_token_time", 0)
            total_time = time.time() - start_time
            
            self.results.append({
                "status": response.status,
                "ttft": ttft,
                "total_time": total_time,
            })
    
    async def worker(self, session, stop_time):
        while time.time() < stop_time:
            payload = {
                "message": "测试问题",
                "max_tokens": 100,
            }
            await self.send_request(session, payload)
            await asyncio.sleep(0.1)
    
    async def run(self):
        stop_time = time.time() + self.duration
        async with aiohttp.ClientSession() as session:
            tasks = [
                self.worker(session, stop_time)
                for _ in range(self.concurrent_users)
            ]
            await asyncio.gather(*tasks)
        
        return self.analyze_results()
    
    def analyze_results(self):
        total_times = [r["total_time"] for r in self.results]
        ttfts = [r["ttft"] for r in self.results if r["ttft"] > 0]
        success_count = sum(1 for r in self.results if r["status"] == 200)
        
        return {
            "total_requests": len(self.results),
            "success_rate": success_count / len(self.results) * 100,
            "avg_latency": statistics.mean(total_times),
            "p50_latency": statistics.median(total_times),
            "p95_latency": sorted(total_times)[int(len(total_times) * 0.95)],
            "p99_latency": sorted(total_times)[int(len(total_times) * 0.99)],
            "avg_ttft": statistics.mean(ttfts) if ttfts else 0,
            "throughput_rps": len(self.results) / self.duration,
        }

async def main():
    tester = LLMLoadTester(
        base_url="http://localhost:8000",
        concurrent_users=50,
        duration=300,
    )
    results = await tester.run()
    for key, value in results.items():
        print(f"{key}: {value:.2f}")

asyncio.run(main())

资源监控

负载测试期间需要同步监控系统资源:

import psutil
import GPUtil

class ResourceMonitor:
    def __init__(self):
        self.gpus = GPUtil.getGPUs()
        self.samples = []
    
    def sample(self):
        gpu = self.gpus[0] if self.gpus else None
        self.samples.append({
            "timestamp": time.time(),
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "gpu_util": gpu.loadUtilization * 100 if gpu else 0,
            "gpu_memory": gpu.memoryUtil * 100 if gpu else 0,
        })
    
    def get_summary(self):
        if not self.samples:
            return {}
        return {
            "avg_cpu": sum(s["cpu_percent"] for s in self.samples) / len(self.samples),
            "avg_gpu_util": sum(s["gpu_util"] for s in self.samples) / len(self.samples),
            "max_gpu_memory": max(s["gpu_memory"] for s in self.samples),
        }

性能基线与回归检测

建立性能基线,每次测试后与基线对比。设置延迟和吞吐量的阈值,超出阈值时自动告警。在CI/CD中集成负载测试,确保性能不会因代码变更而退化。