混沌测试架构:故障注入与韧性验证
混沌工程概述
混沌工程是在生产环境中主动引入故障,验证系统韧性的实践。通过模拟真实世界的故障场景(网络延迟、服务宕机、资源耗尽),发现系统的脆弱点并改进容错能力。
# 混沌工程框架
from dataclasses import dataclass, field
from typing import List, Dict, Callable, Optional
from enum import Enum
import time
import random
class FaultType(Enum):
    """Category of fault that a chaos experiment injects.

    The orchestrator uses this value as the key when looking up the
    registered fault injector for an experiment.
    """

    NETWORK = "network"
    SERVICE = "service"
    RESOURCE = "resource"
    STATE = "state"
class Severity(Enum):
    """Severity level attached to a chaos experiment definition."""

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class ChaosExperiment:
    """Declarative description of a single chaos experiment."""

    name: str  # printed when the experiment runs and used as its label in reports
    description: str
    fault_type: FaultType  # key used to look up the registered fault injector
    severity: Severity
    target: str  # what the fault is aimed at (e.g. a service name)
    duration: int  # seconds
    # Free-text statement of the expected healthy state, e.g. "user-service is
    # healthy". NOTE(review): the visible orchestrator does not parse it.
    steady_state_hypothesis: str
    # Free-text description of how to undo the fault, e.g. "restart service".
    rollback_strategy: str
@dataclass
class ExperimentResult:
    """Outcome of one run of a chaos experiment."""

    experiment: ChaosExperiment  # the definition that was executed
    start_time: float  # time.time() value taken when the run started
    end_time: float  # time.time() value taken when the run finished
    fault_injected: bool  # whether the run reports a fault as injected
    system_resilient: bool  # reports treat True as "passed", False as "failed"
    observations: List[Dict]  # per-step records appended while the run progresses
    metrics: Dict[str, float]  # metrics gathered for the run; empty for failed runs
class ChaosOrchestrator:
    """Run chaos experiments and collect their results.

    An experiment goes through five steps: verify the steady state, inject
    the fault, wait, verify resilience, and roll the fault back. Every run,
    successful or not, is stored in ``self.results`` so that
    ``generate_report`` reflects failures as well as passes.
    """

    def __init__(self, max_wait_seconds: float = 5):
        """Create an orchestrator with no experiments or injectors.

        Args:
            max_wait_seconds: Upper bound on how long ``run_experiment``
                sleeps while the fault is active. The default of 5 keeps
                demo runs short regardless of ``experiment.duration``.
        """
        self.experiments: List[ChaosExperiment] = []
        self.results: List[ExperimentResult] = []
        self.fault_injectors: Dict[FaultType, Callable] = {}
        self.max_wait_seconds = max_wait_seconds

    def register_fault_injector(self, fault_type: FaultType,
                                injector: Callable):
        """Register the callable that injects faults of ``fault_type``.

        The injector is called with the experiment as its only argument.
        A later registration for the same type replaces the earlier one.
        """
        self.fault_injectors[fault_type] = injector

    def create_experiment(self, name: str, **kwargs) -> ChaosExperiment:
        """Build a ``ChaosExperiment`` from ``name`` and ``kwargs`` and track it."""
        experiment = ChaosExperiment(name=name, **kwargs)
        self.experiments.append(experiment)
        return experiment

    def run_experiment(self, experiment: ChaosExperiment) -> ExperimentResult:
        """Execute ``experiment`` and return its result.

        Errors raised by any step are caught and converted into a failed
        result; the result is appended to ``self.results`` in both cases.

        Returns:
            An ``ExperimentResult`` whose ``fault_injected`` flag is True only
            if a registered injector actually ran to completion.
        """
        print(f"\n{'='*60}")
        print(f"Starting experiment: {experiment.name}")
        print(f"{'='*60}")
        start_time = time.time()
        observations = []
        fault_injected = False
        rolled_back = False
        try:
            # 1. Verify the steady-state hypothesis before touching anything.
            print("Step 1: Verifying steady state hypothesis...")
            steady_state = self._verify_steady_state(experiment)
            observations.append({
                "step": "steady_state_check",
                "result": steady_state
            })
            if not steady_state:
                raise RuntimeError("Steady state hypothesis failed")

            # 2. Inject the fault, if an injector exists for this fault type.
            print(f"Step 2: Injecting {experiment.fault_type.value} fault...")
            injector = self.fault_injectors.get(experiment.fault_type)
            if injector:
                injector(experiment)
                fault_injected = True
            else:
                print(f"No injector registered for "
                      f"{experiment.fault_type.value}; no fault was injected")
            observations.append({
                "step": "fault_injection",
                "fault_type": experiment.fault_type.value,
                "target": experiment.target,
                "injected": fault_injected
            })

            # 3. Keep the fault active; the wait is capped and never negative.
            print(f"Step 3: Waiting for {experiment.duration} seconds...")
            time.sleep(max(0, min(experiment.duration, self.max_wait_seconds)))

            # 4. Check whether the system coped with the fault.
            print("Step 4: Verifying system resilience...")
            resilient = self._verify_resilience(experiment)
            observations.append({
                "step": "resilience_check",
                "result": resilient
            })

            # 5. Undo the fault.
            print("Step 5: Rolling back fault...")
            self._rollback_fault(experiment)
            rolled_back = True

            result = ExperimentResult(
                experiment=experiment,
                start_time=start_time,
                end_time=time.time(),
                fault_injected=fault_injected,
                system_resilient=resilient,
                observations=observations,
                metrics=self._collect_metrics(experiment)
            )
            print(f"\nExperiment completed: {'PASSED' if resilient else 'FAILED'}")
        except Exception as e:
            print(f"\nExperiment failed with error: {e}")
            # Make sure the fault is rolled back, but do not repeat a
            # rollback that already succeeded.
            if not rolled_back:
                self._rollback_fault(experiment)
            result = ExperimentResult(
                experiment=experiment,
                start_time=start_time,
                end_time=time.time(),
                fault_injected=fault_injected,
                system_resilient=False,
                observations=observations + [{"error": str(e)}],
                metrics={}
            )
        # Record failures too; otherwise reports would only ever show passes.
        self.results.append(result)
        return result

    def _verify_steady_state(self, experiment: ChaosExperiment) -> bool:
        """Check the steady-state hypothesis (stub: always True)."""
        # Simplified: a real implementation should inspect system metrics.
        return True

    def _verify_resilience(self, experiment: ChaosExperiment) -> bool:
        """Check that the system stayed resilient (stub: always True)."""
        # Simplified: a real implementation should check that the system recovered.
        return True

    def _rollback_fault(self, experiment: ChaosExperiment):
        """Roll back the injected fault (stub: only logs)."""
        print(f"Rolling back {experiment.fault_type.value} fault...")

    def _collect_metrics(self, experiment: ChaosExperiment) -> Dict:
        """Return metrics for the run (stub: fixed sample values)."""
        return {
            "error_rate": 0.01,
            "latency_p99": 150,
            "availability": 0.999
        }

    def generate_report(self) -> Dict:
        """Summarise all recorded results.

        Returns:
            A dict with a ``summary`` (totals and pass rate, 0 when nothing
            has run) and a per-experiment ``results`` list.
        """
        total = len(self.results)
        passed = sum(1 for r in self.results if r.system_resilient)
        return {
            "summary": {
                "total_experiments": total,
                "passed": passed,
                "failed": total - passed,
                "pass_rate": passed / total if total > 0 else 0
            },
            "results": [
                {
                    "name": r.experiment.name,
                    "status": "passed" if r.system_resilient else "failed",
                    "duration": r.end_time - r.start_time
                }
                for r in self.results
            ]
        }
故障注入策略
故障注入是混沌工程的核心,需要覆盖各种故障场景。常见的故障类型包括:网络故障、服务故障、资源故障和状态故障。
# 故障注入器
class NetworkFaultInjector:
    """Stub injector for network faults; every method only prints its action."""

    def inject_latency(self, target: str, latency_ms: int,
                       duration: int):
        """Announce adding ``latency_ms`` of latency to ``target`` for ``duration`` seconds."""
        # A real implementation would use tc or iptables.
        message = f"Injecting {latency_ms}ms latency to {target} for {duration}s"
        print(message)

    def inject_packet_loss(self, target: str, loss_percent: float,
                           duration: int):
        """Announce dropping ``loss_percent`` percent of packets to ``target``."""
        message = "Injecting {}% packet loss to {}".format(loss_percent, target)
        print(message)

    def inject_partition(self, target_a: str, target_b: str,
                         duration: int):
        """Announce a network partition between ``target_a`` and ``target_b``."""
        message = "Creating network partition between {} and {}".format(
            target_a, target_b
        )
        print(message)
class ServiceFaultInjector:
    """Stub injector for service faults; every method only prints its action."""

    def kill_service(self, service_name: str):
        """Announce that ``service_name`` is being killed."""
        # A real implementation would use Docker stop or kill the process.
        message = "Killing service: {}".format(service_name)
        print(message)

    def restart_service(self, service_name: str):
        """Announce that ``service_name`` is being restarted."""
        message = "Restarting service: {}".format(service_name)
        print(message)

    def inject_exception(self, service_name: str,
                         exception_type: str):
        """Announce injecting an ``exception_type`` exception into ``service_name``."""
        message = "Injecting {} exception to {}".format(
            exception_type, service_name
        )
        print(message)
class ResourceFaultInjector:
    """Stub injector for resource faults; every method only prints its action."""

    def exhaust_cpu(self, target: str, duration: int):
        """Announce CPU exhaustion on ``target`` for ``duration`` seconds."""
        message = "Exhausting CPU on {} for {}s".format(target, duration)
        print(message)

    def exhaust_memory(self, target: str, duration: int):
        """Announce memory exhaustion on ``target`` for ``duration`` seconds."""
        message = "Exhausting memory on {} for {}s".format(target, duration)
        print(message)

    def fill_disk(self, target: str, fill_percent: float):
        """Announce filling the disk on ``target`` up to ``fill_percent`` percent."""
        message = "Filling disk to {}% on {}".format(fill_percent, target)
        print(message)
class StateFaultInjector:
    """Stub injector for state faults; every method only prints its action."""

    def corrupt_database(self, database: str, table: str):
        """Announce corrupting ``table`` inside ``database``."""
        message = "Corrupting table {} in {}".format(table, database)
        print(message)

    def reset_cache(self, cache_service: str):
        """Announce resetting the cache ``cache_service``."""
        message = "Resetting cache: {}".format(cache_service)
        print(message)
# 预定义故障场景
class FaultScenarioLibrary:
    """Factory methods for ready-made chaos experiment definitions."""

    @staticmethod
    def service_unavailable(service_name: str) -> ChaosExperiment:
        """Build a 60-second, high-severity SERVICE experiment against ``service_name``."""
        scenario = dict(
            name=f"{service_name}_unavailable",
            description=f"Simulate {service_name} unavailability",
            fault_type=FaultType.SERVICE,
            severity=Severity.HIGH,
            target=service_name,
            duration=60,
            steady_state_hypothesis=f"{service_name} is healthy",
            rollback_strategy="restart service",
        )
        return ChaosExperiment(**scenario)

    @staticmethod
    def network_degradation(target: str) -> ChaosExperiment:
        """Build a 120-second, medium-severity NETWORK experiment against ``target``."""
        scenario = dict(
            name=f"network_degradation_{target}",
            description=f"Simulate network degradation to {target}",
            fault_type=FaultType.NETWORK,
            severity=Severity.MEDIUM,
            target=target,
            duration=120,
            steady_state_hypothesis=f"Network to {target} is healthy",
            rollback_strategy="remove latency rule",
        )
        return ChaosExperiment(**scenario)

    @staticmethod
    def memory_pressure(target: str) -> ChaosExperiment:
        """Build a 300-second, high-severity RESOURCE experiment against ``target``."""
        scenario = dict(
            name=f"memory_pressure_{target}",
            description=f"Simulate memory pressure on {target}",
            fault_type=FaultType.RESOURCE,
            severity=Severity.HIGH,
            target=target,
            duration=300,
            steady_state_hypothesis=f"{target} has sufficient memory",
            rollback_strategy="release memory",
        )
        return ChaosExperiment(**scenario)
# Usage example
scenario_lib = FaultScenarioLibrary()
orchestrator = ChaosOrchestrator()

# Register one fault injector per fault type used below.
orchestrator.register_fault_injector(
    fault_type=FaultType.NETWORK,
    injector=lambda e: NetworkFaultInjector().inject_latency(
        e.target, 200, e.duration
    ),
)
orchestrator.register_fault_injector(
    fault_type=FaultType.SERVICE,
    injector=lambda e: ServiceFaultInjector().kill_service(e.target),
)

# Create an experiment from the scenario library and run it.
experiment = scenario_lib.service_unavailable("user-service")
result = orchestrator.run_experiment(experiment)
韧性验证方法
韧性验证检查系统在故障发生时和恢复后的行为。关键指标包括:可用性、错误率、延迟和数据一致性。
# 韧性验证器
from typing import Dict, List
import statistics
class ResilienceVerifier:
    """Compare metrics captured during a fault against a recorded baseline.

    Metrics are plain dicts. Keys read by the checks are ``availability``,
    ``error_rate``, ``latency_p99`` and ``recovery_time_seconds``; a missing
    key falls back to a healthy default, so a check on unrecorded data passes.
    """

    def __init__(self):
        self.baseline_metrics = {}
        self.current_metrics = {}

    def record_baseline(self, metrics: Dict):
        """Store the metrics observed before the fault was injected."""
        self.baseline_metrics = metrics

    def record_during_fault(self, metrics: Dict):
        """Store the metrics observed while the fault was active."""
        self.current_metrics = metrics

    def verify_availability(self, threshold: float = 0.99,
                            acceptable_drop: float = 0.05) -> Dict:
        """Check availability against both an absolute floor and the baseline.

        Args:
            threshold: Minimum acceptable availability during the fault.
            acceptable_drop: Largest allowed absolute drop below the baseline
                (0.05 means five percentage points).

        Returns:
            A check dict; ``passed`` is True only when the current value is
            at least ``threshold`` AND within ``acceptable_drop`` of baseline.
        """
        baseline_avail = self.baseline_metrics.get("availability", 1.0)
        current_avail = self.current_metrics.get("availability", 1.0)
        within_drop = current_avail >= baseline_avail - acceptable_drop
        # The reported threshold must take part in the verdict; otherwise the
        # result can say "passed" while showing a value below its threshold.
        meets_threshold = current_avail >= threshold
        return {
            "metric": "availability",
            "baseline": baseline_avail,
            "current": current_avail,
            "passed": within_drop and meets_threshold,
            "threshold": threshold
        }

    def verify_error_rate(self, threshold: float = 0.05,
                          acceptable_increase: float = 0.1) -> Dict:
        """Check the error rate against both an absolute cap and the baseline.

        Args:
            threshold: Maximum acceptable error rate during the fault.
            acceptable_increase: Largest allowed absolute increase over the
                baseline (0.1 means ten percentage points).

        Returns:
            A check dict; ``passed`` is True only when the current value is
            at most ``threshold`` AND within ``acceptable_increase`` of baseline.
        """
        baseline_error_rate = self.baseline_metrics.get("error_rate", 0)
        current_error_rate = self.current_metrics.get("error_rate", 0)
        within_increase = (
            current_error_rate <= baseline_error_rate + acceptable_increase
        )
        meets_threshold = current_error_rate <= threshold
        return {
            "metric": "error_rate",
            "baseline": baseline_error_rate,
            "current": current_error_rate,
            "passed": within_increase and meets_threshold,
            "threshold": threshold
        }

    def verify_latency(self, p99_threshold: float = 500) -> Dict:
        """Check that the current p99 latency does not exceed ``p99_threshold``.

        The baseline is reported for context only; latency may rise as long
        as it stays at or below the threshold.
        """
        baseline_p99 = self.baseline_metrics.get("latency_p99", 100)
        current_p99 = self.current_metrics.get("latency_p99", 100)
        return {
            "metric": "latency_p99",
            "baseline": baseline_p99,
            "current": current_p99,
            "passed": current_p99 <= p99_threshold,
            "threshold": p99_threshold
        }

    def verify_data_consistency(self) -> Dict:
        """Report data consistency (stub: always passes)."""
        # Simplified: a real check would look for lost or corrupted data.
        return {
            "metric": "data_consistency",
            "passed": True,
            "details": "No data loss detected"
        }

    def verify_recovery_time(self, max_recovery_seconds: int = 60) -> Dict:
        """Check that the recorded recovery time is within ``max_recovery_seconds``."""
        recovery_time = self.current_metrics.get("recovery_time_seconds", 0)
        return {
            "metric": "recovery_time",
            "recovery_seconds": recovery_time,
            "passed": recovery_time <= max_recovery_seconds,
            "threshold": max_recovery_seconds
        }

    def run_all_checks(self) -> Dict:
        """Run every check with its default limits and aggregate the outcome.

        Returns:
            A dict with ``all_passed``, the individual ``checks`` and a
            human-readable ``summary``.
        """
        checks = [
            self.verify_availability(),
            self.verify_error_rate(),
            self.verify_latency(),
            self.verify_data_consistency(),
            self.verify_recovery_time()
        ]
        all_passed = all(check["passed"] for check in checks)
        return {
            "all_passed": all_passed,
            "checks": checks,
            "summary": self._generate_summary(checks)
        }

    def _generate_summary(self, checks: List[Dict]) -> str:
        """Return a one-line summary naming any failed metrics."""
        failed = [c["metric"] for c in checks if not c["passed"]]
        total = len(checks)
        if not failed:
            return f"All {total} checks passed. System is resilient."
        return f"{len(failed)} checks failed: {', '.join(failed)}"
# 混沌实验监控
class ChaosExperimentMonitor:
    """Monitoring stub for chaos experiments: simulated metrics plus alert rules."""

    def __init__(self):
        self.metrics = {}
        self.alerts = []

    def start_monitoring(self, experiment: ChaosExperiment):
        """Announce that monitoring of ``experiment`` has started."""
        message = f"Starting monitoring for experiment: {experiment.name}"
        print(message)

    def collect_metrics(self) -> Dict:
        """Return one sample of simulated metrics drawn uniformly from fixed ranges."""
        # (metric name, lower bound, upper bound); sampled in this order.
        ranges = (
            ("cpu_usage", 20, 80),
            ("memory_usage", 40, 70),
            ("request_rate", 100, 200),
            ("error_rate", 0, 0.05),
            ("latency_p99", 50, 200),
        )
        return {metric: random.uniform(low, high) for metric, low, high in ranges}

    def check_alerts(self, metrics: Dict) -> List[Dict]:
        """Return an alert dict for every metric above its limit.

        A missing metric is treated as 0 and therefore never alerts.
        """
        # (metric name, limit, severity, message); evaluated in this order.
        rules = (
            ("error_rate", 0.1, "critical", "High error rate detected"),
            ("cpu_usage", 90, "warning", "High CPU usage"),
        )
        return [
            {"severity": severity, "message": text}
            for metric, limit, severity, text in rules
            if metrics.get(metric, 0) > limit
        ]

    def stop_monitoring(self):
        """Announce that monitoring has stopped."""
        print("Stopping monitoring...")
混沌工程平台
混沌工程平台提供可视化的实验管理、自动化执行和结果分析,并支持按计划(定时)调度实验、安全控制以及与 CI/CD 流水线集成。
# 混沌工程平台
from datetime import datetime
from typing import List, Dict
class ChaosPlatform:
    """In-memory registry for creating, scheduling, safety-checking and running experiments."""

    def __init__(self):
        self.experiments = {}
        self.schedules = {}
        self.safety_rules = []

    def create_experiment(self, name: str, config: Dict) -> str:
        """Register a new experiment and return its unique ID.

        The ID is ``exp_<YYYYmmddHHMMSS>``. Because the timestamp only has
        one-second resolution, a numeric suffix (``_1``, ``_2``, ...) is added
        when that ID is already taken, so an existing experiment is never
        overwritten.
        """
        # Read the clock once so the ID and ``created_at`` always agree.
        now = datetime.now()
        base_id = f"exp_{now.strftime('%Y%m%d%H%M%S')}"
        experiment_id = base_id
        suffix = 1
        while experiment_id in self.experiments:
            experiment_id = f"{base_id}_{suffix}"
            suffix += 1
        self.experiments[experiment_id] = {
            "name": name,
            "config": config,
            "status": "created",
            "created_at": now
        }
        return experiment_id

    def schedule_experiment(self, experiment_id: str,
                            cron_expression: str):
        """Attach an enabled schedule with ``cron_expression`` to ``experiment_id``."""
        self.schedules[experiment_id] = {
            "cron": cron_expression,
            "enabled": True
        }

    def add_safety_rule(self, rule: Dict):
        """Add a rule that ``check_safety`` evaluates for every experiment."""
        self.safety_rules.append(rule)

    def check_safety(self, experiment: Dict) -> Dict:
        """Evaluate all safety rules against ``experiment``.

        Returns:
            ``{"safe": bool, "violations": [rules that failed]}``.
        """
        violations = [
            rule for rule in self.safety_rules
            if not self._evaluate_rule(rule, experiment)
        ]
        return {
            "safe": len(violations) == 0,
            "violations": violations
        }

    def _evaluate_rule(self, rule: Dict, experiment: Dict) -> bool:
        """Evaluate one rule (stub: every rule passes)."""
        # Simplified rule evaluation.
        return True

    def run_experiment(self, experiment_id: str) -> Dict:
        """Run the experiment with ``experiment_id`` after a safety check.

        Returns:
            A dict with ``status`` and ``experiment_id`` on success, or an
            ``error`` entry when the ID is unknown or a safety rule fails.
        """
        experiment = self.experiments.get(experiment_id)
        if not experiment:
            return {"error": "Experiment not found"}

        # Refuse to run anything that violates a safety rule.
        safety_check = self.check_safety(experiment)
        if not safety_check["safe"]:
            return {"error": "Safety check failed", "violations": safety_check["violations"]}

        experiment["status"] = "running"
        print(f"Running experiment: {experiment['name']}")
        # Simulated execution: the experiment completes immediately.
        experiment["status"] = "completed"
        return {
            "status": "completed",
            "experiment_id": experiment_id
        }

    def get_experiment_history(self) -> List[Dict]:
        """Return ID, name, status and ISO creation time for every experiment."""
        return [
            {
                "id": exp_id,
                "name": exp["name"],
                "status": exp["status"],
                "created_at": exp["created_at"].isoformat()
            }
            for exp_id, exp in self.experiments.items()
        ]
# 安全控制
class SafetyController:
    """Pre-flight safety checks for chaos experiments."""

    def __init__(self):
        self.rules = []
        self.blast_radius_limit = 0.3  # at most 30% of services may be affected

    def add_rule(self, rule_type: str, config: Dict):
        """Store a rule as ``{"type": rule_type, "config": config}``."""
        self.rules.append({"type": rule_type, "config": config})

    def check_blast_radius(self, target: str,
                           total_services: int) -> bool:
        """Return True when one affected service stays within the blast-radius limit.

        A non-positive ``total_services`` makes the ratio meaningless, so the
        check fails safe and returns False instead of dividing by zero.
        """
        if total_services <= 0:
            return False
        # Simplified: assume each experiment affects exactly one service.
        affected_ratio = 1 / total_services
        return affected_ratio <= self.blast_radius_limit

    def check_maintenance_window(self) -> bool:
        """Check the allowed time window (stub: always True)."""
        # Simplified: a real check would compare against the permitted window.
        return True

    def validate_experiment(self, experiment: Dict) -> Dict:
        """Run all applicable checks for ``experiment``.

        The blast-radius check runs only when ``experiment`` has a ``target``
        key; ``total_services`` defaults to 10 when absent.

        Returns:
            ``{"validated": bool, "checks": [{"check": name, "passed": bool}, ...]}``.
        """
        checks = []
        if "target" in experiment:
            radius_ok = self.check_blast_radius(
                experiment["target"],
                experiment.get("total_services", 10)
            )
            checks.append({
                "check": "blast_radius",
                "passed": radius_ok
            })
        window_ok = self.check_maintenance_window()
        checks.append({
            "check": "maintenance_window",
            "passed": window_ok
        })
        all_passed = all(c["passed"] for c in checks)
        return {
            "validated": all_passed,
            "checks": checks
        }
# 混沌实验报告
class ChaosExperimentReport:
    """Build human- and machine-readable reports from experiment results."""

    def __init__(self, results: List[ExperimentResult]):
        self.results = results

    def generate_executive_summary(self) -> str:
        """Return a Markdown executive summary of all results.

        With no results the success rate is reported as 0.0% instead of
        raising ``ZeroDivisionError``.
        """
        total = len(self.results)
        passed = sum(1 for r in self.results if r.system_resilient)
        success_rate = passed / total * 100 if total else 0.0
        return f"""
# Chaos Engineering Executive Summary
## Overview
- Total Experiments: {total}
- Passed: {passed}
- Failed: {total - passed}
- Success Rate: {success_rate:.1f}%
## Key Findings
1. System demonstrated resilience to most fault scenarios
2. Recovery time within acceptable limits
3. Some areas for improvement identified
## Recommendations
1. Implement circuit breakers for identified weak points
2. Add retry logic with exponential backoff
3. Improve monitoring and alerting
"""

    def generate_detailed_report(self) -> Dict:
        """Return totals, per-experiment details and recommendations as a dict."""
        passed = sum(1 for r in self.results if r.system_resilient)
        return {
            "summary": {
                "total": len(self.results),
                "passed": passed,
                "failed": len(self.results) - passed
            },
            "experiments": [
                {
                    "name": r.experiment.name,
                    "type": r.experiment.fault_type.value,
                    "severity": r.experiment.severity.value,
                    "result": "passed" if r.system_resilient else "failed",
                    "duration": r.end_time - r.start_time,
                    "observations": r.observations
                }
                for r in self.results
            ],
            "recommendations": self._generate_recommendations()
        }

    def _generate_recommendations(self) -> List[str]:
        """Return one recommendation per fault type that had a failed experiment.

        Duplicates are removed and the order of first occurrence is kept, so
        the output is deterministic. Fault types without an entry in the
        table below (currently STATE) produce no recommendation.
        """
        advice_by_type = {
            FaultType.NETWORK: "Implement retry with exponential backoff",
            FaultType.SERVICE: "Add circuit breaker pattern",
            FaultType.RESOURCE: "Implement resource limits and auto-scaling",
        }
        recommendations = [
            advice_by_type[result.experiment.fault_type]
            for result in self.results
            if not result.system_resilient
            and result.experiment.fault_type in advice_by_type
        ]
        # dict.fromkeys de-duplicates while preserving insertion order.
        return list(dict.fromkeys(recommendations))