性能测试架构:基线、瓶颈与容量规划
性能测试架构:基线、瓶颈与容量规划
性能测试体系架构
性能测试是验证系统在负载下表现的关键手段。完整的性能测试体系包括:性能基线建立、负载测试、压力测试、容量规划和持续性能监控。目标是提前发现性能瓶颈,确保系统满足SLA要求。
# 性能测试核心框架
from dataclasses import dataclass, field
from typing import List, Dict, Callable, Optional
from enum import Enum
import time
import statistics
class TestType(Enum):
    """Categories a performance test run can be labelled with.

    Each member's value is the lowercase string form of its name.
    """

    BASELINE = "baseline"
    LOAD = "load"
    STRESS = "stress"
    SPIKE = "spike"
    SOAK = "soak"
@dataclass
class PerformanceMetric:
    """A single named measurement with an optional percentile breakdown."""

    # Identifier of the measurement (e.g. the runner emits "response_time").
    name: str
    # Headline value of the measurement, expressed in `unit`.
    value: float
    # Free-form unit label for `value`.
    unit: str
    # Time the measurement was taken, as a float timestamp.
    timestamp: float
    # Percentile label -> value; every instance gets its own empty dict by default.
    percentiles: Dict[str, float] = field(default_factory=dict)
@dataclass
class LoadProfile:
    """Shape of the load to apply during a test run."""

    # Number of simulated users to reach.
    target_users: int
    # Time allotted for ramping up, in seconds.
    ramp_up_duration: int
    # Length of the run, in seconds.
    duration: int
    # Time allotted for ramping down, in seconds; defaults to no ramp-down.
    ramp_down_duration: int = 0
@dataclass
class PerformanceResult:
    """Outcome of one performance test run."""

    # Which kind of test produced this result.
    test_type: TestType
    # Float timestamps marking the start and end of the run.
    start_time: float
    end_time: float
    # Measurements collected during the run.
    metrics: List[PerformanceMetric]
    # Fraction of requests that succeeded.
    success_rate: float
    # Requests per second.
    throughput: float
    # Number of requests that failed.
    error_count: int
class PerformanceTestRunner:
    """Run simulated load tests and compare their results against baselines.

    Attributes:
        results: Every result produced by ``run_load_test``, in call order.
        baselines: Mapping of baseline name to the result registered under it.
    """

    # Percentage increase over the baseline above which a metric is flagged.
    REGRESSION_THRESHOLD_PERCENT = 10

    def __init__(self):
        self.results = []
        self.baselines = {}

    def run_load_test(self, test_fn: Callable, profile: "LoadProfile") -> "PerformanceResult":
        """Call ``test_fn`` once per target user and summarise the outcome.

        The load is simulated sequentially: only ``profile.target_users`` is
        read; the ramp-up, duration and ramp-down fields are not used here.

        Args:
            test_fn: Zero-argument callable representing one request. Any
                exception it raises is counted as a failed request.
            profile: Load profile supplying the number of calls to make.

        Returns:
            A ``PerformanceResult`` of type ``TestType.LOAD``; it is also
            appended to ``self.results``.
        """
        # Wall-clock timestamps are kept for reporting; elapsed time is taken
        # from the monotonic performance counter so clock adjustments cannot
        # distort latencies or throughput.
        start_time = time.time()
        run_started = time.perf_counter()

        latencies = []
        error_count = 0
        for _ in range(profile.target_users):
            request_started = time.perf_counter()
            try:
                test_fn()
            except Exception:
                # Deliberately broad: any failure of the request counts as an error.
                error_count += 1
            else:
                latencies.append((time.perf_counter() - request_started) * 1000)

        duration = time.perf_counter() - run_started
        end_time = time.time()
        success_count = len(latencies)

        metrics = []
        if latencies:
            ordered = sorted(latencies)  # sort once, reuse for both percentiles
            sample_count = len(ordered)
            metrics.append(PerformanceMetric(
                name="response_time",
                value=statistics.mean(latencies),
                unit="ms",
                timestamp=end_time,
                percentiles={
                    "p50": statistics.median(latencies),
                    "p95": ordered[int(sample_count * 0.95)],
                    "p99": ordered[int(sample_count * 0.99)],
                },
            ))

        total_requests = success_count + error_count
        result = PerformanceResult(
            test_type=TestType.LOAD,
            start_time=start_time,
            end_time=end_time,
            metrics=metrics,
            success_rate=success_count / total_requests if total_requests > 0 else 0,
            throughput=total_requests / duration if duration > 0 else 0,
            error_count=error_count,
        )
        self.results.append(result)
        return result

    def compare_with_baseline(self, result: "PerformanceResult",
                              baseline_name: str) -> Dict:
        """Compare each metric of ``result`` with the same-named baseline metric.

        Args:
            result: Result whose ``metrics`` are compared.
            baseline_name: Name the baseline was registered under.

        Returns:
            ``{"error": "Baseline not found"}`` when no such baseline exists.
            Otherwise a dict keyed by metric name with ``current``,
            ``baseline``, ``change_percent`` and ``regression`` entries.
            Metrics absent from the baseline are omitted. When the baseline
            value is zero the relative change is undefined, so
            ``change_percent`` is ``0.0`` if the current value is also zero
            and signed infinity otherwise.
        """
        baseline = self.baselines.get(baseline_name)
        if baseline is None:
            return {"error": "Baseline not found"}

        baseline_values = {}
        for known in baseline.metrics:
            # Keep the first occurrence of each name.
            baseline_values.setdefault(known.name, known.value)

        comparison = {}
        for metric in result.metrics:
            if metric.name not in baseline_values:
                continue
            reference = baseline_values[metric.name]
            delta = metric.value - reference
            if reference != 0:
                change = delta / reference * 100
            elif delta == 0:
                change = 0.0
            else:
                change = float("inf") if delta > 0 else float("-inf")
            comparison[metric.name] = {
                "current": metric.value,
                "baseline": reference,
                "change_percent": change,
                "regression": change > self.REGRESSION_THRESHOLD_PERCENT,
            }
        return comparison

    def set_baseline(self, name: str, result: "PerformanceResult"):
        """Register ``result`` as the baseline called ``name`` and report it on stdout."""
        self.baselines[name] = result
        print(f"Baseline '{name}' set with {len(result.metrics)} metrics")
性能瓶颈分析
性能瓶颈分析用于定位系统中的性能限制点:通过分层分析(网络、应用、数据库、基础设施)逐层排查,找出影响性能的关键因素。下面的示例实现覆盖了应用响应时间、数据库查询时间以及 CPU 和内存使用率。
# 性能瓶颈分析器
from typing import Dict, List
from collections import defaultdict
class BottleneckAnalyzer:
    """Collect categorised metric samples and flag averages above fixed thresholds.

    Attributes:
        metrics_store: Maps ``"<category>.<name>"`` to the list of recorded
            ``{"value", "timestamp"}`` samples.
    """

    # One rule per checked metric:
    # (store key, reported category, metric, threshold, high-severity threshold).
    # A bottleneck is reported when the mean of the samples exceeds
    # ``threshold``. It is "high" when no separate high-severity threshold is
    # given or the mean exceeds that too, otherwise "medium".
    _RULES = (
        ("application.response_time", "application", "response_time", 500, 1000),
        ("database.query_time", "database", "query_time", 100, None),
        ("system.cpu_usage", "infrastructure", "cpu_usage", 80, None),
        ("system.memory_usage", "infrastructure", "memory_usage", 85, None),
    )

    # Advice emitted for each (category, metric) that can be reported.
    _RECOMMENDATIONS = {
        ("database", "query_time"): "考虑添加数据库索引或优化查询",
        ("infrastructure", "cpu_usage"): "考虑增加CPU资源或优化计算密集型操作",
        ("infrastructure", "memory_usage"): "考虑增加内存资源或排查内存泄漏",
        ("application", "response_time"): "考虑添加缓存或优化业务逻辑",
    }

    def __init__(self):
        self.metrics_store = defaultdict(list)

    def record_metric(self, category: str, name: str,
                      value: float, timestamp: float):
        """Append one sample under the key ``"<category>.<name>"``."""
        self.metrics_store[f"{category}.{name}"].append({
            "value": value,
            "timestamp": timestamp,
        })

    def analyze(self) -> Dict:
        """Check every rule against the recorded samples.

        Returns:
            A dict with ``bottlenecks`` (one entry per rule whose mean exceeds
            its threshold, in rule order) and ``summary`` (counts plus
            recommendations). Metrics with no samples are skipped.
        """
        bottlenecks = []
        for store_key, category, metric, threshold, high_threshold in self._RULES:
            # .get avoids creating empty entries in the defaultdict.
            samples = self.metrics_store.get(store_key, [])
            if not samples:
                continue
            average = statistics.mean([sample["value"] for sample in samples])
            if average <= threshold:
                continue
            is_high = high_threshold is None or average > high_threshold
            bottlenecks.append({
                "category": category,
                "metric": metric,
                "value": average,
                "threshold": threshold,
                "severity": "high" if is_high else "medium",
            })
        return {
            "bottlenecks": bottlenecks,
            "summary": self._generate_summary(bottlenecks),
        }

    def _generate_summary(self, bottlenecks: List[Dict]) -> Dict:
        """Count bottlenecks by severity and category and attach recommendations."""
        by_severity = defaultdict(int)
        by_category = defaultdict(int)
        for bottleneck in bottlenecks:
            by_severity[bottleneck["severity"]] += 1
            by_category[bottleneck["category"]] += 1
        return {
            "total_bottlenecks": len(bottlenecks),
            "by_severity": dict(by_severity),
            "by_category": dict(by_category),
            "recommendations": self._generate_recommendations(bottlenecks),
        }

    def _generate_recommendations(self, bottlenecks: List[Dict]) -> List[str]:
        """Return one piece of advice per recognised bottleneck, in input order.

        Bottlenecks whose (category, metric) pair has no registered advice are
        skipped.
        """
        recommendations = []
        for bottleneck in bottlenecks:
            advice = self._RECOMMENDATIONS.get(
                (bottleneck["category"], bottleneck["metric"])
            )
            if advice is not None:
                recommendations.append(advice)
        return recommendations
# 火焰图分析
class FlameGraphAnalyzer:
    """Placeholder profiler front-end that returns canned data."""

    def __init__(self):
        self.profiles = []

    def capture_profile(self, duration: int = 30):
        """Announce a capture on stdout and return a status dict echoing ``duration``."""
        message = f"Capturing profile for {duration} seconds..."
        print(message)
        outcome = {"status": "captured"}
        outcome["duration"] = duration
        return outcome

    def analyze_hotspots(self, profile_data: Dict) -> List[Dict]:
        """Return a fixed list of hotspots; ``profile_data`` is not inspected."""
        fixed_hotspots = (
            ("process_request", 35.2),
            ("database_query", 28.5),
            ("serialize_response", 15.3),
        )
        return [
            {"function": function_name, "time_percent": share}
            for function_name, share in fixed_hotspots
        ]
容量规划
容量规划基于性能测试结果,预测系统在不同负载下的资源需求,并结合自动扩展策略和成本优化分析来控制资源投入。
# 容量规划器
from typing import Dict, List
import math
class CapacityPlanner:
    """Derive instance, resource and cost estimates from a measured result.

    Attributes:
        result: The performance result used as the current operating point.
        plans: Every plan produced by ``plan_for_target`` (including those
            generated through ``plan_for_growth``), in creation order.
        current_instances: Number of instances assumed to have produced
            ``result``.
        cost_per_instance: Monthly cost of one instance.
    """

    # Resources assumed per instance when sizing a plan.
    CPU_CORES_PER_INSTANCE = 2
    MEMORY_GB_PER_INSTANCE = 4
    # Latency reported when the result carries no "response_time" metric.
    DEFAULT_LATENCY_MS = 100

    def __init__(self, performance_result: "PerformanceResult", *,
                 current_instances: int = 3,
                 cost_per_instance: float = 100):
        """Store the measured result and the sizing assumptions.

        Args:
            performance_result: Result providing ``throughput`` and ``metrics``.
            current_instances: Instances assumed to be running today.
            cost_per_instance: Monthly cost per instance.
        """
        self.result = performance_result
        self.plans = []
        self.current_instances = current_instances
        self.cost_per_instance = cost_per_instance

    def plan_for_target(self, target_rps: float,
                        target_latency_ms: float) -> Dict:
        """Build a capacity plan for a throughput and latency target.

        The instance count is scaled by the larger of the throughput ratio
        (target / current) and the latency ratio (current / target), so both
        goals are covered. A ratio falls back to 1 when its denominator is not
        positive. At least one instance is always planned.

        Returns:
            A dict with ``target``, ``current``, ``required`` and
            ``cost_estimate`` sections; it is also appended to ``self.plans``.
        """
        current_rps = self.result.throughput
        current_latency = self._get_avg_latency()

        rps_multiplier = target_rps / current_rps if current_rps > 0 else 1
        latency_multiplier = (
            current_latency / target_latency_ms if target_latency_ms > 0 else 1
        )
        required_multiplier = max(rps_multiplier, latency_multiplier)

        # Never plan for zero instances, even if both ratios collapse to 0.
        required_instances = max(
            1, math.ceil(self.current_instances * required_multiplier)
        )

        plan = {
            "target": {
                "rps": target_rps,
                "latency_ms": target_latency_ms,
            },
            "current": {
                "rps": current_rps,
                "latency_ms": current_latency,
                "instances": self.current_instances,
            },
            "required": {
                "instances": required_instances,
                "cpu_cores": required_instances * self.CPU_CORES_PER_INSTANCE,
                "memory_gb": required_instances * self.MEMORY_GB_PER_INSTANCE,
            },
            "cost_estimate": self._estimate_cost(required_instances),
        }
        self.plans.append(plan)
        return plan

    def plan_for_growth(self, growth_rate: float,
                        months: int,
                        target_latency_ms: float = 200) -> List[Dict]:
        """Plan capacity for each of the next ``months`` months.

        Args:
            growth_rate: Compound monthly growth as a fraction (0.1 = 10%).
            months: Number of months to project; values below 1 yield ``[]``.
            target_latency_ms: Latency target applied to every month.

        Returns:
            One plan per month, each carrying an extra ``month`` key.
        """
        baseline_rps = self.result.throughput
        plans = []
        for month in range(1, months + 1):
            projected_rps = baseline_rps * (1 + growth_rate) ** month
            plan = self.plan_for_target(projected_rps, target_latency_ms)
            plan["month"] = month
            plans.append(plan)
        return plans

    def _get_avg_latency(self) -> float:
        """Return the first "response_time" metric value, or the default."""
        for metric in self.result.metrics:
            if metric.name == "response_time":
                return metric.value
        return self.DEFAULT_LATENCY_MS

    def _estimate_cost(self, instances: int) -> Dict:
        """Return monthly and annual cost for ``instances`` instances."""
        monthly = instances * self.cost_per_instance
        return {
            "monthly": monthly,
            "annual": monthly * 12,
        }
# 自动扩展策略
class AutoScalingPolicy:
    """Hold scaling policies and turn current metrics into a scaling decision.

    Attributes:
        policies: Registered policy dicts, in insertion order. Each has a
            ``type`` of ``"metric"`` or ``"schedule"``.
    """

    def __init__(self):
        self.policies = []

    def add_metric_policy(self, metric_name: str,
                          scale_up_threshold: float,
                          scale_down_threshold: float,
                          cooldown_seconds: int = 300):
        """Register a threshold policy for one metric.

        The cooldown is stored with the policy; ``evaluate`` does not enforce it.
        """
        self.policies.append({
            "type": "metric",
            "metric": metric_name,
            "scale_up": scale_up_threshold,
            "scale_down": scale_down_threshold,
            "cooldown": cooldown_seconds,
        })

    def add_schedule_policy(self, schedule: str,
                            min_instances: int,
                            max_instances: int):
        """Register a schedule policy.

        Schedule policies are stored only; ``evaluate`` does not act on them.
        """
        self.policies.append({
            "type": "schedule",
            "schedule": schedule,
            "min": min_instances,
            "max": max_instances,
        })

    def evaluate(self, current_metrics: Dict,
                 current_instances: int) -> Dict:
        """Decide whether to scale based on the metric policies.

        Policies whose metric is missing from ``current_metrics`` (or is
        ``None``) are skipped, so absent data never triggers scaling. A
        scale-up requested by any policy takes precedence over a scale-down
        requested by another: the first policy (in insertion order) that asks
        to scale up wins; otherwise the first that asks to scale down wins.

        Args:
            current_metrics: Mapping of metric name to its current value.
            current_instances: Number of instances currently running.

        Returns:
            A dict with ``action`` (``"scale_up"``, ``"scale_down"`` or
            ``"none"``) and ``reason``; scaling actions also carry ``target``,
            which moves by one instance and never drops below 1.
        """
        pending_scale_down = None
        for policy in self.policies:
            if policy["type"] != "metric":
                continue
            metric_name = policy["metric"]
            metric_value = current_metrics.get(metric_name)
            if metric_value is None:
                continue
            if metric_value > policy["scale_up"]:
                return {
                    "action": "scale_up",
                    "reason": f"{metric_name}={metric_value} > {policy['scale_up']}",
                    "target": current_instances + 1,
                }
            if pending_scale_down is None and metric_value < policy["scale_down"]:
                # Remember it, but keep looking: a later policy may need more capacity.
                pending_scale_down = {
                    "action": "scale_down",
                    "reason": f"{metric_name}={metric_value} < {policy['scale_down']}",
                    "target": max(1, current_instances - 1),
                }
        if pending_scale_down is not None:
            return pending_scale_down
        return {"action": "none", "reason": "No scaling needed"}
# 成本优化分析
class CostOptimizer:
    """Summarise resource utilisation and compare cost breakdowns."""

    def __init__(self, infrastructure_costs: Dict):
        self.costs = infrastructure_costs

    def analyze_utilization(self, usage_data: Dict) -> Dict:
        """Return mean utilisation and a recommendation for every resource.

        Resources whose sample collection is empty are treated as 0 utilisation.
        """

        def summarize(samples):
            mean_usage = statistics.mean(samples) if samples else 0
            return {
                "avg_utilization": mean_usage,
                "recommendation": self._get_recommendation(mean_usage),
            }

        return {
            resource: summarize(samples)
            for resource, samples in usage_data.items()
        }

    def _get_recommendation(self, utilization: float) -> str:
        """Map a utilisation figure to advice: below 30, below 70, or otherwise."""
        if utilization >= 70:
            return "考虑扩容"
        if utilization >= 30:
            return "利用率合理"
        return "考虑缩减资源"

    def calculate_savings(self, current: Dict, optimized: Dict) -> Dict:
        """Compare the summed values of two cost mappings.

        The percentage is 0 when the current total is not positive.
        """
        total_current = sum(current.values())
        total_optimized = sum(optimized.values())
        saved = total_current - total_optimized
        if total_current > 0:
            saved_percent = saved / total_current * 100
        else:
            saved_percent = 0
        return {
            "current_monthly": total_current,
            "optimized_monthly": total_optimized,
            "savings_monthly": saved,
            "savings_percent": saved_percent,
        }
持续性能监控
持续性能监控将性能测试集成到日常开发流程中,通过性能回归检测和告警,及早发现性能问题。
# 持续性能监控
from datetime import datetime, timedelta
from collections import deque
class PerformanceMonitor:
    """Record metric samples, raise threshold alerts and detect regressions.

    Attributes:
        metrics_history: Per-metric deque of ``{"value", "timestamp"}``
            samples, capped at the 1000 most recent.
        alerts: Every alert raised so far, oldest first.
        thresholds: Per-metric threshold dict. ``warning`` / ``critical`` are
            upper bounds; ``warning_min`` / ``critical_min`` are lower bounds.
    """

    def __init__(self, alert_thresholds: Optional[Dict] = None):
        """Create a monitor.

        Args:
            alert_thresholds: Thresholds keyed by metric name. ``None`` selects
                the built-in defaults; an empty dict disables alerting.
        """
        self.metrics_history = defaultdict(lambda: deque(maxlen=1000))
        self.alerts = []
        if alert_thresholds is None:
            alert_thresholds = {
                "response_time": {"warning": 500, "critical": 1000},
                "error_rate": {"warning": 1, "critical": 5},
                "throughput": {"warning_min": 100, "critical_min": 50},
            }
        self.thresholds = alert_thresholds

    def record_metric(self, name: str, value: float,
                      timestamp: Optional[float] = None):
        """Store one sample and check it against the metric's thresholds.

        Args:
            name: Metric name.
            value: Sample value.
            timestamp: Sample time; the current time is used only when this is
                ``None``, so an explicit ``0.0`` is kept.
        """
        if timestamp is None:
            timestamp = time.time()
        self.metrics_history[name].append({
            "value": value,
            "timestamp": timestamp,
        })
        self._check_alerts(name, value)

    def _check_alerts(self, name: str, value: float):
        """Raise at most one alert for ``value``, preferring the most severe.

        Upper bounds (``critical``, ``warning``) are checked first, then lower
        bounds (``critical_min``, ``warning_min``).
        """
        limits = self.thresholds.get(name, {})
        if "critical" in limits and value > limits["critical"]:
            self._create_alert(name, value, "critical")
        elif "warning" in limits and value > limits["warning"]:
            self._create_alert(name, value, "warning")
        elif "critical_min" in limits and value < limits["critical_min"]:
            self._create_alert(name, value, "critical", "falls below")
        elif "warning_min" in limits and value < limits["warning_min"]:
            self._create_alert(name, value, "warning", "falls below")

    def _create_alert(self, metric: str, value: float, severity: str,
                      comparison: str = "exceeds"):
        """Append an alert and echo its message to stdout.

        Args:
            metric: Name of the metric that breached a threshold.
            value: Offending value.
            severity: ``"warning"`` or ``"critical"``.
            comparison: Verb phrase used in the message to describe the breach.
        """
        alert = {
            "metric": metric,
            "value": value,
            "severity": severity,
            "timestamp": datetime.now().isoformat(),
            "message": f"{metric} = {value} {comparison} {severity} threshold",
        }
        self.alerts.append(alert)
        print(f"Alert: {alert['message']}")

    def detect_regression(self, metric_name: str,
                          window_size: int = 100) -> Dict:
        """Compare the oldest and newest ``window_size`` retained samples.

        A regression is reported when the mean of the newest window is more
        than 20% above the mean of the oldest window. The change is reported
        as 0 when the reference mean is not positive.

        Args:
            metric_name: Metric to inspect.
            window_size: Samples per window; must be at least 1.

        Returns:
            ``{"regression_detected": False, "reason": "insufficient_data"}``
            when fewer than ``2 * window_size`` samples are retained; otherwise
            a dict with the two means, the percentage change and a
            recommendation.

        Raises:
            ValueError: If ``window_size`` is smaller than 1.
        """
        if window_size < 1:
            raise ValueError("window_size must be at least 1")
        history = list(self.metrics_history.get(metric_name, []))
        if len(history) < window_size * 2:
            return {"regression_detected": False, "reason": "insufficient_data"}

        reference = [sample["value"] for sample in history[:window_size]]
        current = [sample["value"] for sample in history[-window_size:]]
        ref_mean = statistics.mean(reference)
        curr_mean = statistics.mean(current)

        change_percent = ((curr_mean - ref_mean) / ref_mean * 100) if ref_mean > 0 else 0
        regression_detected = change_percent > 20
        return {
            "regression_detected": regression_detected,
            "reference_mean": ref_mean,
            "current_mean": curr_mean,
            "change_percent": change_percent,
            "recommendation": "investigate" if regression_detected else "normal",
        }

    def get_dashboard(self) -> Dict:
        """Return a snapshot of metric statistics, recent alerts and a health score.

        The health score starts at 100, loses 10 per critical alert recorded
        so far, and is clamped at 0.
        """
        critical_count = sum(
            1 for alert in self.alerts if alert["severity"] == "critical"
        )
        dashboard = {
            "timestamp": datetime.now().isoformat(),
            "metrics": {},
            "alerts": self.alerts[-10:],  # ten most recent alerts
            "health_score": max(0, 100 - critical_count * 10),
        }
        for name, history in self.metrics_history.items():
            if not history:
                continue
            values = [sample["value"] for sample in history]
            dashboard["metrics"][name] = {
                "current": values[-1],
                "mean": statistics.mean(values),
                "min": min(values),
                "max": max(values),
                "trend": self._calculate_trend(values),
            }
        return dashboard

    def _calculate_trend(self, values: List[float]) -> str:
        """Classify the last (up to) ten values by comparing last against first.

        Returns ``"increasing"`` when the last value is more than 10% above
        the first, ``"decreasing"`` when more than 10% below, else ``"stable"``.
        """
        if len(values) < 2:
            return "stable"
        recent = values[-10:]
        first, last = recent[0], recent[-1]
        if last > first * 1.1:
            return "increasing"
        if last < first * 0.9:
            return "decreasing"
        return "stable"
# 性能回归测试
class PerformanceRegressionTest:
    """Run regression checks for a list of metrics against a monitor."""

    def __init__(self, monitor: "PerformanceMonitor"):
        """Keep the monitor whose ``detect_regression`` is queried per test case."""
        self.monitor = monitor

    def run_regression_suite(self, test_cases: List[Dict]) -> Dict:
        """Run every test case and aggregate the outcomes.

        Args:
            test_cases: Dicts with ``name`` and ``metric`` keys and an optional
                ``threshold`` (percentage, default 20).

        Returns:
            A dict with ``total``, ``passed``, the list of ``regressions`` and
            all ``results`` in input order.
        """
        results = [self._run_test(test_case) for test_case in test_cases]
        return {
            "total": len(results),
            "passed": sum(1 for r in results if r["status"] == "passed"),
            "regressions": [r for r in results if r["status"] == "regression"],
            "results": results,
        }

    def _run_test(self, test_case: Dict) -> Dict:
        """Evaluate one test case.

        When the monitor reports a ``change_percent``, the case fails if that
        change exceeds the case's own ``threshold``. When it does not (for
        example because there is not enough data), the monitor's
        ``regression_detected`` flag decides.
        """
        metric_name = test_case["metric"]
        threshold = test_case.get("threshold", 20)
        details = self.monitor.detect_regression(metric_name)

        change_percent = details.get("change_percent")
        if change_percent is None:
            regressed = bool(details["regression_detected"])
        else:
            regressed = change_percent > threshold

        return {
            "name": test_case["name"],
            "metric": metric_name,
            "status": "regression" if regressed else "passed",
            "threshold": threshold,
            "details": details,
        }