← 返回首页
🧠

LLM告警

📂 llm ⏱ 11 min 2147 words

---
title: "LLM告警"
description: "全面介绍LLM告警系统设计,包括告警规则、告警路由、告警响应等核心功能实现"
tags: ["LLM告警", "告警规则", "告警路由", "告警响应"]
category: "llm"
icon: "🧠"
---

LLM告警

LLM告警系统概述

LLM告警系统是监控大语言模型运行状态、性能指标和异常情况的关键组件。当模型出现性能下降、资源异常或安全问题时,系统能够及时通知相关人员采取行动。

告警系统架构

from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from enum import Enum
import json
import threading

class AlertSeverity(Enum):
    """Severity level carried by an alert rule and by the alerts it produces."""
    # Each value is the lowercase form of the member name.
    # This is a plain Enum (no ``str`` mix-in): a member does NOT compare
    # equal to its string value, e.g. ``AlertSeverity.INFO != "info"``.
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
    EMERGENCY = "emergency"

class AlertStatus(Enum):
    """Lifecycle state stored in an alert's ``"status"`` field."""
    # Each value is the lowercase form of the member name.
    # This is a plain Enum (no ``str`` mix-in): a member does NOT compare
    # equal to its string value, e.g. ``AlertStatus.RESOLVED != "resolved"``.
    PENDING = "pending"
    FIRING = "firing"
    RESOLVED = "resolved"
    ACKNOWLEDGED = "acknowledged"
    SILENCED = "silenced"

class LLMAlertingSystem:
    """In-memory alerting engine for LLM metrics.

    The system stores alert rules, evaluates them against a metrics snapshot
    and, for every rule whose condition holds and that is not silenced,
    creates an alert, records it as active and in the history, notifies every
    registered channel and runs the first matching escalation policy.

    All containers are plain lists of dicts; ``self.lock`` guards the
    active-alert list, the history and the alert sequence counter.
    """

    # Comparison applied for each supported ``condition["operator"]`` value.
    _COMPARATORS = {
        "gt": lambda value, threshold: value > threshold,
        "lt": lambda value, threshold: value < threshold,
        "gte": lambda value, threshold: value >= threshold,
        "lte": lambda value, threshold: value <= threshold,
        "eq": lambda value, threshold: value == threshold,
        "ne": lambda value, threshold: value != threshold,
    }

    def __init__(self):
        self.alert_rules = []
        self.active_alerts = []
        self.alert_history = []
        self.notification_channels = []
        self.escalation_policies = []
        self.silence_rules = []
        self.lock = threading.Lock()
        # Monotonic counter appended to alert IDs so that two alerts created
        # for the same rule within one second still get distinct IDs.
        self._alert_seq = 0

    def add_alert_rule(self, rule_config: Dict[str, Any]) -> str:
        """Register an alert rule and return its generated ID (``rule_<n>``).

        Missing keys in ``rule_config`` fall back to defaults: empty
        name/description/metric, empty condition, ``AlertSeverity.WARNING``,
        duration ``"5m"``, empty labels/annotations, enabled.
        """
        rule_id = f"rule_{len(self.alert_rules) + 1}"

        rule = {
            "id": rule_id,
            "name": rule_config.get("name", ""),
            "description": rule_config.get("description", ""),
            "metric": rule_config.get("metric", ""),
            "condition": rule_config.get("condition", {}),
            "severity": rule_config.get("severity", AlertSeverity.WARNING),
            "duration": rule_config.get("duration", "5m"),
            "labels": rule_config.get("labels", {}),
            "annotations": rule_config.get("annotations", {}),
            "enabled": rule_config.get("enabled", True),
            "created_at": datetime.now().isoformat(),
        }

        self.alert_rules.append(rule)
        return rule_id

    def evaluate_rules(self, metrics: Dict[str, Any]) -> List[Dict]:
        """Evaluate every enabled rule against ``metrics``.

        Returns the alerts created by this call. Silenced alerts are neither
        returned nor processed.
        """
        new_alerts = []

        for rule in self.alert_rules:
            if not rule.get("enabled", True):
                continue
            if not self._evaluate_condition(rule, metrics):
                continue

            alert = self._create_alert(rule, metrics)
            if self._is_silenced(alert):
                continue

            new_alerts.append(alert)
            self._process_alert(alert)

        return new_alerts

    def _evaluate_condition(self, rule: Dict, metrics: Dict) -> bool:
        """Return True when the rule's metric satisfies its condition.

        Returns False when the metric is absent, the operator is unknown, or
        the metric value cannot be compared with the threshold (for example a
        ``None`` sample), so one bad sample cannot abort a whole evaluation.
        """
        metric_name = rule.get("metric", "")
        if metric_name not in metrics:
            return False

        condition = rule.get("condition", {})
        try:
            compare = self._COMPARATORS.get(condition.get("operator", "gt"))
            if compare is None:
                return False
            return bool(compare(metrics[metric_name], condition.get("threshold", 0)))
        except TypeError:
            # Unhashable operator or non-comparable metric/threshold pair.
            return False

    def _create_alert(self, rule: Dict, metrics: Dict) -> Dict:
        """Build a FIRING alert dict for ``rule`` from the current metrics."""
        now = datetime.now()
        with self.lock:
            self._alert_seq += 1
            sequence = self._alert_seq

        return {
            "id": f"alert_{now.strftime('%Y%m%d_%H%M%S')}_{rule['id']}_{sequence}",
            "rule_id": rule["id"],
            "rule_name": rule["name"],
            "severity": rule["severity"],
            "status": AlertStatus.FIRING,
            # Copied so later edits to an alert do not leak back into the rule.
            "labels": dict(rule.get("labels", {})),
            "annotations": dict(rule.get("annotations", {})),
            "metric_name": rule["metric"],
            "metric_value": metrics.get(rule["metric"], 0),
            "condition": rule["condition"],
            "fired_at": now.isoformat(),
            "resolved_at": None,
            "acknowledged_at": None,
            "acknowledged_by": None,
            "description": rule.get("description", ""),
        }

    def _is_silenced(self, alert: Dict) -> bool:
        """Return True when any silence rule matches ``alert``."""
        return any(self._matches_silence(alert, rule) for rule in self.silence_rules)

    @staticmethod
    def _as_datetime(value: Any, default: datetime) -> datetime:
        """Coerce a silence-window bound to ``datetime``.

        ``None`` means "unbounded" and yields ``default``; ISO-8601 strings
        are parsed; anything else is returned unchanged.
        """
        if value is None:
            return default
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        return value

    def _matches_silence(self, alert: Dict, silence_rule: Dict) -> bool:
        """Return True when ``alert`` matches the rule's labels and time window.

        Every ``matchers`` entry must equal the alert label of the same name,
        and the current time must lie within ``[start, end]`` (both optional).
        """
        if not self._match_labels(alert.get("labels", {}), silence_rule.get("matchers", {})):
            return False

        start = self._as_datetime(silence_rule.get("start"), datetime.min)
        end = self._as_datetime(silence_rule.get("end"), datetime.max)
        return start <= datetime.now() <= end

    def _process_alert(self, alert: Dict):
        """Record the alert, then notify channels and run escalation."""
        with self.lock:
            self.active_alerts.append(alert)
            self.alert_history.append(alert)

        # Notification and escalation run outside the lock so a slow channel
        # does not block resolve/acknowledge calls.
        self._send_notifications(alert)
        self._execute_escalation(alert)

    def _send_notifications(self, alert: Dict):
        """Send the alert to every channel; a failing channel is logged and skipped."""
        for channel in self.notification_channels:
            try:
                channel.send(alert)
            except Exception as e:
                print(f"Failed to send notification to {getattr(channel, 'name', channel)}: {e}")

    def _execute_escalation(self, alert: Dict):
        """Run the first enabled escalation policy that matches ``alert``.

        Policies are dicts. A policy may provide a callable under the
        ``"execute"`` key; it is invoked with the alert. A matching policy
        without a callable is treated as declarative and simply ends the
        search. Objects exposing an ``execute`` method are also accepted.
        """
        for policy in self.escalation_policies:
            if not policy.get("enabled", True):
                continue
            if not self._matches_escalation(alert, policy):
                continue

            handler = policy.get("execute")
            if handler is None:
                handler = getattr(policy, "execute", None)
            if callable(handler):
                handler(alert)
            break

    def _matches_escalation(self, alert: Dict, policy: Dict) -> bool:
        """Return True when the alert's severity and labels match the policy."""
        if alert["severity"] not in policy.get("severities", []):
            return False
        return self._match_labels(alert.get("labels", {}), policy.get("labels", {}))

    @staticmethod
    def _match_labels(alert_labels: Dict, required_labels: Dict) -> bool:
        """Return True when every required label equals the alert's label.

        An empty ``required_labels`` matches any alert.
        """
        return all(alert_labels.get(key) == value for key, value in required_labels.items())

    def resolve_alert(self, alert_id: str) -> bool:
        """Mark an active alert as resolved and drop it from the active list.

        Returns False when no active alert has ``alert_id``.
        """
        with self.lock:
            for alert in self.active_alerts:
                if alert["id"] == alert_id:
                    alert["status"] = AlertStatus.RESOLVED
                    alert["resolved_at"] = datetime.now().isoformat()
                    # Safe: we return right after mutating the list.
                    self.active_alerts.remove(alert)
                    return True
        return False

    def acknowledge_alert(self, alert_id: str, user: str) -> bool:
        """Mark an active alert as acknowledged by ``user``.

        Returns False when no active alert has ``alert_id``.
        """
        with self.lock:
            for alert in self.active_alerts:
                if alert["id"] == alert_id:
                    alert["status"] = AlertStatus.ACKNOWLEDGED
                    alert["acknowledged_at"] = datetime.now().isoformat()
                    alert["acknowledged_by"] = user
                    return True
        return False

    def silence_alert(self, alert_id: str, duration_minutes: int = 60) -> bool:
        """Silence an active alert for ``duration_minutes``.

        Sets the status to SILENCED and records the expiry time under
        ``"silenced_until"`` (ISO-8601). Returns False when no active alert
        has ``alert_id``.
        """
        with self.lock:
            for alert in self.active_alerts:
                if alert["id"] == alert_id:
                    alert["status"] = AlertStatus.SILENCED
                    expires = datetime.now() + timedelta(minutes=duration_minutes)
                    alert["silenced_until"] = expires.isoformat()
                    return True
        return False

    def get_active_alerts(self, severity: Optional["AlertSeverity"] = None) -> List[Dict]:
        """Return a snapshot of active alerts, optionally filtered by severity."""
        with self.lock:
            snapshot = list(self.active_alerts)
        if severity:
            return [a for a in snapshot if a["severity"] == severity]
        return snapshot

    def get_alert_history(self, hours: int = 24) -> List[Dict]:
        """Return alerts from the history that fired within the last ``hours``."""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        with self.lock:
            snapshot = list(self.alert_history)
        return [
            a for a in snapshot
            if datetime.fromisoformat(a["fired_at"]) > cutoff_time
        ]

    def add_notification_channel(self, channel):
        """Register a channel object exposing ``send(alert)``."""
        self.notification_channels.append(channel)

    def add_escalation_policy(self, policy: Dict):
        """Register an escalation policy dict (``severities``, ``labels``, optional ``execute``)."""
        self.escalation_policies.append(policy)

    def add_silence_rule(self, rule: Dict):
        """Register a silence rule dict (``matchers``, optional ``start``/``end``)."""
        self.silence_rules.append(rule)

告警规则定义

规则配置器

class AlertRuleConfigurator:
    """Builds, validates and tunes alert-rule configuration dicts.

    Rules produced here are plain dicts in the shape accepted by
    ``LLMAlertingSystem.add_alert_rule``.
    """

    # Operators the alerting system's condition evaluator understands.
    _VALID_OPERATORS = ("gt", "lt", "gte", "lte", "eq", "ne")

    # metric name -> (fallback threshold, multiplier) used by ``optimize_rule``.
    _THRESHOLD_TUNING = {
        "latency": (1.0, 1.2),
        "error_rate": (0.05, 0.8),
        "accuracy": (0.9, 1.1),
    }

    def __init__(self):
        self.rule_templates = self._load_rule_templates()

    def _load_rule_templates(self) -> Dict[str, Dict]:
        """Return the built-in rule templates keyed by template name."""
        return {
            "high_latency": {
                "name": "高延迟告警",
                "description": "模型响应时间超过阈值",
                "metric": "latency",
                "condition": {"operator": "gt", "threshold": 1.0},
                "severity": AlertSeverity.WARNING,
                "duration": "5m",
                "labels": {"team": "ml-ops", "service": "llm"}
            },
            "high_error_rate": {
                "name": "高错误率告警",
                "description": "模型错误率超过阈值",
                "metric": "error_rate",
                "condition": {"operator": "gt", "threshold": 0.05},
                "severity": AlertSeverity.CRITICAL,
                "duration": "2m",
                "labels": {"team": "ml-ops", "service": "llm"}
            },
            "low_accuracy": {
                "name": "低准确率告警",
                "description": "模型准确率低于阈值",
                "metric": "accuracy",
                "condition": {"operator": "lt", "threshold": 0.9},
                "severity": AlertSeverity.WARNING,
                "duration": "10m",
                "labels": {"team": "ml-ops", "service": "llm"}
            },
            "high_memory_usage": {
                "name": "高内存使用告警",
                "description": "GPU内存使用率超过阈值",
                "metric": "gpu_memory_usage",
                "condition": {"operator": "gt", "threshold": 0.9},
                "severity": AlertSeverity.CRITICAL,
                "duration": "1m",
                "labels": {"team": "infra", "service": "gpu"}
            },
            "security_incident": {
                "name": "安全事件告警",
                "description": "检测到潜在安全事件",
                "metric": "security_score",
                "condition": {"operator": "lt", "threshold": 0.7},
                "severity": AlertSeverity.EMERGENCY,
                "duration": "0m",
                "labels": {"team": "security", "service": "llm"}
            }
        }

    def create_rule_from_template(self, template_name: str,
                                 overrides: Optional[Dict] = None) -> Dict:
        """Return a new rule built from a template, with ``overrides`` applied.

        The result is a deep copy: editing its nested ``condition`` or
        ``labels`` never changes the stored template or ``overrides``.

        Raises:
            ValueError: if ``template_name`` is not a known template.
        """
        import copy

        if template_name not in self.rule_templates:
            raise ValueError(f"Template {template_name} not found")

        rule = copy.deepcopy(self.rule_templates[template_name])
        if overrides:
            # Top-level replacement, as before: an overridden key replaces the
            # template's value wholesale rather than being merged into it.
            rule.update(copy.deepcopy(overrides))

        return rule

    def create_custom_rule(self, config: Dict) -> Dict:
        """Build a rule from ``config``, filling optional fields with defaults.

        Raises:
            ValueError: if ``name``, ``metric`` or ``condition`` is missing.
        """
        for field in ("name", "metric", "condition"):
            if field not in config:
                raise ValueError(f"Missing required field: {field}")

        return {
            "name": config["name"],
            "description": config.get("description", ""),
            "metric": config["metric"],
            "condition": config["condition"],
            "severity": config.get("severity", AlertSeverity.WARNING),
            "duration": config.get("duration", "5m"),
            "labels": config.get("labels", {}),
            "annotations": config.get("annotations", {}),
            "enabled": config.get("enabled", True)
        }

    def validate_rule(self, rule: Dict) -> Dict[str, Any]:
        """Check a rule and return ``{"valid": bool, "errors": [str, ...]}``.

        Severity may be an ``AlertSeverity`` member (what the templates and
        defaults in this file use) or one of its string values.
        """
        errors = []

        # Required top-level fields.
        if not rule.get("name"):
            errors.append("Rule name is required")
        if not rule.get("metric"):
            errors.append("Metric name is required")
        if not rule.get("condition"):
            errors.append("Condition is required")

        # Condition shape. ``or {}`` also covers an explicit ``None``.
        condition = rule.get("condition") or {}
        if "operator" not in condition:
            errors.append("Condition operator is required")
        elif condition["operator"] not in self._VALID_OPERATORS:
            # An unknown operator would make the rule silently never fire.
            errors.append(f"Invalid condition operator: {condition['operator']}")
        if "threshold" not in condition:
            errors.append("Condition threshold is required")

        # Severity: accept enum members as well as their raw string values.
        severity = rule.get("severity", "")
        if not isinstance(severity, AlertSeverity) and severity not in [s.value for s in AlertSeverity]:
            errors.append(f"Invalid severity: {severity}")

        return {
            "valid": len(errors) == 0,
            "errors": errors
        }

    def optimize_rule(self, rule: Dict) -> Dict:
        """Return a tuned copy of ``rule``; the input is never modified.

        Tuning by metric name:
          * ``latency``: threshold x 1.2 (looser) and duration set to ``"10m"``.
          * ``error_rate``: threshold x 0.8 (stricter).
          * ``accuracy``: threshold x 1.1 (stricter for a lower-bound check).
        Other metrics are returned unchanged (as a copy).
        """
        import copy

        # Deep copy so the nested ``condition`` dict is not shared with the
        # caller's rule; otherwise repeated calls would compound the scaling.
        optimized = copy.deepcopy(rule)

        metric_name = rule.get("metric", "")
        tuning = self._THRESHOLD_TUNING.get(metric_name) if isinstance(metric_name, str) else None
        if tuning is None:
            return optimized

        fallback_threshold, factor = tuning
        condition = optimized.setdefault("condition", {})
        condition["threshold"] = condition.get("threshold", fallback_threshold) * factor

        if metric_name == "latency":
            optimized["duration"] = "10m"

        return optimized

告警路由

路由管理器

class AlertRouter:
    """Maps alerts to notification routes using prioritized label matchers.

    Routing rules are kept sorted by descending ``priority``. Each matching
    rule contributes its ``routes``; a rule whose ``continue`` flag is
    ``False`` stops the search after it matches.
    """

    # Matcher values starting with this prefix are treated as regex patterns.
    _REGEX_PREFIX = "regex:"

    def __init__(self):
        self.routing_rules = []
        self.route_cache = {}
        # Last sequence number handed out; never reused after a removal.
        self._rule_seq = 0

    def _sort_rules(self) -> None:
        """Keep rules ordered by descending priority (stable for ties)."""
        self.routing_rules.sort(key=lambda x: x.get("priority", 0), reverse=True)

    def add_routing_rule(self, rule_config: Dict) -> str:
        """Register a routing rule and return its generated ID (``route_<n>``).

        Recognized ``rule_config`` keys: ``name``, ``matchers``, ``routes``,
        ``priority`` (default 0), ``enabled`` (default True) and ``continue``
        (default True; set to False to make the rule terminating).
        """
        # ``max`` keeps the numbering at least ``len + 1`` even if rules were
        # appended to the list directly, while never reusing an ID that a
        # removed rule once had.
        self._rule_seq = max(self._rule_seq, len(self.routing_rules)) + 1
        rule_id = f"route_{self._rule_seq}"

        rule = {
            "id": rule_id,
            "name": rule_config.get("name", ""),
            "matchers": rule_config.get("matchers", {}),
            "routes": rule_config.get("routes", []),
            "priority": rule_config.get("priority", 0),
            "enabled": rule_config.get("enabled", True),
            # Stored so ``route_alert`` can honour terminating rules.
            "continue": rule_config.get("continue", True),
        }

        self.routing_rules.append(rule)
        self._sort_rules()

        return rule_id

    def route_alert(self, alert: Dict) -> List[Dict]:
        """Return the de-duplicated routes of every rule matching ``alert``.

        Rules are visited in priority order. Two routes are duplicates when
        they share the same ``channel`` and the same ``recipients``.
        """
        routes = []

        for rule in self.routing_rules:
            if not rule.get("enabled", True):
                continue
            if not self._matches_rule(alert, rule):
                continue

            routes.extend(rule.get("routes", []))

            # A terminating rule stops further matching.
            if rule.get("continue", True) is False:
                break

        unique_routes = []
        seen = set()
        for route in routes:
            # ``str`` makes the (possibly list-valued) recipients hashable.
            route_key = (route.get("channel", ""), str(route.get("recipients", [])))
            if route_key not in seen:
                seen.add(route_key)
                unique_routes.append(route)

        return unique_routes

    def _matches_rule(self, alert: Dict, rule: Dict) -> bool:
        """Return True when every matcher of ``rule`` matches the alert's labels.

        A matcher value of the form ``"regex:<pattern>"`` is applied with
        ``re.match`` (anchored at the start) to the label's string form; a
        missing label is matched as the empty string. Any other value must
        equal the label exactly. A rule without matchers matches every alert.
        """
        import re

        labels = alert.get("labels", {})

        for key, expected in rule.get("matchers", {}).items():
            actual = labels.get(key)
            if isinstance(expected, str) and expected.startswith(self._REGEX_PREFIX):
                pattern = expected[len(self._REGEX_PREFIX):]
                subject = "" if actual is None else str(actual)
                if not re.match(pattern, subject):
                    return False
            elif actual != expected:
                return False

        return True

    def get_routes_for_alert(self, alert: Dict) -> List[Dict]:
        """Alias of :meth:`route_alert`."""
        return self.route_alert(alert)

    def update_routing_rule(self, rule_id: str, updates: Dict) -> bool:
        """Merge ``updates`` into the rule with ``rule_id``.

        Re-sorts the rules afterwards so a changed ``priority`` takes effect.
        Returns False when no such rule exists.
        """
        for rule in self.routing_rules:
            if rule["id"] == rule_id:
                rule.update(updates)
                self._sort_rules()
                return True
        return False

    def remove_routing_rule(self, rule_id: str) -> bool:
        """Remove the rule with ``rule_id``; return True if one was removed."""
        original_count = len(self.routing_rules)
        self.routing_rules = [r for r in self.routing_rules if r["id"] != rule_id]
        return len(self.routing_rules) < original_count

    def test_routing(self, alert: Dict) -> Dict[str, Any]:
        """Dry-run routing for ``alert`` and summarize the matched routes."""
        routes = self.route_alert(alert)

        return {
            "alert": alert,
            "matched_routes": routes,
            "route_count": len(routes),
            "channels": [r.get("channel") for r in routes]
        }

class NotificationChannel:
    """A single notification target (email, Slack, webhook or SMS).

    Attributes:
        channel_type: One of ``"email"``, ``"slack"``, ``"webhook"``, ``"sms"``.
        name: Human-readable channel name.
        config: Channel-specific settings (the webhook sender reads ``"url"``).
        enabled: When False, ``send`` does nothing and returns False.
    """

    def __init__(self, channel_type: str, name: str, config: Dict):
        self.channel_type = channel_type
        self.name = name
        self.config = config
        self.enabled = True

    def send(self, alert: Dict) -> bool:
        """Deliver ``alert`` through this channel.

        Returns True on success. Returns False when the channel is disabled,
        the channel type is unknown, or delivery raised; delivery errors are
        printed rather than propagated so one channel cannot break the rest.
        """
        if not self.enabled:
            return False

        senders = {
            "email": self._send_email,
            "slack": self._send_slack,
            "webhook": self._send_webhook,
            "sms": self._send_sms,
        }

        try:
            sender = senders.get(self.channel_type)
            if sender is None:
                return False
            return sender(alert)
        except Exception as e:
            # Deliberate best-effort boundary: report and signal failure.
            print(f"Failed to send notification: {e}")
            return False

    def _send_email(self, alert: Dict) -> bool:
        """Placeholder email sender: prints the rule name and reports success."""
        print(f"Sending email alert: {alert.get('rule_name', '')}")
        return True

    def _send_slack(self, alert: Dict) -> bool:
        """Placeholder Slack sender: prints the rule name and reports success."""
        print(f"Sending Slack alert: {alert.get('rule_name', '')}")
        return True

    @staticmethod
    def _json_default(value: Any) -> Any:
        """``json.dumps`` fallback for the non-JSON types alerts may contain.

        Enum members become their ``value`` and datetimes their ISO-8601
        string; anything else is rejected, as ``json`` expects.
        """
        if isinstance(value, Enum):
            return value.value
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    def _build_webhook_body(self, alert: Dict) -> str:
        """Serialize the webhook payload (``alert`` plus a timestamp) to JSON.

        Alerts carry Enum members in fields such as ``severity`` and
        ``status``, which the default JSON encoder rejects, hence the custom
        ``default`` hook.
        """
        payload = {
            "alert": alert,
            "timestamp": datetime.now().isoformat()
        }
        return json.dumps(payload, default=self._json_default, ensure_ascii=False)

    def _send_webhook(self, alert: Dict) -> bool:
        """POST the alert as JSON to ``config["url"]``.

        Returns True for any 2xx response. Returns False without making a
        request when no URL is configured.
        """
        url = self.config.get("url", "")
        if not url:
            return False

        import requests

        response = requests.post(
            url,
            data=self._build_webhook_body(alert).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
        # Webhook receivers commonly answer 202/204; treat every 2xx as success.
        return 200 <= response.status_code < 300

    def _send_sms(self, alert: Dict) -> bool:
        """Placeholder SMS sender: prints the rule name and reports success."""
        print(f"Sending SMS alert: {alert.get('rule_name', '')}")
        return True

告警响应

响应管理器

class AlertResponseManager:
    """Runs response playbooks, escalation policies and auto-remediation for alerts.

    Playbooks and escalation policies are stored as plain dicts. The step
    handlers and the remediation lookup are placeholders that report success
    without performing external work.
    """

    def __init__(self):
        self.response_playbooks = []
        self.automation_rules = []
        self.escalation_policies = []

    def create_response_playbook(self, playbook_config: Dict) -> str:
        """Store a playbook built from ``playbook_config`` and return its ID.

        IDs have the form ``playbook_<n>`` where ``n`` is the new playbook's
        1-based position in the list.
        """
        new_id = f"playbook_{len(self.response_playbooks) + 1}"
        read = playbook_config.get

        self.response_playbooks.append({
            "id": new_id,
            "name": read("name", ""),
            "description": read("description", ""),
            "trigger_conditions": read("trigger_conditions", {}),
            "steps": read("steps", []),
            "estimated_time": read("estimated_time", "30m"),
            "required_roles": read("required_roles", []),
            "created_at": datetime.now().isoformat(),
        })
        return new_id

    def execute_playbook(self, playbook_id: str, alert: Dict) -> Dict[str, Any]:
        """Run every step of a playbook against ``alert``.

        Steps run in order; the first step that raises is recorded under
        ``steps_failed`` and stops the run with status ``"failed"``.
        Otherwise the status is ``"completed"``. An unknown ``playbook_id``
        yields ``{"success": False, "error": "Playbook not found"}``.
        """
        playbook = self._get_playbook(playbook_id)
        if not playbook:
            return {"success": False, "error": "Playbook not found"}

        record = {
            "execution_id": f"exec_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
            "playbook_id": playbook_id,
            "alert_id": alert.get("id", ""),
            "started_at": datetime.now().isoformat(),
            "completed_at": None,
            "status": "running",
            "steps_completed": [],
            "steps_failed": [],
        }

        for step in playbook.get("steps", []):
            try:
                outcome = self._execute_step(step, alert)
            except Exception as exc:
                record["steps_failed"].append({
                    "step": step,
                    "error": str(exc),
                    "failed_at": datetime.now().isoformat(),
                })
                record["status"] = "failed"
                break
            else:
                record["steps_completed"].append({
                    "step": step,
                    "result": outcome,
                    "completed_at": datetime.now().isoformat(),
                })
        else:
            # Loop finished without hitting ``break``: nothing failed.
            record["status"] = "completed"

        record["completed_at"] = datetime.now().isoformat()
        return record

    def _get_playbook(self, playbook_id: str) -> Optional[Dict]:
        """Return the stored playbook with ``playbook_id``, or None."""
        return next(
            (entry for entry in self.response_playbooks if entry["id"] == playbook_id),
            None,
        )

    def _execute_step(self, step: Dict, alert: Dict) -> Any:
        """Dispatch a step to its handler based on ``step["type"]``.

        Unknown types are not an error; they produce a ``"skipped"`` result.
        """
        kind = step.get("type", "")

        dispatch = (
            ("notify", self._execute_notify_step),
            ("investigate", self._execute_investigate_step),
            ("remediate", self._execute_remediate_step),
            ("verify", self._execute_verify_step),
        )
        # Compared with ``==`` one by one (not hashed) so any value of
        # ``type`` is tolerated.
        for name, handler in dispatch:
            if kind == name:
                return handler(step, alert)

        return {"status": "skipped", "reason": f"Unknown step type: {kind}"}

    def _execute_notify_step(self, step: Dict, alert: Dict) -> Dict:
        """Placeholder notify step: reports how many recipients were listed."""
        fallback_message = f"Alert: {alert.get('rule_name', '')}"
        targets = step.get("recipients", [])

        return {
            "status": "success",
            "recipients_notified": len(targets),
            "message": step.get("message", fallback_message),
        }

    def _execute_investigate_step(self, step: Dict, alert: Dict) -> Dict:
        """Placeholder investigate step: echoes the investigation type, no findings."""
        return {
            "status": "success",
            "investigation_type": step.get("investigation_type", "basic"),
            "findings": [],
        }

    def _execute_remediate_step(self, step: Dict, alert: Dict) -> Dict:
        """Placeholder remediate step: echoes the configured action as applied."""
        return {
            "status": "success",
            "action": step.get("action", ""),
            "result": "Remediation applied successfully",
        }

    def _execute_verify_step(self, step: Dict, alert: Dict) -> Dict:
        """Placeholder verify step: echoes the method and reports verified."""
        return {
            "status": "success",
            "method": step.get("method", ""),
            "verified": True,
        }

    def add_escalation_policy(self, policy_config: Dict) -> str:
        """Store an escalation policy and return its ID (``escalation_<n>``)."""
        new_id = f"escalation_{len(self.escalation_policies) + 1}"
        read = policy_config.get

        self.escalation_policies.append({
            "id": new_id,
            "name": read("name", ""),
            "severities": read("severities", []),
            "stages": read("stages", []),
            "enabled": read("enabled", True),
        })
        return new_id

    def execute_escalation(self, alert: Dict) -> Dict[str, Any]:
        """Run the stages of the first enabled policy covering the alert's severity.

        Returns ``{"status": "no_matching_policy"}`` when none applies.
        """
        for policy in self.escalation_policies:
            if policy.get("enabled", True) and alert["severity"] in policy.get("severities", []):
                return self._run_escalation_stages(alert, policy["stages"])

        return {"status": "no_matching_policy"}

    def _run_escalation_stages(self, alert: Dict, stages: List[Dict]) -> Dict[str, Any]:
        """Walk through all stages immediately and record each one.

        Waiting between stages is not implemented; ``wait_time`` is only
        recorded. ``escalated`` becomes True once the last stage has run.
        """
        summary = {
            "stages_executed": [],
            "current_stage": 0,
            "escalated": False,
        }

        for position, stage in enumerate(stages, start=1):
            summary["stages_executed"].append({
                "stage": position,
                "wait_time": stage.get("wait_time", "5m"),
                "recipients": stage.get("recipients", []),
                "executed_at": datetime.now().isoformat(),
            })
            summary["current_stage"] = position

            if position == len(stages):
                summary["escalated"] = True

        return summary

    def auto_remediate(self, alert: Dict) -> Dict[str, Any]:
        """Apply every remediation rule found for ``alert`` and collect the results.

        A rule that raises is reported with status ``"failed"`` and its error
        text; remaining rules still run.
        """
        outcomes = []
        for rule in self._get_remediation_rules(alert):
            try:
                outcomes.append(self._apply_remediation(rule, alert))
            except Exception as exc:
                outcomes.append({
                    "rule": rule.get("name", ""),
                    "status": "failed",
                    "error": str(exc),
                })

        return {
            "alert_id": alert.get("id", ""),
            "remediation_results": outcomes,
            "timestamp": datetime.now().isoformat(),
        }

    def _get_remediation_rules(self, alert: Dict) -> List[Dict]:
        """Placeholder lookup: currently returns no remediation rules."""
        return []

    def _apply_remediation(self, rule: Dict, alert: Dict) -> Dict:
        """Placeholder remediation: reports the rule as applied now."""
        return {
            "rule": rule.get("name", ""),
            "status": "success",
            "applied_at": datetime.now().isoformat(),
        }

告警分析与优化

告警分析器

class AlertAnalyzer:
    """Derives simple patterns and quality figures from a list of alert dicts."""

    # Placeholder for the number of incidents assumed to have gone undetected;
    # used only to produce a rough recall figure.
    _ASSUMED_UNDETECTED_INCIDENTS = 10

    def __init__(self):
        self.alert_metrics = {}

    def analyze_alert_patterns(self, alerts: List[Dict]) -> Dict[str, Any]:
        """Summarize time, frequency and severity patterns of ``alerts``.

        Always returns the keys ``patterns``, ``insights`` and
        ``recommendations``; all three are empty lists for empty input.
        """
        if not alerts:
            return {"patterns": [], "insights": [], "recommendations": []}

        patterns = [
            self._analyze_time_pattern(alerts),
            self._analyze_frequency_pattern(alerts),
            self._analyze_correlation(alerts),
        ]

        return {
            "patterns": patterns,
            "insights": self._generate_insights(patterns),
            "recommendations": self._generate_recommendations(patterns)
        }

    def _analyze_time_pattern(self, alerts: List[Dict]) -> Dict:
        """Count alerts per hour of day and pick the three busiest hours.

        ``fired_at`` may be an ISO-8601 string or a ``datetime``. Alerts
        whose ``fired_at`` is missing or unparsable are skipped so one
        malformed record cannot abort the whole analysis.
        """
        hourly_distribution = {}

        for alert in alerts:
            fired_at = alert.get("fired_at")
            if not isinstance(fired_at, datetime):
                try:
                    fired_at = datetime.fromisoformat(fired_at)
                except (TypeError, ValueError):
                    continue
            hourly_distribution[fired_at.hour] = hourly_distribution.get(fired_at.hour, 0) + 1

        peak_hours = sorted(hourly_distribution.items(), key=lambda x: x[1], reverse=True)[:3]

        return {
            "type": "time_pattern",
            "hourly_distribution": hourly_distribution,
            "peak_hours": peak_hours,
            "insight": f"高峰时段: {', '.join([f'{h}点' for h, _ in peak_hours])}"
        }

    def _analyze_frequency_pattern(self, alerts: List[Dict]) -> Dict:
        """Count alerts per ``rule_id`` and list the five most frequent rules."""
        rule_frequencies = {}

        for alert in alerts:
            rule_id = alert.get("rule_id", "")
            rule_frequencies[rule_id] = rule_frequencies.get(rule_id, 0) + 1

        top_rules = sorted(rule_frequencies.items(), key=lambda x: x[1], reverse=True)[:5]

        return {
            "type": "frequency_pattern",
            "rule_frequencies": rule_frequencies,
            "top_rules": top_rules,
            "insight": f"最频繁告警规则: {top_rules[0][0] if top_rules else '无'}"
        }

    def _analyze_correlation(self, alerts: List[Dict]) -> Dict:
        """Count alerts per severity (a simplified stand-in for correlation analysis)."""
        severity_correlation = {}

        for alert in alerts:
            severity = alert.get("severity", "")
            severity_correlation[severity] = severity_correlation.get(severity, 0) + 1

        return {
            "type": "correlation",
            "severity_distribution": severity_correlation,
            "insight": "告警严重程度分布分析"
        }

    def _generate_insights(self, patterns: List[Dict]) -> List[str]:
        """Collect the non-empty ``insight`` text of each pattern, in order."""
        return [pattern["insight"] for pattern in patterns if pattern.get("insight")]

    def _generate_recommendations(self, patterns: List[Dict]) -> List[str]:
        """Turn the frequency and time patterns into short recommendations."""
        recommendations = []

        for pattern in patterns:
            if pattern.get("type") == "frequency_pattern":
                top_rules = pattern.get("top_rules", [])
                if top_rules:
                    recommendations.append(f"优化规则 {top_rules[0][0]} 以减少告警频率")

            elif pattern.get("type") == "time_pattern":
                peak_hours = pattern.get("peak_hours", [])
                if peak_hours:
                    recommendations.append(f"在高峰时段 {peak_hours[0][0]}点 增加监控")

        return recommendations

    @staticmethod
    def _status_value(status: Any) -> Any:
        """Return the raw value of an Enum status, or the status unchanged.

        Alerts built by the alerting system store Enum members in
        ``"status"``; a plain Enum member never equals its string value, so
        it has to be unwrapped before comparing with ``"resolved"`` etc.
        """
        return status.value if isinstance(status, Enum) else status

    def calculate_alert_quality(self, alerts: List[Dict]) -> Dict[str, float]:
        """Compute rough quality figures from alert statuses.

        Simplified heuristics, not true classification metrics:
          * ``accuracy``: share of alerts that are resolved.
          * ``precision``: share of alerts that are acknowledged.
          * ``recall``: resolved / (resolved + assumed undetected incidents).
          * ``f1_score``: harmonic mean of precision and recall.
        All four keys are always present; they are 0 for empty input.
        """
        if not alerts:
            return {"accuracy": 0, "precision": 0, "recall": 0, "f1_score": 0}

        statuses = [self._status_value(a.get("status")) for a in alerts]
        total_alerts = len(statuses)
        resolved_alerts = sum(1 for s in statuses if s == "resolved")
        acknowledged_alerts = sum(1 for s in statuses if s == "acknowledged")

        accuracy = resolved_alerts / total_alerts
        precision = acknowledged_alerts / total_alerts
        recall = resolved_alerts / (resolved_alerts + self._ASSUMED_UNDETECTED_INCIDENTS)

        denominator = precision + recall
        f1_score = 2 * (precision * recall) / denominator if denominator > 0 else 0

        return {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": f1_score
        }

总结

LLM告警系统是确保模型稳定运行的关键组件。通过设计合理的告警规则、实现智能路由和自动化响应,组织可以及时发现和处理问题。结合告警分析和优化,可以持续改进告警质量,减少误报和漏报,提高运维效率。