LLM压力测试
---
title: "LLM压力测试"
description: "系统讲解LLM系统的压力测试方法,包括极限负载测试、资源耗尽测试、故障恢复测试、压力测试策略以及系统瓶颈识别"
tags: ["压力测试", "极限测试", "系统瓶颈", "LLM稳定性"]
category: "llm"
icon: "🧠"
---
LLM压力测试
压力测试与负载测试的区别
负载测试验证系统在预期负载下的表现,而压力测试则逐步增加负载直到系统出现故障,目的是找到系统的极限容量和薄弱环节。压力测试回答的关键问题是:系统在什么条件下会崩溃?崩溃后能否恢复?
压力测试策略
阶梯式增压
逐步增加并发数,观察系统行为变化:
import asyncio
import aiohttp
import time
class StressTestRunner:
    """Drive staircase-style stress tests against an HTTP chat endpoint.

    Each level fires a batch of concurrent POST requests at
    ``{base_url}/chat`` and prints the success rate and latency statistics
    for that batch.
    """

    def __init__(self, base_url):
        """Remember the target base URL and start with an empty result log."""
        self.base_url = base_url
        # Per-request records accumulated across every level that was run.
        self.results = []

    async def send_request(self, session, concurrency):
        """Send one chat request and measure how long it takes.

        Args:
            session: An open ``aiohttp.ClientSession`` used to send the request.
            concurrency: The concurrency level this request belongs to; it is
                copied verbatim into the returned record.

        Returns:
            A dict with ``status`` (the HTTP status code, or ``"error"`` when
            any exception occurred), ``latency`` in seconds, ``concurrency``,
            and, for failures only, an ``error`` description.
        """
        # perf_counter is monotonic, so wall-clock adjustments cannot skew
        # (or turn negative) the measured latency.
        start = time.perf_counter()
        try:
            async with session.post(
                f"{self.base_url}/chat",
                json={"message": "测试", "max_tokens": 50},
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                await resp.json()
                return {
                    "status": resp.status,
                    "latency": time.perf_counter() - start,
                    "concurrency": concurrency,
                }
        except Exception as e:
            # Deliberately broad: under stress every failure mode (timeout,
            # reset, bad payload) must be recorded instead of aborting the run.
            return {
                "status": "error",
                "latency": time.perf_counter() - start,
                # Timeout errors stringify to "", so fall back to the type name.
                "error": str(e) or type(e).__name__,
                "concurrency": concurrency,
            }

    async def run_staircase(self, levels, cooldown=10):
        """Run the staircase stress test.

        Args:
            levels: Iterable of concurrency levels, executed in order.
            cooldown: Seconds to pause between consecutive levels.
        """
        levels = list(levels)
        last_index = len(levels) - 1
        for index, level in enumerate(levels):
            print(f"\n=== 并发数: {level} ===")
            # The default connector allows only 100 simultaneous connections,
            # which would silently cap every higher level; limit=0 lifts it.
            connector = aiohttp.TCPConnector(limit=0)
            async with aiohttp.ClientSession(connector=connector) as session:
                results = await asyncio.gather(
                    *(self.send_request(session, level) for _ in range(level))
                )
            self.results.extend(results)
            self._analyze_level(results, level)
            if index < last_index:
                await asyncio.sleep(cooldown)  # cool-down between levels

    def _analyze_level(self, results, level):
        """Print and return the statistics for one concurrency level.

        Args:
            results: Records produced by ``send_request`` for this level.
            level: Number of requests issued; denominator of the success rate.

        Returns:
            A dict with ``level``, ``success``, ``errors``, ``success_rate``
            (percent), ``avg_latency`` and ``p95_latency`` (seconds; both are
            ``None`` when no latency was recorded).
        """
        success = sum(1 for r in results if r.get("status") == 200)
        errors = len(results) - success
        # Compare against None so a legitimate 0.0 latency is not discarded.
        latencies = sorted(
            r["latency"] for r in results if r.get("latency") is not None
        )
        # A level of 0 issues no requests; report 0% rather than dividing by 0.
        success_rate = success / level * 100 if level else 0.0
        print(f"成功率: {success}/{level} ({success_rate:.1f}%)")

        summary = {
            "level": level,
            "success": success,
            "errors": errors,
            "success_rate": success_rate,
            "avg_latency": None,
            "p95_latency": None,
        }
        if latencies:
            summary["avg_latency"] = sum(latencies) / len(latencies)
            summary["p95_latency"] = latencies[int(len(latencies) * 0.95)]
            print(f"平均延迟: {summary['avg_latency']:.2f}s")
            print(f"P95延迟: {summary['p95_latency']:.2f}s")
        return summary
if __name__ == "__main__":
    # Guarded so that importing this module does not immediately launch a
    # stress run of up to 1000 concurrent requests against localhost.
    runner = StressTestRunner("http://localhost:8000")
    asyncio.run(runner.run_staircase([10, 50, 100, 200, 500, 1000]))
脉冲式压力
短时间内突然增加大量请求,测试系统应对突发流量的能力。
async def pulse_stress_test(base_url, peak_concurrency=500, duration=30):
    """Pulse stress test: fire a sudden burst of requests, then hold.

    All ``peak_concurrency`` requests are launched at once, the function
    waits ``duration`` seconds, and any request still in flight is cancelled.

    Args:
        base_url: Base URL of the service under test.
        peak_concurrency: Number of requests launched in the burst.
        duration: Seconds to wait before cancelling unfinished requests.

    Returns:
        One entry per request, in launch order: the record returned by
        ``StressTestRunner.send_request`` for requests that finished, or the
        ``asyncio.CancelledError`` instance for requests that were cancelled.
    """
    # Reuse the runner's request helper; no free function named
    # ``send_request`` is defined in this file.
    runner = StressTestRunner(base_url)
    # The default connector allows only 100 simultaneous connections, which
    # would flatten the pulse; limit=0 removes the cap.
    connector = aiohttp.TCPConnector(limit=0)
    async with aiohttp.ClientSession(connector=connector) as session:
        # Ramp to the peak immediately.
        tasks = [
            asyncio.create_task(runner.send_request(session, peak_concurrency))
            for _ in range(peak_concurrency)
        ]
        try:
            # Hold for the pulse window.
            await asyncio.sleep(duration)
        finally:
            # Cancel stragglers even if the wait itself is interrupted, so no
            # task outlives the session it depends on.
            for task in tasks:
                task.cancel()
            results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
资源耗尽测试
测试系统在资源接近耗尽时的行为:
class ResourceExhaustionTest:
    """Probe how the system behaves as a resource approaches exhaustion."""

    def __init__(self):
        """Create the resource monitor used to read GPU memory usage."""
        # ResourceMonitor is not defined in this snippet; it must be provided
        # by the surrounding project.
        self.monitor = ResourceMonitor()

    def test_gpu_memory_exhaustion(self, model_loader):
        """Load model instances until GPU memory is nearly exhausted.

        Loading stops when the monitor reports more than 95 (the value is
        printed as a percentage) or when loading/monitoring raises.

        Args:
            model_loader: Object exposing ``load_new_instance()``.

        Returns:
            The number of model instances that were loaded successfully.
        """
        loaded_models = []
        try:
            while True:
                loaded_models.append(model_loader.load_new_instance())

                mem_usage = self.monitor.get_gpu_memory()
                print(f"GPU显存使用: {mem_usage:.1f}%")

                if mem_usage > 95:
                    print("接近显存极限")
                    break
        except Exception as e:
            # Hitting the limit is the expected outcome of this test, so the
            # failure is reported rather than propagated.
            print(f"显存耗尽错误: {e}")

        return len(loaded_models)

    def test_connection_pool_exhaustion(self, client):
        """Acquire connections until the pool refuses, then release them all.

        Args:
            client: Object exposing ``acquire_connection()`` and
                ``release_connection(conn)``.

        Returns:
            The number of connections acquired before the pool was exhausted.
        """
        connections = []
        try:
            while True:
                connections.append(client.acquire_connection())
        except Exception as e:
            print(f"连接池耗尽: {e}")
        finally:
            # Release in ``finally`` so the pool is handed back even when the
            # loop is interrupted by something other than ``Exception``
            # (e.g. Ctrl-C), instead of leaking every acquired connection.
            for conn in connections:
                client.release_connection(conn)

        return len(connections)
故障恢复测试
在压力测试过程中注入故障,测试系统的恢复能力:
class FaultInjectionTester:
    """Inject faults while the service is under load and check recovery.

    The scenarios call ``self.sustained_load()`` and ``self.check_health()``,
    which are not defined in this snippet and must be supplied elsewhere
    (for example by a subclass).
    """

    def __init__(self, base_url, warmup_seconds=5, outage_seconds=10,
                 recovery_seconds=30):
        """Configure the tester.

        Args:
            base_url: Base URL of the service under test.
            warmup_seconds: Load time before the fault is injected.
            outage_seconds: How long the network partition is kept in place.
            recovery_seconds: Time given to the system before the health check.
        """
        self.base_url = base_url
        self.warmup_seconds = warmup_seconds
        self.outage_seconds = outage_seconds
        self.recovery_seconds = recovery_seconds

    async def _run_command(self, *argv):
        """Run an external command without a shell and return its exit code."""
        # A subprocess awaited through asyncio keeps the event loop (and the
        # background load) running, unlike a blocking os.system call.
        proc = await asyncio.create_subprocess_exec(*argv)
        return await proc.wait()

    async def _stop_load(self, load_task):
        """Cancel the background load task and wait until it has finished."""
        load_task.cancel()
        await asyncio.gather(load_task, return_exceptions=True)

    async def test_kill_worker(self):
        """Kill the worker processes under load and report the health check.

        Returns:
            Whatever ``self.check_health()`` returns after the recovery wait.
        """
        # Start the load.
        load_task = asyncio.create_task(self.sustained_load())
        try:
            # Inject the fault.
            await asyncio.sleep(self.warmup_seconds)
            # pkill skips its own process, whereas `kill $(pgrep -f ...)` run
            # through a shell can match the shell that carries the pattern.
            await self._run_command("pkill", "-9", "-f", "llm_worker")
            # Observe the recovery.
            await asyncio.sleep(self.recovery_seconds)
            return await self.check_health()
        finally:
            # Always stop the load, even when the health check raises.
            await self._stop_load(load_task)

    async def test_network_partition(self):
        """Simulate a network partition under load and report the health check.

        Traffic from 10.0.0.0/8 is dropped with an iptables rule for
        ``outage_seconds``; the rule is then removed.

        Returns:
            Whatever ``self.check_health()`` returns after the recovery wait.
        """
        rule = ("INPUT", "-s", "10.0.0.0/8", "-j", "DROP")
        load_task = asyncio.create_task(self.sustained_load())
        try:
            await asyncio.sleep(self.warmup_seconds)
            await self._run_command("iptables", "-A", *rule)
            try:
                await asyncio.sleep(self.outage_seconds)
            finally:
                # Remove the rule even if the wait is interrupted; a leftover
                # DROP rule would keep the host partitioned after the test.
                await self._run_command("iptables", "-D", *rule)
            await asyncio.sleep(self.recovery_seconds)
            return await self.check_health()
        finally:
            await self._stop_load(load_task)
瓶颈识别
压力测试的最终目的是找到系统瓶颈。常见瓶颈包括:
- GPU显存不足:模型无法加载或推理时OOM
- CPU瓶颈:数据预处理、tokenization等CPU密集操作
- 网络带宽:大模型权重传输或高并发请求
- 存储IO:模型加载速度或日志写入
通过压力测试定位瓶颈后,针对性地优化,才能真正提升系统容量。