← 返回首页
🧠

LLM事件响应

📂 llm ⏱ 11 min 2153 words

---
title: "LLM事件响应"
description: "全面介绍LLM事件响应流程,包括应急预案、根因分析、事后复原等核心环节和最佳实践"
tags: ["事件响应", "应急预案", "根因分析", "事后复原"]
category: "llm"
icon: "🧠"
---

LLM事件响应

LLM事件响应概述

LLM事件响应是指在大语言模型系统出现异常、故障或安全事件时,组织采取的一系列系统性应对措施。有效的事件响应能够最小化业务影响,快速恢复服务,并从事件中学习改进。

事件响应框架

from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from enum import Enum
import json
import threading

class IncidentSeverity(Enum):
    """Severity levels that can be attached to an incident.

    Each member's value is the lowercase string form of its name.
    """
    # NOTE(review): plain Enum members define no ordering, so severities
    # cannot be compared with < or > without an explicit ranking.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class IncidentStatus(Enum):
    """Lifecycle states an incident record can be in.

    Each member's value is the lowercase string form of its name.
    """
    # NOTE(review): the members are presumably listed in the order an
    # incident is expected to move through them - confirm with the process
    # owners; nothing in the enum itself enforces transitions.
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    CONTAINING = "containing"
    ERADICATING = "eradicating"
    RECOVERING = "recovering"
    RESOLVED = "resolved"
    POST_INCIDENT = "post_incident"

class LLMIncidentResponse:
    """In-memory tracker for the lifecycle of LLM service incidents.

    Incidents are plain dictionaries stored in ``self.incidents``. Every
    write this class makes to that list, to an incident record, or to
    ``self.lessons_learned`` happens while ``self.lock`` is held, so incident
    ids stay unique and timeline entries are not interleaved when several
    threads use the same instance.

    Attributes:
        incidents: Incident records in creation order.
        response_teams: Team dictionaries; ``_auto_assign_team`` reads each
            team's ``"services"`` list and its ``"id"``.
        runbooks: Not read or written by the methods of this class.
        communication_templates: Not read or written by the methods of this
            class.
        lessons_learned: Post-incident review records, one appended per
            successful ``resolve_incident`` call.
        lock: Non-reentrant lock guarding the writes described above.
    """

    # The id is the lookup key used by every other method; letting
    # update_incident() overwrite it would orphan the record.
    _IMMUTABLE_FIELDS = frozenset({"id"})

    def __init__(self):
        """Start with no incidents, teams, runbooks, templates or reviews."""
        self.incidents = []
        self.response_teams = []
        self.runbooks = []
        self.communication_templates = {}
        self.lessons_learned = []
        self.lock = threading.Lock()

    def create_incident(self, incident_config: Dict[str, Any]) -> str:
        """Register a new incident and return its id.

        Args:
            incident_config: Optional keys ``title``, ``description``,
                ``severity`` (defaults to ``IncidentSeverity.MEDIUM``),
                ``impact`` and ``affected_services``.

        Returns:
            An id of the form ``INC-<YYYYMMDD>-<NNNN>`` where ``NNNN`` is the
            1-based position of the incident in ``self.incidents``.
        """
        # One clock reading feeds both the id and detected_at so the two can
        # never disagree about the date.
        now = datetime.now()

        incident = {
            "id": None,  # assigned under the lock below
            "title": incident_config.get("title", ""),
            "description": incident_config.get("description", ""),
            "severity": incident_config.get("severity", IncidentSeverity.MEDIUM),
            "status": IncidentStatus.DETECTED,
            "impact": incident_config.get("impact", {}),
            "affected_services": incident_config.get("affected_services", []),
            "detected_at": now.isoformat(),
            "acknowledged_at": None,
            "resolved_at": None,
            "assigned_to": None,
            "response_team": None,
            "timeline": [],
            "actions_taken": [],
            "root_cause": None,
            "resolution": None,
            "lessons_learned": None
        }

        # The sequence number is derived from the list length, so computing
        # it and appending must be one atomic step; otherwise two concurrent
        # callers can be handed the same id.
        with self.lock:
            incident_id = f"INC-{now.strftime('%Y%m%d')}-{len(self.incidents) + 1:04d}"
            incident["id"] = incident_id
            # Assign the team before the record becomes visible to readers.
            self._auto_assign_team(incident)
            self.incidents.append(incident)

        self._add_timeline_entry(incident_id, "Incident created")

        return incident_id

    def acknowledge_incident(self, incident_id: str, user: str) -> bool:
        """Mark an incident as being investigated by ``user``.

        Returns:
            ``False`` when no incident has this id, ``True`` otherwise.
        """
        incident = self._get_incident(incident_id)
        if not incident:
            return False

        with self.lock:
            incident["status"] = IncidentStatus.INVESTIGATING
            incident["acknowledged_at"] = datetime.now().isoformat()
            incident["assigned_to"] = user

        self._add_timeline_entry(incident_id, f"Incident acknowledged by {user}")

        return True

    def update_incident(self, incident_id: str, updates: Dict[str, Any]) -> bool:
        """Overwrite existing fields of an incident.

        Keys that are not already present on the record are ignored, and so
        is ``"id"``. The timeline entry lists only the keys that were
        actually written, so the audit trail never claims a change that did
        not happen.

        Returns:
            ``False`` when no incident has this id, ``True`` otherwise (even
            if no field ended up being changed).
        """
        incident = self._get_incident(incident_id)
        if not incident:
            return False

        applied = []
        with self.lock:
            for key, value in updates.items():
                if key in incident and key not in self._IMMUTABLE_FIELDS:
                    incident[key] = value
                    applied.append(key)

        self._add_timeline_entry(incident_id, f"Incident updated: {applied}")

        return True

    def resolve_incident(self, incident_id: str, resolution: Dict[str, Any]) -> bool:
        """Mark an incident resolved and record a post-incident review.

        Returns:
            ``False`` when no incident has this id, ``True`` otherwise.
        """
        incident = self._get_incident(incident_id)
        if not incident:
            return False

        with self.lock:
            incident["status"] = IncidentStatus.RESOLVED
            incident["resolved_at"] = datetime.now().isoformat()
            incident["resolution"] = resolution

        self._add_timeline_entry(incident_id, "Incident resolved")

        # Kick off the post-incident review.
        self._trigger_post_incident_review(incident_id)

        return True

    def _auto_assign_team(self, incident: Dict):
        """Set ``response_team`` to the first team covering an affected service.

        Leaves the incident untouched when no team matches.
        """
        affected_services = incident.get("affected_services", [])

        for team in self.response_teams:
            team_services = team.get("services", [])
            if any(service in team_services for service in affected_services):
                incident["response_team"] = team["id"]
                break

    def _add_timeline_entry(self, incident_id: str, entry: str):
        """Append a timestamped entry to an incident's timeline.

        Does nothing when the id is unknown. Acquires ``self.lock``, which is
        not reentrant, so it must not be called while the lock is held.
        """
        incident = self._get_incident(incident_id)
        if incident is None:
            return

        timeline_entry = {
            "timestamp": datetime.now().isoformat(),
            "entry": entry
        }
        with self.lock:
            incident["timeline"].append(timeline_entry)

    def _get_incident(self, incident_id: str) -> Optional[Dict]:
        """Return the incident record with this id, or ``None``."""
        for incident in self.incidents:
            if incident["id"] == incident_id:
                return incident
        return None

    def _trigger_post_incident_review(self, incident_id: str):
        """Build a review for the incident and store it in ``lessons_learned``."""
        incident = self._get_incident(incident_id)
        if incident is None:
            return

        review = self._conduct_post_incident_review(incident)
        with self.lock:
            self.lessons_learned.append(review)

    def _conduct_post_incident_review(self, incident: Dict) -> Dict:
        """Return a review record summarising the incident.

        The timeline is copied so that entries added to the incident later do
        not retroactively change a stored review.
        """
        return {
            "incident_id": incident["id"],
            "title": incident["title"],
            "severity": incident["severity"],
            "duration": self._calculate_duration(incident),
            "timeline": list(incident["timeline"]),
            "root_cause": incident.get("root_cause"),
            "resolution": incident.get("resolution"),
            "lessons_learned": [],
            "action_items": [],
            "review_date": datetime.now().isoformat()
        }

    def _calculate_duration(self, incident: Dict) -> str:
        """Return detection-to-resolution time as ``str(timedelta)``.

        Returns ``"Ongoing"`` while ``resolved_at`` is unset.
        """
        resolved_at = incident.get("resolved_at")
        if not resolved_at:
            return "Ongoing"

        detected_at = datetime.fromisoformat(incident["detected_at"])
        return str(datetime.fromisoformat(resolved_at) - detected_at)

    def get_incident_status(self, incident_id: str) -> Dict[str, Any]:
        """Return a status summary for one incident.

        Returns:
            ``{"error": "Incident not found"}`` for an unknown id, otherwise
            a new dictionary with the headline fields and a copy of the
            timeline.
        """
        incident = self._get_incident(incident_id)
        if not incident:
            return {"error": "Incident not found"}

        return {
            "id": incident["id"],
            "title": incident["title"],
            "severity": incident["severity"],
            "status": incident["status"],
            "detected_at": incident["detected_at"],
            "acknowledged_at": incident.get("acknowledged_at"),
            "resolved_at": incident.get("resolved_at"),
            "assigned_to": incident.get("assigned_to"),
            "response_team": incident.get("response_team"),
            "timeline": list(incident["timeline"])
        }

    def get_active_incidents(self) -> List[Dict]:
        """Return incidents whose status is neither RESOLVED nor POST_INCIDENT."""
        return [
            incident for incident in self.incidents
            if incident["status"] not in (IncidentStatus.RESOLVED, IncidentStatus.POST_INCIDENT)
        ]

    def get_incident_metrics(self) -> Dict[str, Any]:
        """Return aggregate counts and timings over all incidents.

        ``resolution_rate`` is ``0`` when there are no incidents.
        ``severity_distribution`` is keyed by whatever severity value each
        incident stores.
        """
        total_incidents = len(self.incidents)
        resolved_incidents = sum(
            1 for i in self.incidents if i["status"] == IncidentStatus.RESOLVED
        )

        severity_counts = {}
        for incident in self.incidents:
            severity = incident["severity"]
            severity_counts[severity] = severity_counts.get(severity, 0) + 1

        return {
            "total_incidents": total_incidents,
            "resolved_incidents": resolved_incidents,
            "resolution_rate": resolved_incidents / total_incidents if total_incidents > 0 else 0,
            "severity_distribution": severity_counts,
            "average_resolution_time": self._calculate_average_resolution_time()
        }

    def _calculate_average_resolution_time(self) -> str:
        """Return the mean detection-to-resolution time as ``str(timedelta)``.

        Only incidents in RESOLVED status with a ``resolved_at`` timestamp
        are counted; returns ``"N/A"`` when there are none.
        """
        resolved_incidents = [
            i for i in self.incidents
            if i["status"] == IncidentStatus.RESOLVED and i.get("resolved_at")
        ]

        if not resolved_incidents:
            return "N/A"

        total_duration = timedelta()
        for incident in resolved_incidents:
            detected = datetime.fromisoformat(incident["detected_at"])
            resolved = datetime.fromisoformat(incident["resolved_at"])
            total_duration += resolved - detected

        return str(total_duration / len(resolved_incidents))

应急预案

预案管理器

class IncidentResponsePlan:
    """Registry of incident response plans with a simulated step executor.

    Plans are dictionaries stored in ``self.plans``. Executing a plan walks
    its ``"steps"`` in list order and dispatches each one on its ``"type"``;
    the step handlers only build result dictionaries and perform no external
    action.

    Attributes:
        plans: Plan records in creation order.
        procedures: Not used by the methods of this class.
        checklists: Not used by the methods of this class.
    """

    def __init__(self):
        """Start with no plans, procedures or checklists."""
        self.plans = []
        self.procedures = []
        self.checklists = []

    def create_response_plan(self, plan_config: Dict[str, Any]) -> str:
        """Store a new plan built from ``plan_config`` and return its id.

        Missing keys fall back to empty values. The id is ``PLAN-<n>`` with
        ``n`` the 1-based position in ``self.plans``; the version starts at
        ``"1.0"``.
        """
        plan_id = f"PLAN-{len(self.plans) + 1}"

        plan = {
            "id": plan_id,
            "name": plan_config.get("name", ""),
            "description": plan_config.get("description", ""),
            "scope": plan_config.get("scope", []),
            "triggers": plan_config.get("triggers", []),
            "roles": plan_config.get("roles", []),
            "steps": plan_config.get("steps", []),
            "communication_plan": plan_config.get("communication_plan", {}),
            "escalation_procedures": plan_config.get("escalation_procedures", []),
            "resources": plan_config.get("resources", []),
            "created_at": datetime.now().isoformat(),
            "last_tested": None,
            "version": "1.0"
        }

        self.plans.append(plan)
        return plan_id

    def execute_plan(self, plan_id: str, incident: Dict) -> Dict[str, Any]:
        """Run every step of a plan against ``incident``.

        Execution stops at the first step that raises; that step is recorded
        in ``steps_failed`` and the status becomes ``"failed"``.

        Returns:
            ``{"error": "Plan not found"}`` for an unknown id, otherwise an
            execution record with ``steps_completed``, ``steps_failed``,
            ``status`` and start/end timestamps.
        """
        plan = self._get_plan(plan_id)
        if not plan:
            return {"error": "Plan not found"}

        execution = {
            "plan_id": plan_id,
            "incident_id": incident.get("id", ""),
            "started_at": datetime.now().isoformat(),
            "steps_completed": [],
            "steps_failed": [],
            "status": "running"
        }

        for step in plan.get("steps", []):
            try:
                result = self._execute_step(step, incident)
            except Exception as e:
                # Deliberately broad: any failure inside a step is reported
                # in the execution record instead of aborting the caller.
                execution["steps_failed"].append({
                    "step": step,
                    "error": str(e),
                    "failed_at": datetime.now().isoformat()
                })
                execution["status"] = "failed"
                break
            else:
                execution["steps_completed"].append({
                    "step": step,
                    "result": result,
                    "completed_at": datetime.now().isoformat()
                })

        if execution["status"] != "failed":
            execution["status"] = "completed"

        execution["completed_at"] = datetime.now().isoformat()

        return execution

    def _get_plan(self, plan_id: str) -> Optional[Dict]:
        """Return the plan with this id, or ``None``."""
        for plan in self.plans:
            if plan["id"] == plan_id:
                return plan
        return None

    def _execute_step(self, step: Dict, incident: Dict) -> Any:
        """Dispatch one step to the handler for its ``"type"``.

        Unknown types are not an error: they yield a ``"skipped"`` result.
        """
        step_type = step.get("type", "")

        handlers = {
            "assess": self._execute_assess_step,
            "communicate": self._execute_communicate_step,
            "contain": self._execute_contain_step,
            "remediate": self._execute_remediate_step,
            "verify": self._execute_verify_step,
        }
        # Guard the lookup so a non-string type is skipped, not a TypeError.
        handler = handlers.get(step_type) if isinstance(step_type, str) else None
        if handler is None:
            return {"status": "skipped", "reason": f"Unknown step type: {step_type}"}
        return handler(step, incident)

    def _execute_assess_step(self, step: Dict, incident: Dict) -> Dict:
        """Return the incident's impact, affected services and severity."""
        return {
            "status": "completed",
            "assessment": {
                "impact": incident.get("impact", {}),
                "affected_services": incident.get("affected_services", []),
                "severity": incident.get("severity", "")
            }
        }

    def _execute_communicate_step(self, step: Dict, incident: Dict) -> Dict:
        """Render the step's message template for the incident.

        The template is formatted with ``str.format(incident=incident)``, so
        it can reference fields as ``{incident[title]}``. A template that
        references anything else raises, which ``execute_plan`` reports as a
        failed step. Nothing is actually sent.
        """
        recipients = step.get("recipients", [])
        message_template = step.get("message_template", "")

        return {
            "status": "completed",
            "recipients_notified": len(recipients),
            "message_sent": message_template.format(incident=incident)
        }

    def _execute_contain_step(self, step: Dict, incident: Dict) -> Dict:
        """Echo the step's containment ``"actions"``; no action is performed."""
        containment_actions = step.get("actions", [])

        return {
            "status": "completed",
            "actions_executed": containment_actions
        }

    def _execute_remediate_step(self, step: Dict, incident: Dict) -> Dict:
        """Echo the step's ``"remediation_plan"``; no remediation is performed."""
        remediation_plan = step.get("remediation_plan", "")

        return {
            "status": "completed",
            "remediation_plan": remediation_plan
        }

    def _execute_verify_step(self, step: Dict, incident: Dict) -> Dict:
        """Report every configured check as passed; no check is actually run."""
        verification_checks = step.get("checks", [])

        return {
            "status": "completed",
            "checks_passed": len(verification_checks),
            "all_passed": True
        }

    def test_plan(self, plan_id: str) -> Dict[str, Any]:
        """Dry-run a plan against a synthetic low-severity incident.

        Updates the plan's ``last_tested`` timestamp.

        Returns:
            ``{"error": "Plan not found"}`` for an unknown id, otherwise the
            execution record plus recommendations derived from it.
        """
        plan = self._get_plan(plan_id)
        if not plan:
            return {"error": "Plan not found"}

        # Synthetic incident used only for the dry run.
        test_incident = {
            "id": "TEST-001",
            "title": "Test Incident",
            "severity": IncidentSeverity.LOW
        }

        execution = self.execute_plan(plan_id, test_incident)

        plan["last_tested"] = datetime.now().isoformat()

        return {
            "plan_id": plan_id,
            "test_result": execution,
            "tested_at": datetime.now().isoformat(),
            "recommendations": self._generate_test_recommendations(execution)
        }

    def _generate_test_recommendations(self, execution: Dict) -> List[str]:
        """Return follow-up advice for a failed execution (empty on success)."""
        recommendations = []

        if execution.get("status") == "failed":
            recommendations.append("预案执行失败,需要检查步骤配置")

        failed_steps = execution.get("steps_failed", [])
        if failed_steps:
            recommendations.append(f"有 {len(failed_steps)} 个步骤失败,需要优化")

        return recommendations

    def update_plan(self, plan_id: str, updates: Dict[str, Any]) -> bool:
        """Merge ``updates`` into a plan and bump its version.

        Returns:
            ``False`` when no plan has this id, ``True`` otherwise.
        """
        plan = self._get_plan(plan_id)
        if not plan:
            return False

        plan.update(updates)
        plan["version"] = self._increment_version(plan.get("version", "1.0"))

        return True

    def _increment_version(self, version: str) -> str:
        """Return ``version`` with its last numeric component incremented.

        ``"1.0"`` becomes ``"1.1"`` and ``"2.0.3"`` becomes ``"2.0.4"``. A
        version with no dot, or whose last component is not an integer, gets
        ``".1"`` appended (``"3"`` -> ``"3.1"``, ``"1.x"`` -> ``"1.x.1"``) so
        that a bump never moves the version backwards and never raises after
        the plan has already been modified. An empty version yields ``"1.1"``.
        """
        text = "" if version is None else str(version).rstrip(".")
        if not text:
            return "1.1"

        head, sep, tail = text.rpartition(".")
        try:
            bumped = int(tail) + 1
        except ValueError:
            return f"{text}.1"

        if not sep:
            # Single component: open a minor series instead of changing the
            # major number.
            return f"{text}.1"
        return f"{head}.{bumped}"

根因分析

根因分析器

class RootCauseAnalyzer:
    """Produce root-cause analyses and find correlations between incidents.

    The three analysis methods return fixed, illustrative content; only the
    ``incident_id`` field is taken from the incident passed in. The
    correlation helpers do inspect the incidents they are given.

    Attributes:
        analysis_methods: Names accepted by ``analyze_root_cause``.
        knowledge_base: Not used by the methods of this class.
    """

    def __init__(self):
        """Register the supported analysis methods."""
        self.analysis_methods = ["5_whys", "fishbone", "fault_tree"]
        self.knowledge_base = []

    def analyze_root_cause(self, incident: Dict, method: str = "5_whys") -> Dict[str, Any]:
        """Run the named analysis on ``incident``.

        Returns:
            ``{"error": ...}`` when ``method`` is not in
            ``self.analysis_methods``; an empty dict when the name is
            registered there but has no implementation; otherwise the
            analysis dictionary.
        """
        if method not in self.analysis_methods:
            return {"error": f"Unknown method: {method}"}

        handlers = {
            "5_whys": self._five_whys_analysis,
            "fishbone": self._fishbone_analysis,
            "fault_tree": self._fault_tree_analysis,
        }
        handler = handlers.get(method)
        if handler is None:
            return {}
        return handler(incident)

    def _five_whys_analysis(self, incident: Dict) -> Dict[str, Any]:
        """Return a canned "5 whys" chain ending in a capacity-planning cause."""
        # Illustrative data only: the chain is not derived from the incident.
        whys = [
            {
                "why": 1,
                "question": "为什么系统出现了故障?",
                "answer": "模型推理服务不可用",
                "evidence": "健康检查失败"
            },
            {
                "why": 2,
                "question": "为什么模型推理服务不可用?",
                "answer": "GPU内存溢出",
                "evidence": "内存使用率100%"
            },
            {
                "why": 3,
                "question": "为什么GPU内存会溢出?",
                "answer": "请求量突增",
                "evidence": "请求量增加300%"
            },
            {
                "why": 4,
                "question": "为什么请求量会突增?",
                "answer": "营销活动导致流量激增",
                "evidence": "活动时间与流量激增时间吻合"
            },
            {
                "why": 5,
                "question": "为什么没有预见到流量激增?",
                "answer": "缺乏容量规划和监控",
                "evidence": "没有自动扩缩容策略"
            }
        ]

        return {
            "method": "5_whys",
            "incident_id": incident.get("id", ""),
            "whys": whys,
            "root_cause": "缺乏容量规划和监控机制",
            "contributing_factors": [
                "营销活动沟通不畅",
                "自动扩缩容策略缺失",
                "监控告警阈值设置不当"
            ],
            "recommendations": [
                "建立容量规划流程",
                "实施自动扩缩容",
                "优化监控告警策略",
                "加强跨部门沟通"
            ]
        }

    def _fishbone_analysis(self, incident: Dict) -> Dict[str, Any]:
        """Return a canned fishbone (cause-category) breakdown."""
        # Illustrative data only: the causes are not derived from the incident.
        categories = {
            "people": [
                "运维人员经验不足",
                "值班响应不及时"
            ],
            "process": [
                "变更管理流程缺失",
                "容量规划流程不完善"
            ],
            "technology": [
                "监控系统覆盖不全",
                "自动扩缩容机制缺失"
            ],
            "environment": [
                "生产环境配置不一致",
                "网络延迟影响"
            ],
            "measurement": [
                "性能基线未建立",
                "告警阈值设置不当"
            ],
            "materials": [
                "文档不完整",
                "工具链不统一"
            ]
        }

        return {
            "method": "fishbone",
            "incident_id": incident.get("id", ""),
            "categories": categories,
            "root_cause": "流程和技术机制不完善"
        }

    def _fault_tree_analysis(self, incident: Dict) -> Dict[str, Any]:
        """Return a canned fault tree with its minimal cut sets."""
        # Illustrative data only: the tree is not derived from the incident.
        return {
            "method": "fault_tree",
            "incident_id": incident.get("id", ""),
            "top_event": "系统故障",
            "gate_type": "AND",
            "basic_events": [
                "GPU硬件故障",
                "内存泄漏",
                "网络设备故障",
                "流量突增",
                "配置错误",
                "软件缺陷"
            ],
            "intermediate_events": [
                {
                    "event": "服务不可用",
                    "gate_type": "OR",
                    "inputs": ["GPU故障", "内存溢出", "网络中断"]
                },
                {
                    "event": "资源耗尽",
                    "gate_type": "AND",
                    "inputs": ["高负载", "资源限制"]
                }
            ],
            "minimal_cut_sets": [
                ["GPU硬件故障"],
                ["内存泄漏", "高负载"],
                ["网络设备故障"],
                ["流量突增", "资源限制"]
            ],
            "root_cause": "多因素共同作用导致系统故障"
        }

    def correlate_events(self, incidents: List[Dict]) -> List[Dict]:
        """Return time, service and root-cause correlations, in that order."""
        correlations = []
        correlations.extend(self._correlate_by_time(incidents))
        correlations.extend(self._correlate_by_service(incidents))
        correlations.extend(self._correlate_by_root_cause(incidents))
        return correlations

    def _correlate_by_time(self, incidents: List[Dict]) -> List[Dict]:
        """Pair chronologically adjacent incidents detected within 30 minutes.

        Incidents whose ``detected_at`` is missing or is not an ISO-8601
        string cannot be placed on the time axis and are left out instead of
        aborting the whole correlation. Ordering uses the parsed timestamps
        rather than the raw strings. A gap under 5 minutes is rated
        ``"strong"``, anything else under 30 minutes ``"medium"``.
        """
        stamped = []
        for incident in incidents:
            try:
                detected = datetime.fromisoformat(incident.get("detected_at", ""))
            except (TypeError, ValueError):
                continue
            stamped.append((detected, incident))

        stamped.sort(key=lambda pair: pair[0])

        correlations = []
        for (current_time, current), (next_time, next_incident) in zip(stamped, stamped[1:]):
            time_diff = next_time - current_time

            if time_diff < timedelta(minutes=30):
                correlations.append({
                    "type": "time_correlation",
                    "incidents": [current["id"], next_incident["id"]],
                    "time_difference": str(time_diff),
                    "strength": "strong" if time_diff < timedelta(minutes=5) else "medium"
                })

        return correlations

    def _correlate_by_service(self, incidents: List[Dict]) -> List[Dict]:
        """Group incidents by affected service; report services hit more than once.

        A service listed several times on one incident is counted once for
        that incident, so an incident is never correlated with itself.
        """
        service_incidents = {}

        for incident in incidents:
            # dict.fromkeys de-duplicates while keeping first-seen order.
            for service in dict.fromkeys(incident.get("affected_services") or []):
                service_incidents.setdefault(service, []).append(incident["id"])

        return [
            {
                "type": "service_correlation",
                "service": service,
                "incidents": incident_ids,
                "count": len(incident_ids)
            }
            for service, incident_ids in service_incidents.items()
            if len(incident_ids) > 1
        ]

    def _correlate_by_root_cause(self, incidents: List[Dict]) -> List[Dict]:
        """Group incidents by ``root_cause``; report causes shared by several.

        Incidents without a root cause are ignored.
        """
        root_cause_incidents = {}

        for incident in incidents:
            root_cause = incident.get("root_cause")
            if root_cause:
                root_cause_incidents.setdefault(root_cause, []).append(incident["id"])

        return [
            {
                "type": "root_cause_correlation",
                "root_cause": root_cause,
                "incidents": incident_ids,
                "count": len(incident_ids)
            }
            for root_cause, incident_ids in root_cause_incidents.items()
            if len(incident_ids) > 1
        ]

事后复原

复原管理器

class PostIncidentRecovery:
    """Create, execute, verify and report on post-incident recovery plans.

    Plans are dictionaries stored in ``self.recovery_plans``. Objectives,
    steps, verification criteria, rollback procedures and the communication
    plan are fixed templates; step execution and verification are simulated
    and always succeed.

    Attributes:
        recovery_plans: Plan records in creation order.
        recovery_actions: Not used by the methods of this class.
        verification_checks: Not used by the methods of this class.
    """

    def __init__(self):
        """Start with no plans, actions or checks."""
        self.recovery_plans = []
        self.recovery_actions = []
        self.verification_checks = []

    def create_recovery_plan(self, incident: Dict) -> str:
        """Store a recovery plan for ``incident`` and return its id.

        The id is ``REC-<incident id>`` (``REC-UNKNOWN`` when the incident
        has no ``"id"``); the plan starts in status ``"created"``.
        """
        plan_id = f"REC-{incident.get('id', 'UNKNOWN')}"

        recovery_plan = {
            "id": plan_id,
            "incident_id": incident.get("id", ""),
            "recovery_objectives": self._define_recovery_objectives(incident),
            "recovery_steps": self._define_recovery_steps(incident),
            "verification_criteria": self._define_verification_criteria(incident),
            "rollback_procedures": self._define_rollback_procedures(incident),
            "communication_plan": self._define_communication_plan(incident),
            "created_at": datetime.now().isoformat(),
            "status": "created"
        }

        self.recovery_plans.append(recovery_plan)
        return plan_id

    def _define_recovery_objectives(self, incident: Dict) -> Dict[str, Any]:
        """Return the fixed recovery objectives."""
        return {
            "rto": "4 hours",  # recovery time objective
            "rpo": "1 hour",   # recovery point objective
            "service_level": "99.9%",
            "data_integrity": "100%"
        }

    def _define_recovery_steps(self, incident: Dict) -> List[Dict]:
        """Return the fixed, ordered list of recovery steps."""
        return [
            {
                "order": 1,
                "action": "验证根本原因已解决",
                "owner": "运维团队",
                "estimated_time": "30 minutes",
                "dependencies": []
            },
            {
                "order": 2,
                "action": "恢复服务配置",
                "owner": "开发团队",
                "estimated_time": "1 hour",
                "dependencies": ["step_1"]
            },
            {
                "order": 3,
                "action": "执行数据完整性检查",
                "owner": "数据团队",
                "estimated_time": "2 hours",
                "dependencies": ["step_2"]
            },
            {
                "order": 4,
                "action": "进行功能验证测试",
                "owner": "QA团队",
                "estimated_time": "1 hour",
                "dependencies": ["step_3"]
            },
            {
                "order": 5,
                "action": "逐步恢复流量",
                "owner": "运维团队",
                "estimated_time": "30 minutes",
                "dependencies": ["step_4"]
            }
        ]

    def _define_verification_criteria(self, incident: Dict) -> List[str]:
        """Return the fixed list of verification criteria."""
        return [
            "所有健康检查通过",
            "性能指标恢复正常",
            "错误率低于阈值",
            "用户反馈正常",
            "监控告警解除"
        ]

    def _define_rollback_procedures(self, incident: Dict) -> List[Dict]:
        """Return the fixed rollback procedures, keyed by their trigger."""
        return [
            {
                "trigger": "恢复失败",
                "action": "回滚到上一个稳定版本",
                "steps": [
                    "停止新版本服务",
                    "恢复旧版本配置",
                    "重启服务",
                    "验证回滚成功"
                ]
            },
            {
                "trigger": "数据不一致",
                "action": "从备份恢复数据",
                "steps": [
                    "识别数据不一致范围",
                    "选择恢复点",
                    "执行数据恢复",
                    "验证数据完整性"
                ]
            }
        ]

    def _define_communication_plan(self, incident: Dict) -> Dict[str, Any]:
        """Return the fixed internal/external communication cadence."""
        return {
            "internal_communication": {
                "team_notifications": True,
                "management_updates": "Every 30 minutes",
                "status_page_updates": "Every 15 minutes"
            },
            "external_communication": {
                "customer_notifications": True,
                "partner_notifications": True,
                "public_updates": "Every hour"
            }
        }

    def execute_recovery(self, plan_id: str) -> Dict[str, Any]:
        """Run the plan's recovery steps in list order.

        Execution stops at the first step that raises. The plan's own
        ``"status"`` is kept in step with the run: ``"running"`` while steps
        execute, then ``"completed"`` or ``"failed"``.

        Returns:
            ``{"error": "Recovery plan not found"}`` for an unknown id,
            otherwise an execution record with ``steps_completed``,
            ``steps_failed``, ``status`` and start/end timestamps.
        """
        plan = self._get_recovery_plan(plan_id)
        if not plan:
            return {"error": "Recovery plan not found"}

        execution = {
            "plan_id": plan_id,
            "started_at": datetime.now().isoformat(),
            "steps_completed": [],
            "steps_failed": [],
            "status": "running"
        }
        plan["status"] = "running"

        for step in plan.get("recovery_steps", []):
            try:
                result = self._execute_recovery_step(step)
            except Exception as e:
                # Deliberately broad: any step failure is reported in the
                # execution record instead of propagating to the caller.
                execution["steps_failed"].append({
                    "step": step,
                    "error": str(e),
                    "failed_at": datetime.now().isoformat()
                })
                execution["status"] = "failed"
                break
            else:
                execution["steps_completed"].append({
                    "step": step,
                    "result": result,
                    "completed_at": datetime.now().isoformat()
                })

        if execution["status"] != "failed":
            execution["status"] = "completed"

        execution["completed_at"] = datetime.now().isoformat()

        # Without this the plan would report "created" forever.
        plan["status"] = execution["status"]

        return execution

    def _get_recovery_plan(self, plan_id: str) -> Optional[Dict]:
        """Return the first stored plan with this id, or ``None``."""
        for plan in self.recovery_plans:
            if plan["id"] == plan_id:
                return plan
        return None

    def _execute_recovery_step(self, step: Dict) -> Dict[str, Any]:
        """Simulate one recovery step; always reports success."""
        return {
            "status": "success",
            "action": step.get("action", ""),
            "completed_at": datetime.now().isoformat()
        }

    def verify_recovery(self, plan_id: str) -> Dict[str, Any]:
        """Check every verification criterion of a plan.

        Returns:
            ``{"error": "Recovery plan not found"}`` for an unknown id,
            otherwise per-criterion results plus an ``all_passed`` flag.
        """
        plan = self._get_recovery_plan(plan_id)
        if not plan:
            return {"error": "Recovery plan not found"}

        verification_results = []

        for criterion in plan.get("verification_criteria", []):
            result = self._verify_criterion(criterion)
            verification_results.append({
                "criterion": criterion,
                "passed": result,
                "verified_at": datetime.now().isoformat()
            })

        all_passed = all(r["passed"] for r in verification_results)

        return {
            "plan_id": plan_id,
            "verification_results": verification_results,
            "all_passed": all_passed,
            "verified_at": datetime.now().isoformat()
        }

    def _verify_criterion(self, criterion: str) -> bool:
        """Simulate verifying one criterion; always returns ``True``."""
        return True

    def generate_recovery_report(self, plan_id: str) -> Dict[str, Any]:
        """Assemble a recovery report for a plan.

        Runs ``verify_recovery`` as part of building the report.

        Returns:
            ``{"error": "Recovery plan not found"}`` for an unknown id,
            otherwise the report dictionary.
        """
        plan = self._get_recovery_plan(plan_id)
        if not plan:
            return {"error": "Recovery plan not found"}

        report = {
            "incident_id": plan.get("incident_id", ""),
            "recovery_plan_id": plan_id,
            "recovery_objectives": plan.get("recovery_objectives", {}),
            "recovery_steps": plan.get("recovery_steps", []),
            "verification_results": self.verify_recovery(plan_id),
            "recovery_duration": self._calculate_recovery_duration(plan),
            "lessons_learned": self._extract_lessons_learned(plan),
            "recommendations": self._generate_recommendations(plan)
        }

        return report

    def _calculate_recovery_duration(self, plan: Dict) -> str:
        """Return the recovery duration.

        Placeholder: returns a constant and does not inspect ``plan``.
        """
        return "2 hours 30 minutes"

    def _extract_lessons_learned(self, plan: Dict) -> List[str]:
        """Return the fixed list of lessons learned."""
        return [
            "建立更完善的监控机制",
            "优化应急预案",
            "加强团队培训",
            "改进沟通流程"
        ]

    def _generate_recommendations(self, plan: Dict) -> List[Dict]:
        """Return the fixed list of follow-up recommendations."""
        return [
            {
                "category": "prevention",
                "recommendation": "加强容量规划和监控",
                "priority": "high",
                "timeline": "1 month"
            },
            {
                "category": "process",
                "recommendation": "优化事件响应流程",
                "priority": "medium",
                "timeline": "2 weeks"
            },
            {
                "category": "training",
                "recommendation": "加强团队技能培训",
                "priority": "medium",
                "timeline": "1 month"
            }
        ]

事件响应工具集成

工具管理器

class IncidentResponseTools:
    """Facade that forwards incident-response requests to one tool per concern.

    ``self.tools`` maps the names ``"monitoring"``, ``"communication"``,
    ``"documentation"`` and ``"automation"`` to a tool instance each.
    """

    def __init__(self):
        """Instantiate one tool of each kind."""
        tool_factories = (
            ("monitoring", MonitoringTool),
            ("communication", CommunicationTool),
            ("documentation", DocumentationTool),
            ("automation", AutomationTool),
        )
        self.tools = {name: factory() for name, factory in tool_factories}

    def get_monitoring_data(self, incident_id: str) -> Dict[str, Any]:
        """Return what the monitoring tool reports for ``incident_id``."""
        monitor = self.tools["monitoring"]
        return monitor.get_incident_metrics(incident_id)

    def send_notifications(self, incident: Dict, recipients: List[str]):
        """Ask the communication tool to notify ``recipients`` about ``incident``."""
        messenger = self.tools["communication"]
        messenger.send_incident_notifications(incident, recipients)

    def document_incident(self, incident: Dict) -> str:
        """Return the report the documentation tool creates for ``incident``."""
        recorder = self.tools["documentation"]
        return recorder.create_incident_report(incident)

    def automate_response(self, incident: Dict, actions: List[str]):
        """Run ``actions`` through the automation tool and return its result."""
        runner = self.tools["automation"]
        return runner.execute_actions(incident, actions)

class MonitoringTool:
    """Monitoring tool stub that returns an empty data set for any incident."""

    def get_incident_metrics(self, incident_id: str) -> Dict[str, Any]:
        """Return the incident id together with empty metrics, logs and traces."""
        payload = {"incident_id": incident_id}
        payload.update(metrics={}, logs=[], traces=[])
        return payload

class CommunicationTool:
    """Communication tool that announces incidents by printing to stdout."""

    def send_incident_notifications(self, incident: Dict, recipients: List[str]):
        """Notify each recipient, in order, about ``incident``."""
        for target in recipients:
            self._send_notification(incident, target)

    def _send_notification(self, incident: Dict, recipient: str):
        """Print one notification line naming the recipient and the incident id."""
        incident_ref = incident.get('id', '')
        print(f"Sending notification to {recipient} about incident {incident_ref}")

class DocumentationTool:
    """Documentation tool that produces a one-line report title."""

    def create_incident_report(self, incident: Dict) -> str:
        """Return the report title for ``incident`` (empty id when missing)."""
        incident_ref = incident.get('id', '')
        return f"Incident Report for {incident_ref}"

class AutomationTool:
    """Automation tool stub that reports every requested action as successful."""

    def execute_actions(self, incident: Dict, actions: List[str]) -> Dict[str, Any]:
        """Run each action in order and return their results under ``"actions"``."""
        outcomes = [self._execute_action(incident, step) for step in actions]
        return {"actions": outcomes}

    def _execute_action(self, incident: Dict, action: str) -> Dict[str, Any]:
        """Return a success record for a single action."""
        outcome = {"action": action}
        outcome["status"] = "success"
        return outcome

总结

LLM事件响应是确保系统稳定性和可靠性的关键流程。通过建立完善的应急预案、实施系统化的根因分析和执行彻底的事后复原,组织可以快速应对各种事件,最小化业务影响,并从事件中持续改进。结合自动化工具和最佳实践,可以大幅提升事件响应效率和效果。