LLM负载测试
---
title: "LLM负载测试"
description: "详细介绍LLM系统的负载测试方法,包括并发测试、吞吐量测试、延迟测试、资源监控、负载测试工具使用以及性能基线建立"
tags: ["负载测试", "性能测试", "并发测试", "LLM性能"]
category: "llm"
icon: "🧠"
---
LLM负载测试
负载测试的目的
负载测试评估LLM系统在预期负载下的性能表现。通过模拟真实或预期的并发请求量,验证系统的吞吐量、响应延迟和资源利用率是否满足业务需求。
关键性能指标
在负载测试中需要监控以下指标:
- 吞吐量(Throughput):每秒成功处理的请求数(RPS)
- 首token延迟(TTFT):从请求发出到收到第一个token的时间
- 端到端延迟:完整响应的总时间
- P50/P95/P99延迟:不同百分位数的延迟分布
- 错误率:请求失败的比例
- GPU利用率:GPU计算和显存的使用情况
使用Locust进行负载测试
from locust import HttpUser, task, between
class LLMUser(HttpUser):
    """Simulated chat user that load-tests the ``/chat`` endpoint.

    Three tasks are scheduled with weights 5:3:1 (simple chat, RAG query,
    long generation). Every request goes through ``_post_chat`` so that all
    three scenarios are validated the same way and failures are reported to
    Locust consistently.
    """

    # Each simulated user pauses 1-3 seconds between consecutive tasks.
    wait_time = between(1, 3)

    def on_start(self):
        """Initialise per-user state when the simulated user starts."""
        # Not read by any task in this class; kept for subclasses/extensions.
        self.conversation_id = None

    def _post_chat(self, payload):
        """POST ``payload`` to ``/chat`` and mark the request success/failure.

        A request counts as successful only when the status code is 200 and
        the JSON body is an object containing a ``"response"`` key.
        NOTE(review): assumes every ``/chat`` variant (including RAG) returns
        a ``"response"`` field -- confirm against the service contract.
        """
        with self.client.post(
            "/chat",
            json=payload,
            catch_response=True,
        ) as response:
            if response.status_code != 200:
                response.failure(f"Status code: {response.status_code}")
                return

            try:
                data = response.json()
            except ValueError:
                # A non-JSON body would otherwise escape the context manager
                # as an unhandled exception instead of a recorded failure.
                response.failure("Invalid JSON body")
                return

            if isinstance(data, dict) and "response" in data:
                response.success()
            else:
                response.failure("Missing response field")

    @task(5)
    def simple_chat(self):
        """Simple chat request (weight 5)."""
        self._post_chat({
            "message": "你好,请介绍一下自己",
            "max_tokens": 200,
        })

    @task(3)
    def rag_query(self):
        """RAG query (weight 3)."""
        self._post_chat({
            "message": "如何使用Python进行数据分析?",
            "use_rag": True,
            "top_k": 5,
        })

    @task(1)
    def long_generation(self):
        """Long text generation (weight 1)."""
        self._post_chat({
            "message": "写一篇关于人工智能发展的文章",
            "max_tokens": 2000,
        })
自定义负载测试脚本
import asyncio
import aiohttp
import statistics
import time
class LLMLoadTester:
    """Asyncio-based load generator for a ``/chat`` HTTP endpoint.

    ``concurrent_users`` workers each send requests in a loop until
    ``duration`` seconds have passed. One record per request
    (``status``, ``ttft``, ``total_time``) is appended to ``self.results``
    and summarised by ``analyze_results``.
    """

    def __init__(self, base_url, concurrent_users=50, duration=300):
        """Store the target URL, the worker count and the run length (seconds)."""
        self.base_url = base_url
        self.concurrent_users = concurrent_users
        self.duration = duration
        self.results = []

    async def send_request(self, session, payload):
        """Send one request and append its outcome to ``self.results``.

        Transport errors and timeouts are recorded with ``status`` 0 instead
        of propagating, so a single failed request cannot abort the worker
        (and, through ``asyncio.gather``, the whole run).
        """
        # perf_counter is monotonic, so latency is immune to wall-clock jumps.
        started = time.perf_counter()
        status = 0
        ttft = 0
        try:
            async with session.post(
                f"{self.base_url}/chat",
                json=payload,
            ) as response:
                status = response.status
                try:
                    data = await response.json()
                except (aiohttp.ContentTypeError, ValueError):
                    # Error pages are often not JSON; keep the HTTP status.
                    data = None
                if isinstance(data, dict):
                    ttft = data.get("first_token_time", 0) or 0
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # Connection-level failure: count it as a failed request.
            status = 0
        self.results.append({
            "status": status,
            "ttft": ttft,
            "total_time": time.perf_counter() - started,
        })

    async def worker(self, session, stop_time):
        """Send requests back to back until ``time.time()`` reaches ``stop_time``."""
        while time.time() < stop_time:
            payload = {
                "message": "测试问题",
                "max_tokens": 100,
            }
            await self.send_request(session, payload)
            # Short pause between requests from the same worker.
            await asyncio.sleep(0.1)

    async def run(self):
        """Run all workers concurrently and return ``analyze_results()``."""
        stop_time = time.time() + self.duration
        async with aiohttp.ClientSession() as session:
            workers = [
                self.worker(session, stop_time)
                for _ in range(self.concurrent_users)
            ]
            await asyncio.gather(*workers)
        return self.analyze_results()

    @staticmethod
    def _percentile(sorted_values, fraction):
        """Return the element at index ``int(len * fraction)``, clamped to the last item."""
        last = len(sorted_values) - 1
        return sorted_values[min(int(len(sorted_values) * fraction), last)]

    def analyze_results(self):
        """Summarise ``self.results`` into a dict of numeric metrics.

        Returns the keys ``total_requests``, ``success_rate`` (percent),
        ``avg_latency``, ``p50_latency``, ``p95_latency``, ``p99_latency``,
        ``avg_ttft`` and ``throughput_rps``. All values are 0 when no request
        was recorded, so callers can always format them as numbers.
        """
        total = len(self.results)
        if total == 0:
            return {
                "total_requests": 0,
                "success_rate": 0.0,
                "avg_latency": 0.0,
                "p50_latency": 0.0,
                "p95_latency": 0.0,
                "p99_latency": 0.0,
                "avg_ttft": 0,
                "throughput_rps": 0.0,
            }

        # Sort once and reuse for every percentile.
        latencies = sorted(r["total_time"] for r in self.results)
        # A ttft of 0 means the server did not report one; exclude it.
        ttfts = [r["ttft"] for r in self.results if r["ttft"] > 0]
        success_count = sum(1 for r in self.results if r["status"] == 200)
        return {
            "total_requests": total,
            "success_rate": success_count / total * 100,
            "avg_latency": statistics.mean(latencies),
            "p50_latency": statistics.median(latencies),
            "p95_latency": self._percentile(latencies, 0.95),
            "p99_latency": self._percentile(latencies, 0.99),
            "avg_ttft": statistics.mean(ttfts) if ttfts else 0,
            "throughput_rps": total / self.duration if self.duration > 0 else 0.0,
        }
async def main():
    """Run a 300-second, 50-user load test against localhost and print each metric."""
    tester = LLMLoadTester(
        base_url="http://localhost:8000",
        concurrent_users=50,
        duration=300,
    )
    results = await tester.run()
    for key, value in results.items():
        print(f"{key}: {value:.2f}")


# Guard the entry point so importing this module does not start a load test.
if __name__ == "__main__":
    asyncio.run(main())
资源监控
负载测试期间需要同步监控系统资源。下面的监控器需要在测试过程中按固定间隔(例如每秒一次)调用 sample() 采样,测试结束后用 get_summary() 汇总:
import psutil
import GPUtil
class ResourceMonitor:
    """Collect CPU, memory and GPU utilisation samples during a load test.

    Call ``sample()`` periodically while the test runs, then
    ``get_summary()`` for the aggregated figures. Only the first GPU
    reported by GPUtil is tracked.
    """

    def __init__(self):
        """Query the GPU list once and start with an empty sample buffer."""
        self.gpus = GPUtil.getGPUs()
        self.samples = []
        # The first non-blocking cpu_percent() call returns a meaningless 0.0;
        # prime it here so the first real sample is valid.
        psutil.cpu_percent()

    def sample(self):
        """Append one snapshot of current resource usage to ``self.samples``."""
        # GPUtil GPU objects are point-in-time snapshots, so re-query on every
        # sample; reusing the list from __init__ would repeat stale values.
        self.gpus = GPUtil.getGPUs()
        gpu = self.gpus[0] if self.gpus else None
        self.samples.append({
            "timestamp": time.time(),
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            # GPUtil exposes the load as ``GPU.load`` (0-1 fraction).
            "gpu_util": gpu.load * 100 if gpu else 0,
            "gpu_memory": gpu.memoryUtil * 100 if gpu else 0,
        })

    def get_summary(self):
        """Return average CPU, average GPU load and peak GPU memory, or ``{}`` if no samples."""
        if not self.samples:
            return {}
        count = len(self.samples)
        return {
            "avg_cpu": sum(s["cpu_percent"] for s in self.samples) / count,
            "avg_gpu_util": sum(s["gpu_util"] for s in self.samples) / count,
            "max_gpu_memory": max(s["gpu_memory"] for s in self.samples),
        }
性能基线与回归检测
建立性能基线,每次测试后与基线对比。为延迟设置上限、为吞吐量设置下限(例如P95延迟较基线上升超过10%,或吞吐量较基线下降超过10%),越过阈值时自动告警。在CI/CD中集成负载测试,确保性能不会因代码变更而退化。