← 返回首页
🧪

混沌测试架构:故障注入与韧性验证

📂 architecture ⏱ 8 min 1430 words

混沌测试架构:故障注入与韧性验证

混沌工程概述

混沌工程是在生产环境中主动引入故障,验证系统韧性的实践。通过模拟真实世界的故障场景(网络延迟、服务宕机、资源耗尽),发现系统的脆弱点并改进容错能力。

# 混沌工程框架
from dataclasses import dataclass, field
from typing import List, Dict, Callable, Optional
from enum import Enum
import time
import random

class FaultType(Enum):
    NETWORK = "network"
    SERVICE = "service"
    RESOURCE = "resource"
    STATE = "state"

class Severity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class ChaosExperiment:
    name: str
    description: str
    fault_type: FaultType
    severity: Severity
    target: str
    duration: int  # seconds
    steady_state_hypothesis: str
    rollback_strategy: str

@dataclass
class ExperimentResult:
    experiment: ChaosExperiment
    start_time: float
    end_time: float
    fault_injected: bool
    system_resilient: bool
    observations: List[Dict]
    metrics: Dict[str, float]

class ChaosOrchestrator:
    def __init__(self):
        self.experiments: List[ChaosExperiment] = []
        self.results: List[ExperimentResult] = []
        self.fault_injectors: Dict[FaultType, Callable] = {}
    
    def register_fault_injector(self, fault_type: FaultType, 
                               injector: Callable):
        """注册故障注入器"""
        self.fault_injectors[fault_type] = injector
    
    def create_experiment(self, name: str, **kwargs) -> ChaosExperiment:
        """创建混沌实验"""
        experiment = ChaosExperiment(name=name, **kwargs)
        self.experiments.append(experiment)
        return experiment
    
    def run_experiment(self, experiment: ChaosExperiment) -> ExperimentResult:
        """运行混沌实验"""
        print(f"\n{'='*60}")
        print(f"Starting experiment: {experiment.name}")
        print(f"{'='*60}")
        
        start_time = time.time()
        observations = []
        
        try:
            # 1. 验证稳态假设
            print("Step 1: Verifying steady state hypothesis...")
            steady_state = self._verify_steady_state(experiment)
            observations.append({
                "step": "steady_state_check",
                "result": steady_state
            })
            
            if not steady_state:
                raise Exception("Steady state hypothesis failed")
            
            # 2. 注入故障
            print(f"Step 2: Injecting {experiment.fault_type.value} fault...")
            injector = self.fault_injectors.get(experiment.fault_type)
            if injector:
                injector(experiment)
            
            observations.append({
                "step": "fault_injection",
                "fault_type": experiment.fault_type.value,
                "target": experiment.target
            })
            
            # 3. 等待故障持续时间
            print(f"Step 3: Waiting for {experiment.duration} seconds...")
            time.sleep(min(experiment.duration, 5))  # 缩短测试时间
            
            # 4. 验证系统韧性
            print("Step 4: Verifying system resilience...")
            resilient = self._verify_resilience(experiment)
            observations.append({
                "step": "resilience_check",
                "result": resilient
            })
            
            # 5. 回滚故障
            print("Step 5: Rolling back fault...")
            self._rollback_fault(experiment)
            
            end_time = time.time()
            
            result = ExperimentResult(
                experiment=experiment,
                start_time=start_time,
                end_time=end_time,
                fault_injected=True,
                system_resilient=resilient,
                observations=observations,
                metrics=self._collect_metrics(experiment)
            )
            
            self.results.append(result)
            
            print(f"\nExperiment completed: {'PASSED' if resilient else 'FAILED'}")
            return result
        
        except Exception as e:
            print(f"\nExperiment failed with error: {e}")
            
            # 确保回滚
            self._rollback_fault(experiment)
            
            return ExperimentResult(
                experiment=experiment,
                start_time=start_time,
                end_time=time.time(),
                fault_injected=False,
                system_resilient=False,
                observations=observations + [{"error": str(e)}],
                metrics={}
            )
    
    def _verify_steady_state(self, experiment: ChaosExperiment) -> bool:
        """验证稳态假设"""
        # 简化:实际应检查系统指标
        return True
    
    def _verify_resilience(self, experiment: ChaosExperiment) -> bool:
        """验证系统韧性"""
        # 简化:实际应检查系统是否恢复正常
        return True
    
    def _rollback_fault(self, experiment: ChaosExperiment):
        """回滚故障"""
        print(f"Rolling back {experiment.fault_type.value} fault...")
    
    def _collect_metrics(self, experiment: ChaosExperiment) -> Dict:
        """收集指标"""
        return {
            "error_rate": 0.01,
            "latency_p99": 150,
            "availability": 0.999
        }
    
    def generate_report(self) -> Dict:
        """生成实验报告"""
        total = len(self.results)
        passed = sum(1 for r in self.results if r.system_resilient)
        
        return {
            "summary": {
                "total_experiments": total,
                "passed": passed,
                "failed": total - passed,
                "pass_rate": passed / total if total > 0 else 0
            },
            "results": [
                {
                    "name": r.experiment.name,
                    "status": "passed" if r.system_resilient else "failed",
                    "duration": r.end_time - r.start_time
                }
                for r in self.results
            ]
        }

故障注入策略

故障注入是混沌工程的核心,需要覆盖各种故障场景。常见的故障类型包括:网络故障、服务故障、资源故障和状态故障。

# 故障注入器
class NetworkFaultInjector:
    """网络故障注入"""
    
    def inject_latency(self, target: str, latency_ms: int, 
                      duration: int):
        """注入网络延迟"""
        print(f"Injecting {latency_ms}ms latency to {target} for {duration}s")
        # 实际实现:使用tc或iptables
    
    def inject_packet_loss(self, target: str, loss_percent: float,
                          duration: int):
        """注入丢包"""
        print(f"Injecting {loss_percent}% packet loss to {target}")
    
    def inject_partition(self, target_a: str, target_b: str,
                        duration: int):
        """注入网络分区"""
        print(f"Creating network partition between {target_a} and {target_b}")

class ServiceFaultInjector:
    """服务故障注入"""
    
    def kill_service(self, service_name: str):
        """杀死服务"""
        print(f"Killing service: {service_name}")
        # 实际实现:使用Docker stop或kill进程
    
    def restart_service(self, service_name: str):
        """重启服务"""
        print(f"Restarting service: {service_name}")
    
    def inject_exception(self, service_name: str, 
                        exception_type: str):
        """注入异常"""
        print(f"Injecting {exception_type} exception to {service_name}")

class ResourceFaultInjector:
    """资源故障注入"""
    
    def exhaust_cpu(self, target: str, duration: int):
        """耗尽CPU"""
        print(f"Exhausting CPU on {target} for {duration}s")
    
    def exhaust_memory(self, target: str, duration: int):
        """耗尽内存"""
        print(f"Exhausting memory on {target} for {duration}s")
    
    def fill_disk(self, target: str, fill_percent: float):
        """填满磁盘"""
        print(f"Filling disk to {fill_percent}% on {target}")

class StateFaultInjector:
    """状态故障注入"""
    
    def corrupt_database(self, database: str, table: str):
        """损坏数据库"""
        print(f"Corrupting table {table} in {database}")
    
    def reset_cache(self, cache_service: str):
        """重置缓存"""
        print(f"Resetting cache: {cache_service}")

# 预定义故障场景
class FaultScenarioLibrary:
    """故障场景库"""
    
    @staticmethod
    def service_unavailable(service_name: str) -> ChaosExperiment:
        return ChaosExperiment(
            name=f"{service_name}_unavailable",
            description=f"Simulate {service_name} unavailability",
            fault_type=FaultType.SERVICE,
            severity=Severity.HIGH,
            target=service_name,
            duration=60,
            steady_state_hypothesis=f"{service_name} is healthy",
            rollback_strategy="restart service"
        )
    
    @staticmethod
    def network_degradation(target: str) -> ChaosExperiment:
        return ChaosExperiment(
            name=f"network_degradation_{target}",
            description=f"Simulate network degradation to {target}",
            fault_type=FaultType.NETWORK,
            severity=Severity.MEDIUM,
            target=target,
            duration=120,
            steady_state_hypothesis=f"Network to {target} is healthy",
            rollback_strategy="remove latency rule"
        )
    
    @staticmethod
    def memory_pressure(target: str) -> ChaosExperiment:
        return ChaosExperiment(
            name=f"memory_pressure_{target}",
            description=f"Simulate memory pressure on {target}",
            fault_type=FaultType.RESOURCE,
            severity=Severity.HIGH,
            target=target,
            duration=300,
            steady_state_hypothesis=f"{target} has sufficient memory",
            rollback_strategy="release memory"
        )

# 使用示例
scenario_lib = FaultScenarioLibrary()
orchestrator = ChaosOrchestrator()

# 注册故障注入器
orchestrator.register_fault_injector(
    FaultType.NETWORK,
    lambda exp: NetworkFaultInjector().inject_latency(exp.target, 200, exp.duration)
)
orchestrator.register_fault_injector(
    FaultType.SERVICE,
    lambda exp: ServiceFaultInjector().kill_service(exp.target)
)

# 创建并运行实验
experiment = scenario_lib.service_unavailable("user-service")
result = orchestrator.run_experiment(experiment)

韧性验证方法

韧性验证检查系统在故障发生时和恢复后的行为。关键指标包括:可用性、错误率、延迟和数据一致性。

# 韧性验证器
from typing import Dict, List
import statistics

class ResilienceVerifier:
    def __init__(self):
        self.baseline_metrics = {}
        self.current_metrics = {}
    
    def record_baseline(self, metrics: Dict):
        """记录基线指标"""
        self.baseline_metrics = metrics
    
    def record_during_fault(self, metrics: Dict):
        """记录故障期间指标"""
        self.current_metrics = metrics
    
    def verify_availability(self, threshold: float = 0.99) -> Dict:
        """验证可用性"""
        baseline_avail = self.baseline_metrics.get("availability", 1.0)
        current_avail = self.current_metrics.get("availability", 1.0)
        
        # 允许短暂下降但需快速恢复
        acceptable_drop = 0.05  # 5%
        
        return {
            "metric": "availability",
            "baseline": baseline_avail,
            "current": current_avail,
            "passed": current_avail >= baseline_avail - acceptable_drop,
            "threshold": threshold
        }
    
    def verify_error_rate(self, threshold: float = 0.05) -> Dict:
        """验证错误率"""
        baseline_error_rate = self.baseline_metrics.get("error_rate", 0)
        current_error_rate = self.current_metrics.get("error_rate", 0)
        
        # 错误率不应显著增加
        acceptable_increase = 0.1  # 10%
        
        return {
            "metric": "error_rate",
            "baseline": baseline_error_rate,
            "current": current_error_rate,
            "passed": current_error_rate <= baseline_error_rate + acceptable_increase,
            "threshold": threshold
        }
    
    def verify_latency(self, p99_threshold: float = 500) -> Dict:
        """验证延迟"""
        baseline_p99 = self.baseline_metrics.get("latency_p99", 100)
        current_p99 = self.current_metrics.get("latency_p99", 100)
        
        # 允许延迟增加但不应超过阈值
        return {
            "metric": "latency_p99",
            "baseline": baseline_p99,
            "current": current_p99,
            "passed": current_p99 <= p99_threshold,
            "threshold": p99_threshold
        }
    
    def verify_data_consistency(self) -> Dict:
        """验证数据一致性"""
        # 检查数据是否丢失或损坏
        return {
            "metric": "data_consistency",
            "passed": True,  # 简化实现
            "details": "No data loss detected"
        }
    
    def verify_recovery_time(self, max_recovery_seconds: int = 60) -> Dict:
        """验证恢复时间"""
        # 检查系统恢复正常所需时间
        recovery_time = self.current_metrics.get("recovery_time_seconds", 0)
        
        return {
            "metric": "recovery_time",
            "recovery_seconds": recovery_time,
            "passed": recovery_time <= max_recovery_seconds,
            "threshold": max_recovery_seconds
        }
    
    def run_all_checks(self) -> Dict:
        """运行所有验证检查"""
        checks = [
            self.verify_availability(),
            self.verify_error_rate(),
            self.verify_latency(),
            self.verify_data_consistency(),
            self.verify_recovery_time()
        ]
        
        all_passed = all(check["passed"] for check in checks)
        
        return {
            "all_passed": all_passed,
            "checks": checks,
            "summary": self._generate_summary(checks)
        }
    
    def _generate_summary(self, checks: List[Dict]) -> str:
        """生成摘要"""
        passed = sum(1 for c in checks if c["passed"])
        total = len(checks)
        
        if passed == total:
            return f"All {total} checks passed. System is resilient."
        else:
            failed = [c["metric"] for c in checks if not c["passed"]]
            return f"{total - passed} checks failed: {', '.join(failed)}"

# 混沌实验监控
class ChaosExperimentMonitor:
    """混沌实验监控"""
    
    def __init__(self):
        self.metrics = {}
        self.alerts = []
    
    def start_monitoring(self, experiment: ChaosExperiment):
        """开始监控"""
        print(f"Starting monitoring for experiment: {experiment.name}")
    
    def collect_metrics(self) -> Dict:
        """收集指标"""
        return {
            "cpu_usage": random.uniform(20, 80),
            "memory_usage": random.uniform(40, 70),
            "request_rate": random.uniform(100, 200),
            "error_rate": random.uniform(0, 0.05),
            "latency_p99": random.uniform(50, 200)
        }
    
    def check_alerts(self, metrics: Dict) -> List[Dict]:
        """检查告警"""
        alerts = []
        
        if metrics.get("error_rate", 0) > 0.1:
            alerts.append({
                "severity": "critical",
                "message": "High error rate detected"
            })
        
        if metrics.get("cpu_usage", 0) > 90:
            alerts.append({
                "severity": "warning",
                "message": "High CPU usage"
            })
        
        return alerts
    
    def stop_monitoring(self):
        """停止监控"""
        print("Stopping monitoring...")

混沌工程平台

混沌工程平台提供可视化的实验管理、自动化执行和结果分析。支持计划实验、安全控制和集成CI/CD。

# 混沌工程平台
from datetime import datetime
from typing import List, Dict

class ChaosPlatform:
    def __init__(self):
        self.experiments = {}
        self.schedules = {}
        self.safety_rules = []
    
    def create_experiment(self, name: str, config: Dict) -> str:
        """创建实验"""
        experiment_id = f"exp_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        
        self.experiments[experiment_id] = {
            "name": name,
            "config": config,
            "status": "created",
            "created_at": datetime.now()
        }
        
        return experiment_id
    
    def schedule_experiment(self, experiment_id: str, 
                           cron_expression: str):
        """计划实验"""
        self.schedules[experiment_id] = {
            "cron": cron_expression,
            "enabled": True
        }
    
    def add_safety_rule(self, rule: Dict):
        """添加安全规则"""
        self.safety_rules.append(rule)
    
    def check_safety(self, experiment: Dict) -> Dict:
        """检查安全性"""
        violations = []
        
        for rule in self.safety_rules:
            if not self._evaluate_rule(rule, experiment):
                violations.append(rule)
        
        return {
            "safe": len(violations) == 0,
            "violations": violations
        }
    
    def _evaluate_rule(self, rule: Dict, experiment: Dict) -> bool:
        """评估规则"""
        # 简化的规则评估
        return True
    
    def run_experiment(self, experiment_id: str) -> Dict:
        """运行实验"""
        experiment = self.experiments.get(experiment_id)
        if not experiment:
            return {"error": "Experiment not found"}
        
        # 检查安全性
        safety_check = self.check_safety(experiment)
        if not safety_check["safe"]:
            return {"error": "Safety check failed", "violations": safety_check["violations"]}
        
        # 运行实验
        experiment["status"] = "running"
        print(f"Running experiment: {experiment['name']}")
        
        # 模拟执行
        experiment["status"] = "completed"
        
        return {
            "status": "completed",
            "experiment_id": experiment_id
        }
    
    def get_experiment_history(self) -> List[Dict]:
        """获取实验历史"""
        return [
            {
                "id": exp_id,
                "name": exp["name"],
                "status": exp["status"],
                "created_at": exp["created_at"].isoformat()
            }
            for exp_id, exp in self.experiments.items()
        ]

# 安全控制
class SafetyController:
    """安全控制器"""
    
    def __init__(self):
        self.rules = []
        self.blast_radius_limit = 0.3  # 30%
    
    def add_rule(self, rule_type: str, config: Dict):
        """添加规则"""
        self.rules.append({"type": rule_type, "config": config})
    
    def check_blast_radius(self, target: str, 
                          total_services: int) -> bool:
        """检查爆炸半径"""
        # 简化:假设每次只影响一个服务
        affected_ratio = 1 / total_services
        return affected_ratio <= self.blast_radius_limit
    
    def check_maintenance_window(self) -> bool:
        """检查维护窗口"""
        # 简化:检查是否在允许的时间窗口
        return True
    
    def validate_experiment(self, experiment: Dict) -> Dict:
        """验证实验"""
        checks = []
        
        # 检查爆炸半径
        if "target" in experiment:
            radius_ok = self.check_blast_radius(
                experiment["target"],
                experiment.get("total_services", 10)
            )
            checks.append({
                "check": "blast_radius",
                "passed": radius_ok
            })
        
        # 检查维护窗口
        window_ok = self.check_maintenance_window()
        checks.append({
            "check": "maintenance_window",
            "passed": window_ok
        })
        
        all_passed = all(c["passed"] for c in checks)
        
        return {
            "validated": all_passed,
            "checks": checks
        }

# 混沌实验报告
class ChaosExperimentReport:
    def __init__(self, results: List[ExperimentResult]):
        self.results = results
    
    def generate_executive_summary(self) -> str:
        """生成执行摘要"""
        total = len(self.results)
        passed = sum(1 for r in self.results if r.system_resilient)
        
        return f"""
# Chaos Engineering Executive Summary

## Overview
- Total Experiments: {total}
- Passed: {passed}
- Failed: {total - passed}
- Success Rate: {passed/total*100:.1f}%

## Key Findings
1. System demonstrated resilience to most fault scenarios
2. Recovery time within acceptable limits
3. Some areas for improvement identified

## Recommendations
1. Implement circuit breakers for identified weak points
2. Add retry logic with exponential backoff
3. Improve monitoring and alerting
"""
    
    def generate_detailed_report(self) -> Dict:
        """生成详细报告"""
        return {
            "summary": {
                "total": len(self.results),
                "passed": sum(1 for r in self.results if r.system_resilient),
                "failed": sum(1 for r in self.results if not r.system_resilient)
            },
            "experiments": [
                {
                    "name": r.experiment.name,
                    "type": r.experiment.fault_type.value,
                    "severity": r.experiment.severity.value,
                    "result": "passed" if r.system_resilient else "failed",
                    "duration": r.end_time - r.start_time,
                    "observations": r.observations
                }
                for r in self.results
            ],
            "recommendations": self._generate_recommendations()
        }
    
    def _generate_recommendations(self) -> List[str]:
        """生成改进建议"""
        recommendations = []
        
        for result in self.results:
            if not result.system_resilient:
                if result.experiment.fault_type == FaultType.NETWORK:
                    recommendations.append("Implement retry with exponential backoff")
                elif result.experiment.fault_type == FaultType.SERVICE:
                    recommendations.append("Add circuit breaker pattern")
                elif result.experiment.fault_type == FaultType.RESOURCE:
                    recommendations.append("Implement resource limits and auto-scaling")
        
        return list(set(recommendations))  # 去重