← 返回首页
🧪

性能测试架构:基线、瓶颈与容量规划

📂 architecture ⏱ 7 min 1368 words

性能测试架构:基线、瓶颈与容量规划

性能测试体系架构

性能测试是验证系统在负载下表现的关键手段。完整的性能测试体系包括：性能基线建立、负载测试、压力测试、容量规划和持续性能监控。其目标是提前发现性能瓶颈，确保系统满足 SLA 要求。

# 性能测试核心框架
from dataclasses import dataclass, field
from typing import List, Dict, Callable, Optional
from enum import Enum
import time
import statistics

class TestType(Enum):
    """Labels for the kind of performance test a result belongs to.

    Every member's value is the lowercase string form of its name, so a
    member can be looked up from its string with ``TestType("load")``.
    """

    BASELINE = "baseline"
    LOAD = "load"
    STRESS = "stress"
    SPIKE = "spike"
    SOAK = "soak"

@dataclass
class PerformanceMetric:
    """One named measurement, optionally with a percentile breakdown.

    Attributes:
        name: Identifier of the metric (for example ``"response_time"``).
        value: Headline figure for the metric.
        unit: Label describing the unit of ``value`` (for example ``"ms"``).
        timestamp: Moment the measurement refers to, as a float; the load
            test runner in this file fills it from ``time.time()``.
        percentiles: Percentile label (such as ``"p95"``) mapped to its
            value. Each instance gets its own empty dict by default.
    """

    name: str
    value: float
    unit: str
    timestamp: float
    # default_factory keeps instances from sharing one mutable dict.
    percentiles: Dict[str, float] = field(default_factory=dict)

@dataclass
class LoadProfile:
    """Shape of the load to apply during a test.

    Attributes:
        target_users: Number of users the test aims for.
        ramp_up_duration: Ramp-up time, in seconds.
        duration: Test duration, in seconds.
        ramp_down_duration: Ramp-down time; defaults to 0 (presumably
            seconds, like the other durations).
    """

    target_users: int
    ramp_up_duration: int
    duration: int
    ramp_down_duration: int = 0

@dataclass
class PerformanceResult:
    """Outcome of a single performance test run.

    Attributes:
        test_type: Kind of test that produced this result.
        start_time: Float timestamp taken when the run began.
        end_time: Float timestamp taken when the run finished.
        metrics: Measurements collected during the run.
        success_rate: Share of requests that succeeded; the load test
            runner in this file stores it as a 0..1 fraction.
        throughput: Requests per second.
        error_count: Number of failed requests.
    """

    test_type: TestType
    start_time: float
    end_time: float
    metrics: List[PerformanceMetric]
    success_rate: float
    throughput: float
    error_count: int

class PerformanceTestRunner:
    """Run simple load tests and compare their results against baselines.

    Attributes:
        results: Every result returned by ``run_load_test``, in call order.
        baselines: Results registered through ``set_baseline``, keyed by name.
    """

    def __init__(self):
        self.results = []
        self.baselines = {}

    @staticmethod
    def _nearest_rank(ordered, percent):
        """Return the nearest-rank percentile of a sorted, non-empty list.

        Args:
            ordered: Values sorted in ascending order.
            percent: Integer percentile, e.g. ``95``.
        """
        # Integer form of ceil(len * percent / 100); avoids float rounding.
        rank = -(-len(ordered) * percent // 100)
        return ordered[max(rank - 1, 0)]

    def run_load_test(self, test_fn: Callable, profile: LoadProfile) -> PerformanceResult:
        """Call ``test_fn`` once per target user and summarise the outcome.

        This is a simulation: the calls are made one after another, and only
        ``profile.target_users`` is read. The ramp-up, duration and ramp-down
        fields of the profile are not applied.

        Args:
            test_fn: Zero-argument callable representing one request. Any
                exception it raises is counted as a failed request.
            profile: Load profile supplying the number of calls to make.

        Returns:
            A ``PerformanceResult`` tagged ``TestType.LOAD``. When at least
            one call succeeded it carries a ``response_time`` metric in
            milliseconds (mean, plus p50/p95/p99). The result is also
            appended to ``self.results``.
        """
        start_time = time.time()
        latencies = []
        error_count = 0

        for _ in range(profile.target_users):
            # perf_counter is monotonic, so a wall-clock adjustment during
            # the run cannot produce negative or inflated latencies.
            request_start = time.perf_counter()
            try:
                test_fn()
            except Exception:
                # A failing request must not abort the run; it is only
                # counted and excluded from the latency statistics.
                error_count += 1
            else:
                latencies.append((time.perf_counter() - request_start) * 1000)

        end_time = time.time()
        duration = end_time - start_time

        metrics = []
        if latencies:
            ordered = sorted(latencies)  # sort once, reuse for every percentile
            metrics.append(PerformanceMetric(
                name="response_time",
                value=statistics.mean(latencies),
                unit="ms",
                timestamp=end_time,
                percentiles={
                    "p50": statistics.median(ordered),
                    "p95": self._nearest_rank(ordered, 95),
                    "p99": self._nearest_rank(ordered, 99),
                },
            ))

        success_count = len(latencies)
        total_requests = success_count + error_count

        result = PerformanceResult(
            test_type=TestType.LOAD,
            start_time=start_time,
            end_time=end_time,
            metrics=metrics,
            success_rate=success_count / total_requests if total_requests > 0 else 0,
            throughput=total_requests / duration if duration > 0 else 0,
            error_count=error_count,
        )

        self.results.append(result)
        return result

    def compare_with_baseline(self, result: PerformanceResult,
                             baseline_name: str) -> Dict:
        """Compare each metric of ``result`` with the named baseline.

        Args:
            result: Result whose metrics are being checked.
            baseline_name: Key previously passed to ``set_baseline``.

        Returns:
            ``{"error": "Baseline not found"}`` when the name is unknown.
            Otherwise a dict keyed by metric name, holding ``current``,
            ``baseline``, ``change_percent`` and ``regression`` (True when
            the change exceeds 10 percent). Metrics absent from the baseline
            are omitted. When the baseline value is zero the change is
            reported as 0.0 if the current value is also zero, and as
            +/- infinity otherwise, instead of raising ZeroDivisionError.
        """
        baseline = self.baselines.get(baseline_name)
        if baseline is None:
            return {"error": "Baseline not found"}

        comparison = {}

        for metric in result.metrics:
            reference = next(
                (m for m in baseline.metrics if m.name == metric.name),
                None,
            )
            if reference is None:
                continue

            delta = metric.value - reference.value
            if reference.value:
                change = delta / reference.value * 100
            elif delta:
                # No finite percentage exists against a zero baseline.
                change = float("inf") if delta > 0 else float("-inf")
            else:
                change = 0.0

            comparison[metric.name] = {
                "current": metric.value,
                "baseline": reference.value,
                "change_percent": change,
                "regression": change > 10,  # 10% degradation threshold
            }

        return comparison

    def set_baseline(self, name: str, result: PerformanceResult):
        """Register ``result`` as the baseline called ``name`` and log it.

        An existing baseline with the same name is replaced.
        """
        self.baselines[name] = result
        print(f"Baseline '{name}' set with {len(result.metrics)} metrics")

性能瓶颈分析

性能瓶颈分析用于定位系统中的性能限制点：通过分层分析（网络、应用、数据库、基础设施），找出影响性能的关键因素。

# 性能瓶颈分析器
from typing import Dict, List
from collections import defaultdict

class BottleneckAnalyzer:
    """Collect metric samples and flag those whose average breaks a threshold.

    Samples are stored under ``"<category>.<name>"`` keys. ``analyze`` looks
    at four fixed keys: ``application.response_time``,
    ``database.query_time``, ``system.cpu_usage`` and
    ``system.memory_usage``.

    Attributes:
        metrics_store: Mapping of ``"<category>.<name>"`` to a list of
            ``{"value", "timestamp"}`` samples.
    """

    # One rule per checked metric:
    # (store key, reported category, reported metric, threshold, severity fn).
    # Order determines the order of the reported bottlenecks.
    _RULES = (
        ("application.response_time", "application", "response_time", 500,
         lambda average: "high" if average > 1000 else "medium"),
        ("database.query_time", "database", "query_time", 100,
         lambda average: "high"),
        ("system.cpu_usage", "infrastructure", "cpu_usage", 80,
         lambda average: "high"),
        ("system.memory_usage", "infrastructure", "memory_usage", 85,
         lambda average: "high"),
    )

    # Advice per (category, metric). Memory is included so that every
    # bottleneck the rules can report comes with a recommendation.
    _RECOMMENDATIONS = {
        ("database", "query_time"): "考虑添加数据库索引或优化查询",
        ("infrastructure", "cpu_usage"): "考虑增加CPU资源或优化计算密集型操作",
        ("infrastructure", "memory_usage"): "考虑增加内存资源或排查内存泄漏",
        ("application", "response_time"): "考虑添加缓存或优化业务逻辑",
    }

    def __init__(self):
        self.metrics_store = defaultdict(list)

    def record_metric(self, category: str, name: str,
                     value: float, timestamp: float):
        """Append one sample under the key ``"<category>.<name>"``."""
        self.metrics_store[f"{category}.{name}"].append({
            "value": value,
            "timestamp": timestamp
        })

    def analyze(self) -> Dict:
        """Report every checked metric whose mean exceeds its threshold.

        Returns:
            A dict with ``bottlenecks`` (a list of dicts holding
            ``category``, ``metric``, ``value``, ``threshold`` and
            ``severity``) and ``summary`` (see ``_generate_summary``).
            Metrics with no recorded samples are skipped.
        """
        bottlenecks = []

        for store_key, category, metric, threshold, severity_for in self._RULES:
            # .get() avoids creating empty entries in the defaultdict.
            samples = self.metrics_store.get(store_key, [])
            if not samples:
                continue

            average = statistics.mean([sample["value"] for sample in samples])
            if average > threshold:
                bottlenecks.append({
                    "category": category,
                    "metric": metric,
                    "value": average,
                    "threshold": threshold,
                    "severity": severity_for(average),
                })

        return {
            "bottlenecks": bottlenecks,
            "summary": self._generate_summary(bottlenecks)
        }

    def _generate_summary(self, bottlenecks: List[Dict]) -> Dict:
        """Count bottlenecks by severity and category and attach advice."""
        by_severity = defaultdict(int)
        by_category = defaultdict(int)

        for bottleneck in bottlenecks:
            by_severity[bottleneck["severity"]] += 1
            by_category[bottleneck["category"]] += 1

        return {
            "total_bottlenecks": len(bottlenecks),
            "by_severity": dict(by_severity),
            "by_category": dict(by_category),
            "recommendations": self._generate_recommendations(bottlenecks)
        }

    def _generate_recommendations(self, bottlenecks: List[Dict]) -> List[str]:
        """Return one advice string per recognised bottleneck, in input order.

        Bottlenecks whose (category, metric) pair has no entry in the advice
        table are skipped.
        """
        recommendations = []

        for bottleneck in bottlenecks:
            advice = self._RECOMMENDATIONS.get(
                (bottleneck["category"], bottleneck["metric"])
            )
            if advice is not None:
                recommendations.append(advice)

        return recommendations

# 火焰图分析
class FlameGraphAnalyzer:
    """Placeholder profiler front-end that returns canned data.

    Neither method performs real profiling: ``capture_profile`` only logs
    and echoes the requested duration, and ``analyze_hotspots`` returns a
    fixed list regardless of its input.

    Attributes:
        profiles: Starts as an empty list; nothing in this class adds to it.
    """

    def __init__(self):
        self.profiles = []

    def capture_profile(self, duration: int = 30):
        """Log the requested duration and return a status dict.

        Args:
            duration: Number of seconds reported in the log line and result.

        Returns:
            ``{"status": "captured", "duration": duration}``.
        """
        message = f"Capturing profile for {duration} seconds..."
        print(message)
        return dict(status="captured", duration=duration)

    def analyze_hotspots(self, profile_data: Dict) -> List[Dict]:
        """Return a fixed list of hotspot entries; ``profile_data`` is unused.

        Returns:
            Dicts with ``function`` and ``time_percent`` keys, largest share
            first.
        """
        canned = (
            ("process_request", 35.2),
            ("database_query", 28.5),
            ("serialize_response", 15.3),
        )
        return [
            {"function": function_name, "time_percent": share}
            for function_name, share in canned
        ]

容量规划

容量规划基于性能测试结果，预测系统在不同负载下的资源需求，同时支持自动扩展策略和成本优化。

# 容量规划器
from typing import Dict, List
import math

class CapacityPlanner:
    """Estimate how many instances are needed to reach a throughput/latency goal.

    The estimate is a linear scale-up of the current deployment: the
    instance count is multiplied by whichever is larger, the throughput
    ratio or the latency ratio.

    Attributes:
        result: Measured performance result used as the starting point.
        plans: Every plan produced by ``plan_for_target`` (including those
            created through ``plan_for_growth``), in creation order.
        current_instances: Instance count the measured result is assumed to
            have been produced with.
        cores_per_instance: CPU cores attributed to each instance.
        memory_gb_per_instance: Memory (GB) attributed to each instance.
        cost_per_instance: Monthly cost of one instance.
    """

    def __init__(self, performance_result: PerformanceResult,
                 current_instances: int = 3,
                 cores_per_instance: int = 2,
                 memory_gb_per_instance: int = 4,
                 cost_per_instance: float = 100):
        """Store the measured result and the sizing assumptions.

        The defaults reproduce the values that used to be hard-coded.
        """
        self.result = performance_result
        self.plans = []
        self.current_instances = current_instances
        self.cores_per_instance = cores_per_instance
        self.memory_gb_per_instance = memory_gb_per_instance
        self.cost_per_instance = cost_per_instance

    def plan_for_target(self, target_rps: float,
                       target_latency_ms: float) -> Dict:
        """Build a capacity plan for a throughput and latency target.

        Args:
            target_rps: Desired requests per second.
            target_latency_ms: Desired average latency in milliseconds.

        Returns:
            A dict with ``target``, ``current``, ``required`` (instances,
            cpu_cores, memory_gb) and ``cost_estimate`` sections. The plan
            is also appended to ``self.plans``.
        """
        current_rps = self.result.throughput
        current_latency = self._get_avg_latency()

        # A non-positive denominator means "no information": scale by 1.
        rps_multiplier = target_rps / current_rps if current_rps > 0 else 1
        latency_multiplier = (
            current_latency / target_latency_ms if target_latency_ms > 0 else 1
        )

        # The larger factor satisfies both targets at once.
        required_multiplier = max(rps_multiplier, latency_multiplier)

        # Never plan for fewer than one instance.
        required_instances = max(
            1, math.ceil(self.current_instances * required_multiplier)
        )

        plan = {
            "target": {
                "rps": target_rps,
                "latency_ms": target_latency_ms
            },
            "current": {
                "rps": current_rps,
                "latency_ms": current_latency,
                "instances": self.current_instances
            },
            "required": {
                "instances": required_instances,
                "cpu_cores": required_instances * self.cores_per_instance,
                "memory_gb": required_instances * self.memory_gb_per_instance
            },
            "cost_estimate": self._estimate_cost(required_instances)
        }

        self.plans.append(plan)
        return plan

    def plan_for_growth(self, growth_rate: float,
                       months: int,
                       target_latency_ms: float = 200) -> List[Dict]:
        """Produce one plan per month under compound throughput growth.

        Args:
            growth_rate: Monthly growth as a fraction (0.1 means +10%).
            months: Number of months to project, starting at month 1.
            target_latency_ms: Latency target applied to every month.

        Returns:
            A list of plans as returned by ``plan_for_target``, each with an
            extra ``month`` key. Each plan is also recorded in ``self.plans``.
        """
        plans = []
        current_capacity = self.result.throughput

        for month in range(1, months + 1):
            projected_capacity = current_capacity * (1 + growth_rate) ** month
            plan = self.plan_for_target(projected_capacity, target_latency_ms)
            plan["month"] = month
            plans.append(plan)

        return plans

    def _get_avg_latency(self) -> float:
        """Return the first ``response_time`` metric value, or 100 if absent."""
        for metric in self.result.metrics:
            if metric.name == "response_time":
                return metric.value
        return 100  # fallback when the result carries no latency metric

    def _estimate_cost(self, instances: int) -> Dict:
        """Return the monthly and annual cost for ``instances`` instances."""
        monthly = instances * self.cost_per_instance
        return {
            "monthly": monthly,
            "annual": monthly * 12
        }

# 自动扩展策略
class AutoScalingPolicy:
    """Hold scaling policies and decide whether to add or remove an instance.

    Only metric policies take part in ``evaluate``. Schedule policies and
    the ``cooldown`` value of metric policies are stored but not consulted
    by this class.

    Attributes:
        policies: Policy dicts in the order they were added.
    """

    def __init__(self):
        self.policies = []

    def add_metric_policy(self, metric_name: str,
                         scale_up_threshold: float,
                         scale_down_threshold: float,
                         cooldown_seconds: int = 300):
        """Register a threshold policy for one metric.

        Args:
            metric_name: Key looked up in the metrics passed to ``evaluate``.
            scale_up_threshold: Scale up when the metric is above this value.
            scale_down_threshold: Scale down when the metric is below this.
            cooldown_seconds: Stored with the policy; not enforced here.
        """
        self.policies.append({
            "type": "metric",
            "metric": metric_name,
            "scale_up": scale_up_threshold,
            "scale_down": scale_down_threshold,
            "cooldown": cooldown_seconds
        })

    def add_schedule_policy(self, schedule: str,
                           min_instances: int,
                           max_instances: int):
        """Register a schedule policy with instance bounds.

        The policy is only recorded; ``evaluate`` does not apply it.
        """
        self.policies.append({
            "type": "schedule",
            "schedule": schedule,
            "min": min_instances,
            "max": max_instances
        })

    def evaluate(self, current_metrics: Dict,
                current_instances: int) -> Dict:
        """Return the scaling action implied by the first triggered policy.

        Metric policies are checked in insertion order. A policy whose
        metric is missing from ``current_metrics`` (or is ``None``) is
        skipped: absent data must not be read as a zero reading, which
        would otherwise trigger a scale-down.

        Args:
            current_metrics: Latest metric values keyed by metric name.
            current_instances: Number of instances currently running.

        Returns:
            A dict with ``action`` (``"scale_up"``, ``"scale_down"`` or
            ``"none"``) and ``reason``. Scaling actions also carry
            ``target``: one instance more or fewer, never below 1.
        """
        for policy in self.policies:
            if policy["type"] != "metric":
                continue

            metric_value = current_metrics.get(policy["metric"])
            if metric_value is None:
                continue

            if metric_value > policy["scale_up"]:
                return {
                    "action": "scale_up",
                    "reason": f"{policy['metric']}={metric_value} > {policy['scale_up']}",
                    "target": current_instances + 1
                }
            if metric_value < policy["scale_down"]:
                return {
                    "action": "scale_down",
                    "reason": f"{policy['metric']}={metric_value} < {policy['scale_down']}",
                    "target": max(1, current_instances - 1)
                }

        return {"action": "none", "reason": "No scaling needed"}

# 成本优化分析
class CostOptimizer:
    """Turn utilisation samples and cost figures into simple recommendations.

    Attributes:
        costs: Infrastructure cost mapping given at construction; the
            methods of this class do not read it.
    """

    def __init__(self, infrastructure_costs: Dict):
        self.costs = infrastructure_costs

    def analyze_utilization(self, usage_data: Dict) -> Dict:
        """Average each resource's samples and attach a recommendation.

        Args:
            usage_data: Resource name mapped to a sequence of utilisation
                samples; an empty sequence counts as 0.

        Returns:
            Resource name mapped to ``{"avg_utilization", "recommendation"}``.
        """
        def summarize(samples):
            mean_usage = statistics.mean(samples) if samples else 0
            return {
                "avg_utilization": mean_usage,
                "recommendation": self._get_recommendation(mean_usage),
            }

        return {
            resource: summarize(samples)
            for resource, samples in usage_data.items()
        }

    def _get_recommendation(self, utilization: float) -> str:
        """Classify a utilisation figure: below 30, below 70, or otherwise."""
        if utilization < 30:
            return "考虑缩减资源"
        if utilization < 70:
            return "利用率合理"
        return "考虑扩容"

    def calculate_savings(self, current: Dict, optimized: Dict) -> Dict:
        """Compare the summed values of two cost mappings.

        Returns:
            A dict with ``current_monthly``, ``optimized_monthly``,
            ``savings_monthly`` and ``savings_percent`` (0 when the current
            total is not positive).
        """
        before = sum(current.values())
        after = sum(optimized.values())
        saved = before - after

        if before > 0:
            saved_percent = saved / before * 100
        else:
            saved_percent = 0

        return {
            "current_monthly": before,
            "optimized_monthly": after,
            "savings_monthly": saved,
            "savings_percent": saved_percent,
        }

持续性能监控

持续性能监控将性能测试集成到日常开发流程中，通过性能回归检测和告警，及早发现性能问题。

# 持续性能监控
from datetime import datetime, timedelta
from collections import deque

class PerformanceMonitor:
    """Keep a rolling history of metrics, raise alerts and detect regressions.

    Attributes:
        metrics_history: Metric name mapped to a deque holding at most the
            1000 most recent ``{"value", "timestamp"}`` samples.
        alerts: Every alert raised so far, oldest first.
        thresholds: Metric name mapped to its alert limits. ``warning`` and
            ``critical`` are upper limits; ``warning_min`` and
            ``critical_min`` are lower limits.
    """

    def __init__(self, alert_thresholds: Dict = None):
        """Create a monitor.

        Args:
            alert_thresholds: Limits per metric. ``None`` selects the
                built-in defaults; an empty dict disables alerting.
        """
        self.metrics_history = defaultdict(lambda: deque(maxlen=1000))
        self.alerts = []
        if alert_thresholds is None:
            alert_thresholds = {
                "response_time": {"warning": 500, "critical": 1000},
                "error_rate": {"warning": 1, "critical": 5},
                "throughput": {"warning_min": 100, "critical_min": 50}
            }
        self.thresholds = alert_thresholds

    def record_metric(self, name: str, value: float,
                     timestamp: float = None):
        """Store one sample and raise an alert if it breaks a limit.

        Args:
            name: Metric name.
            value: Measured value.
            timestamp: Sample time; ``time.time()`` is used only when this
                is ``None``, so an explicit ``0.0`` is kept as given.
        """
        if timestamp is None:
            timestamp = time.time()
        self.metrics_history[name].append({
            "value": value,
            "timestamp": timestamp
        })

        self._check_alerts(name, value)

    def _check_alerts(self, name: str, value: float):
        """Raise at most one alert for ``value``, most severe limit first.

        Upper limits (``critical``/``warning``) fire when the value is
        above them; lower limits (``critical_min``/``warning_min``) fire
        when the value is below them.
        """
        thresholds = self.thresholds.get(name, {})

        if "critical" in thresholds and value > thresholds["critical"]:
            self._create_alert(name, value, "critical")
        elif "critical_min" in thresholds and value < thresholds["critical_min"]:
            self._create_alert(name, value, "critical", comparison="below")
        elif "warning" in thresholds and value > thresholds["warning"]:
            self._create_alert(name, value, "warning")
        elif "warning_min" in thresholds and value < thresholds["warning_min"]:
            self._create_alert(name, value, "warning", comparison="below")

    def _create_alert(self, metric: str, value: float, severity: str,
                      comparison: str = "exceeds"):
        """Record an alert and print its message.

        Args:
            metric: Name of the offending metric.
            value: Value that broke the limit.
            severity: ``"warning"`` or ``"critical"``.
            comparison: Word describing the breach in the message
                (``"exceeds"`` for upper limits, ``"below"`` for lower ones).
        """
        alert = {
            "metric": metric,
            "value": value,
            "severity": severity,
            "timestamp": datetime.now().isoformat(),
            "message": f"{metric} = {value} {comparison} {severity} threshold"
        }
        self.alerts.append(alert)
        print(f"Alert: {alert['message']}")

    def detect_regression(self, metric_name: str,
                         window_size: int = 100) -> Dict:
        """Compare the oldest and newest windows of a metric's history.

        A regression is reported when the mean of the newest
        ``window_size`` samples is more than 20 percent above the mean of
        the oldest ``window_size`` samples still held in the history.

        Args:
            metric_name: Metric to inspect.
            window_size: Number of samples in each window.

        Returns:
            ``{"regression_detected": False, "reason": "insufficient_data"}``
            when fewer than two windows of samples exist. Otherwise a dict
            with ``regression_detected``, ``reference_mean``,
            ``current_mean``, ``change_percent`` and ``recommendation``.
        """
        history = list(self.metrics_history.get(metric_name, []))

        if len(history) < window_size * 2:
            return {"regression_detected": False, "reason": "insufficient_data"}

        reference = [h["value"] for h in history[:window_size]]
        current = [h["value"] for h in history[-window_size:]]

        ref_mean = statistics.mean(reference)
        curr_mean = statistics.mean(current)

        # A non-positive reference gives no meaningful percentage.
        change_percent = ((curr_mean - ref_mean) / ref_mean * 100) if ref_mean > 0 else 0

        regression_detected = change_percent > 20  # 20% degradation threshold

        return {
            "regression_detected": regression_detected,
            "reference_mean": ref_mean,
            "current_mean": curr_mean,
            "change_percent": change_percent,
            "recommendation": "investigate" if regression_detected else "normal"
        }

    def get_dashboard(self) -> Dict:
        """Summarise the current state of every recorded metric.

        Returns:
            A dict with ``timestamp`` (ISO string), ``metrics`` (per metric:
            current, mean, min, max, trend), ``alerts`` (the 10 most recent)
            and ``health_score``: 100 minus 10 per critical alert ever
            raised, floored at 0.
        """
        dashboard = {
            "timestamp": datetime.now().isoformat(),
            "metrics": {},
            "alerts": self.alerts[-10:],
            "health_score": 100
        }

        for name, history in self.metrics_history.items():
            if history:
                values = [h["value"] for h in history]
                dashboard["metrics"][name] = {
                    "current": values[-1],
                    "mean": statistics.mean(values),
                    "min": min(values),
                    "max": max(values),
                    "trend": self._calculate_trend(values)
                }

        critical_count = sum(
            1 for alert in self.alerts if alert["severity"] == "critical"
        )
        # Clamp so that a long alert history cannot yield a negative score.
        dashboard["health_score"] = max(0, 100 - critical_count * 10)

        return dashboard

    def _calculate_trend(self, values: List[float]) -> str:
        """Classify the last (up to) 10 values by comparing last with first.

        Returns ``"increasing"`` when the last value is more than 10 percent
        above the first, ``"decreasing"`` when more than 10 percent below,
        and ``"stable"`` otherwise or when fewer than two values exist.
        """
        if len(values) < 2:
            return "stable"

        recent = values[-10:]
        if recent[-1] > recent[0] * 1.1:
            return "increasing"
        elif recent[-1] < recent[0] * 0.9:
            return "decreasing"
        return "stable"

# 性能回归测试
class PerformanceRegressionTest:
    """Run a list of regression checks against a performance monitor.

    Attributes:
        monitor: Object providing ``detect_regression(metric_name)``.
    """

    # Percent change above which a test case fails when it sets no threshold.
    DEFAULT_THRESHOLD_PERCENT = 20

    def __init__(self, monitor: PerformanceMonitor):
        self.monitor = monitor

    def run_regression_suite(self, test_cases: List[Dict]) -> Dict:
        """Run every test case and aggregate the outcomes.

        Args:
            test_cases: Dicts with ``name`` and ``metric`` keys and an
                optional ``threshold`` (percent).

        Returns:
            A dict with ``total``, ``passed`` (count), ``regressions`` (the
            failing results) and ``results`` (all results, in input order).
        """
        results = [self._run_test(test_case) for test_case in test_cases]

        return {
            "total": len(results),
            "passed": sum(1 for r in results if r["status"] == "passed"),
            "regressions": [r for r in results if r["status"] == "regression"],
            "results": results
        }

    def _run_test(self, test_case: Dict) -> Dict:
        """Evaluate one test case against the monitor.

        The case's own ``threshold`` (default 20) is compared with the
        ``change_percent`` reported by the monitor. When the monitor gives
        no ``change_percent`` (for example because it lacks data), its own
        ``regression_detected`` verdict is used instead.

        Returns:
            A dict with ``name``, ``metric``, ``status`` (``"regression"``
            or ``"passed"``), the ``threshold`` applied, and ``details``
            (the monitor's unmodified report, whose own
            ``regression_detected`` flag may differ from ``status`` when a
            custom threshold is set).
        """
        metric_name = test_case["metric"]
        threshold = test_case.get("threshold", self.DEFAULT_THRESHOLD_PERCENT)

        regression = self.monitor.detect_regression(metric_name)

        change_percent = regression.get("change_percent")
        if change_percent is None:
            regressed = regression["regression_detected"]
        else:
            regressed = change_percent > threshold

        return {
            "name": test_case["name"],
            "metric": metric_name,
            "status": "regression" if regressed else "passed",
            "threshold": threshold,
            "details": regression
        }