LLM告警
---
title: "LLM告警"
description: "全面介绍LLM告警系统设计,包括告警规则、告警路由、告警响应等核心功能实现"
tags: ["LLM告警", "告警规则", "告警路由", "告警响应"]
category: "llm"
icon: "🧠"
---
LLM告警
LLM告警系统概述
LLM告警系统是监控大语言模型运行状态、性能指标和异常情况的关键组件。当模型出现性能下降、资源异常或安全问题时,系统能够及时通知相关人员采取行动。
告警系统架构
import copy
import json
import re
import threading
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Any, Optional, Callable
class AlertSeverity(Enum):
    """Severity levels an alert can carry.

    Each member's value is the lowercase form of its name.
    """

    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"
    EMERGENCY = "emergency"
class AlertStatus(Enum):
    """Lifecycle states an alert can be in.

    Each member's value is the lowercase form of its name.
    """

    PENDING = "pending"
    FIRING = "firing"
    RESOLVED = "resolved"
    ACKNOWLEDGED = "acknowledged"
    SILENCED = "silenced"
class LLMAlertingSystem:
    """In-memory alerting engine for LLM metrics.

    Rules are evaluated against a metrics snapshot. Every enabled rule whose
    condition holds produces an alert which, unless a silence rule matches,
    is stored, sent to all notification channels and offered to the
    escalation policies.

    Note: a rule's ``duration`` is stored but not consulted during
    evaluation; an alert is created on the first matching snapshot.
    """

    # Comparison applied for each supported ``condition["operator"]`` value.
    _COMPARATORS = {
        "gt": lambda value, threshold: value > threshold,
        "lt": lambda value, threshold: value < threshold,
        "gte": lambda value, threshold: value >= threshold,
        "lte": lambda value, threshold: value <= threshold,
        "eq": lambda value, threshold: value == threshold,
        "ne": lambda value, threshold: value != threshold,
    }

    def __init__(self):
        self.alert_rules = []
        self.active_alerts = []
        self.alert_history = []
        self.notification_channels = []
        self.escalation_policies = []
        self.silence_rules = []
        # Guards active_alerts / alert_history.
        self.lock = threading.Lock()

    def add_alert_rule(self, rule_config: Dict[str, Any]) -> str:
        """Register a rule built from ``rule_config`` and return its id.

        Missing keys fall back to defaults (severity WARNING, duration
        ``"5m"``, enabled ``True``).
        """
        rule_id = f"rule_{len(self.alert_rules) + 1}"
        rule = {
            "id": rule_id,
            "name": rule_config.get("name", ""),
            "description": rule_config.get("description", ""),
            "metric": rule_config.get("metric", ""),
            "condition": rule_config.get("condition", {}),
            "severity": rule_config.get("severity", AlertSeverity.WARNING),
            "duration": rule_config.get("duration", "5m"),
            "labels": rule_config.get("labels", {}),
            "annotations": rule_config.get("annotations", {}),
            "enabled": rule_config.get("enabled", True),
            "created_at": datetime.now().isoformat(),
        }
        self.alert_rules.append(rule)
        return rule_id

    def evaluate_rules(self, metrics: Dict[str, Any]) -> List[Dict]:
        """Evaluate all enabled rules against ``metrics``.

        Returns the alerts created by this call. Silenced alerts are neither
        returned nor processed.
        """
        new_alerts = []
        for rule in self.alert_rules:
            if not rule.get("enabled", True):
                continue
            if not self._evaluate_condition(rule, metrics):
                continue
            alert = self._create_alert(rule, metrics)
            if self._is_silenced(alert):
                continue
            new_alerts.append(alert)
            self._process_alert(alert)
        return new_alerts

    def _evaluate_condition(self, rule: Dict, metrics: Dict) -> bool:
        """Return True when the rule's metric satisfies its condition.

        Returns False when the metric is absent, the operator is unknown, or
        the metric value cannot be compared with the threshold.
        """
        metric_name = rule.get("metric", "")
        if metric_name not in metrics:
            return False
        condition = rule.get("condition") or {}
        compare = self._COMPARATORS.get(condition.get("operator", "gt"))
        if compare is None:
            return False
        try:
            return bool(compare(metrics[metric_name], condition.get("threshold", 0)))
        except TypeError:
            # e.g. a None or string metric compared with a numeric threshold.
            return False

    def _create_alert(self, rule: Dict, metrics: Dict) -> Dict:
        """Build a FIRING alert dict for ``rule`` from the current metrics."""
        now = datetime.now()
        return {
            "id": f"alert_{now.strftime('%Y%m%d_%H%M%S')}_{rule['id']}",
            "rule_id": rule["id"],
            "rule_name": rule["name"],
            "severity": rule["severity"],
            "status": AlertStatus.FIRING,
            # Copied so that editing an alert never alters the rule itself.
            "labels": dict(rule.get("labels", {})),
            "annotations": dict(rule.get("annotations", {})),
            "metric_name": rule["metric"],
            "metric_value": metrics.get(rule["metric"], 0),
            "condition": rule["condition"],
            "fired_at": now.isoformat(),
            "resolved_at": None,
            "acknowledged_at": None,
            "acknowledged_by": None,
            "description": rule.get("description", ""),
        }

    def _is_silenced(self, alert: Dict) -> bool:
        """Return True when any silence rule matches ``alert``."""
        return any(self._matches_silence(alert, rule) for rule in self.silence_rules)

    @staticmethod
    def _as_datetime(value: Any) -> Any:
        """Parse ISO-8601 strings into datetimes; return other values as-is."""
        if isinstance(value, str):
            return datetime.fromisoformat(value)
        return value

    def _matches_silence(self, alert: Dict, silence_rule: Dict) -> bool:
        """Return True when ``silence_rule`` applies to ``alert`` right now.

        Every matcher must equal the alert's label of the same name, and the
        current time must lie within ``[start, end]`` (unbounded by default).
        ``start``/``end`` may be datetimes or ISO-8601 strings.
        """
        labels = alert.get("labels", {})
        for key, expected in silence_rule.get("matchers", {}).items():
            if labels.get(key) != expected:
                return False
        start = self._as_datetime(silence_rule.get("start", datetime.min))
        end = self._as_datetime(silence_rule.get("end", datetime.max))
        return start <= datetime.now() <= end

    def _process_alert(self, alert: Dict):
        """Record the alert, then notify channels and run escalation."""
        with self.lock:
            self.active_alerts.append(alert)
            self.alert_history.append(alert)
        # Notification and escalation run outside the lock.
        self._send_notifications(alert)
        self._execute_escalation(alert)

    def _send_notifications(self, alert: Dict):
        """Send ``alert`` to every channel; a failing channel is reported and skipped."""
        for channel in self.notification_channels:
            try:
                channel.send(alert)
            except Exception as e:
                # getattr: a channel without ``name`` must not break the handler.
                print(f"Failed to send notification to {getattr(channel, 'name', channel)}: {e}")

    def _execute_escalation(self, alert: Dict):
        """Run the first enabled escalation policy that matches ``alert``.

        Policies are dicts; an optional ``"execute"`` callable in the policy
        is invoked with the alert. An ``execute`` attribute on the policy
        object is also honoured.
        """
        for policy in self.escalation_policies:
            if not policy.get("enabled", True):
                continue
            if not self._matches_escalation(alert, policy):
                continue
            handler = policy.get("execute")
            if handler is None:
                handler = getattr(policy, "execute", None)
            if callable(handler):
                handler(alert)
            break

    @staticmethod
    def _severity_value(severity: Any) -> Any:
        """Return the enum value for enum members, the input otherwise."""
        return getattr(severity, "value", severity)

    def _match_labels(self, labels: Dict, required: Dict) -> bool:
        """Return True when every ``required`` pair is present in ``labels``."""
        return all(labels.get(key) == value for key, value in required.items())

    def _matches_escalation(self, alert: Dict, policy: Dict) -> bool:
        """Return True when the policy covers the alert's severity and labels.

        Severities are compared by value, so an enum member and its string
        value are treated as equal.
        """
        wanted = [self._severity_value(s) for s in policy.get("severities", [])]
        severity_match = self._severity_value(alert.get("severity")) in wanted
        label_match = self._match_labels(alert.get("labels", {}), policy.get("labels", {}))
        return severity_match and label_match

    def _find_active(self, alert_id: str) -> Optional[Dict]:
        """Return the active alert with ``alert_id``; caller must hold the lock."""
        for alert in self.active_alerts:
            if alert["id"] == alert_id:
                return alert
        return None

    def resolve_alert(self, alert_id: str) -> bool:
        """Mark an active alert RESOLVED and drop it from the active list.

        Returns False when no active alert has that id.
        """
        with self.lock:
            alert = self._find_active(alert_id)
            if alert is None:
                return False
            alert["status"] = AlertStatus.RESOLVED
            alert["resolved_at"] = datetime.now().isoformat()
            self.active_alerts.remove(alert)
            return True

    def acknowledge_alert(self, alert_id: str, user: str) -> bool:
        """Mark an active alert ACKNOWLEDGED by ``user``.

        Returns False when no active alert has that id.
        """
        with self.lock:
            alert = self._find_active(alert_id)
            if alert is None:
                return False
            alert["status"] = AlertStatus.ACKNOWLEDGED
            alert["acknowledged_at"] = datetime.now().isoformat()
            alert["acknowledged_by"] = user
            return True

    def silence_alert(self, alert_id: str, duration_minutes: int = 60) -> bool:
        """Mark an active alert SILENCED for ``duration_minutes``.

        The expiry is recorded under ``"silenced_until"`` (ISO-8601).
        Returns False when no active alert has that id.
        """
        with self.lock:
            alert = self._find_active(alert_id)
            if alert is None:
                return False
            alert["status"] = AlertStatus.SILENCED
            expiry = datetime.now() + timedelta(minutes=duration_minutes)
            alert["silenced_until"] = expiry.isoformat()
            return True

    def get_active_alerts(self, severity: Optional["AlertSeverity"] = None) -> List[Dict]:
        """Return a list of active alerts, optionally filtered by severity."""
        with self.lock:
            snapshot = self.active_alerts.copy()
        if severity:
            return [a for a in snapshot if a["severity"] == severity]
        return snapshot

    def get_alert_history(self, hours: int = 24) -> List[Dict]:
        """Return alerts that fired within the last ``hours`` hours."""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        with self.lock:
            snapshot = list(self.alert_history)
        return [
            a for a in snapshot
            if datetime.fromisoformat(a["fired_at"]) > cutoff_time
        ]

    def add_notification_channel(self, channel):
        """Register a channel object exposing ``send(alert)``."""
        self.notification_channels.append(channel)

    def add_escalation_policy(self, policy: Dict):
        """Register an escalation policy dict."""
        self.escalation_policies.append(policy)

    def add_silence_rule(self, rule: Dict):
        """Register a silence rule dict (``matchers``, optional ``start``/``end``)."""
        self.silence_rules.append(rule)
告警规则定义
规则配置器
class AlertRuleConfigurator:
    """Builds, validates and tunes alert rule dicts."""

    # Operators understood by the rule evaluator in this file.
    _VALID_OPERATORS = ("gt", "lt", "gte", "lte", "eq", "ne")

    def __init__(self):
        self.rule_templates = self._load_rule_templates()

    def _load_rule_templates(self) -> Dict[str, Dict]:
        """Return the built-in rule templates keyed by template name."""
        return {
            "high_latency": {
                "name": "高延迟告警",
                "description": "模型响应时间超过阈值",
                "metric": "latency",
                "condition": {"operator": "gt", "threshold": 1.0},
                "severity": AlertSeverity.WARNING,
                "duration": "5m",
                "labels": {"team": "ml-ops", "service": "llm"},
            },
            "high_error_rate": {
                "name": "高错误率告警",
                "description": "模型错误率超过阈值",
                "metric": "error_rate",
                "condition": {"operator": "gt", "threshold": 0.05},
                "severity": AlertSeverity.CRITICAL,
                "duration": "2m",
                "labels": {"team": "ml-ops", "service": "llm"},
            },
            "low_accuracy": {
                "name": "低准确率告警",
                "description": "模型准确率低于阈值",
                "metric": "accuracy",
                "condition": {"operator": "lt", "threshold": 0.9},
                "severity": AlertSeverity.WARNING,
                "duration": "10m",
                "labels": {"team": "ml-ops", "service": "llm"},
            },
            "high_memory_usage": {
                "name": "高内存使用告警",
                "description": "GPU内存使用率超过阈值",
                "metric": "gpu_memory_usage",
                "condition": {"operator": "gt", "threshold": 0.9},
                "severity": AlertSeverity.CRITICAL,
                "duration": "1m",
                "labels": {"team": "infra", "service": "gpu"},
            },
            "security_incident": {
                "name": "安全事件告警",
                "description": "检测到潜在安全事件",
                "metric": "security_score",
                "condition": {"operator": "lt", "threshold": 0.7},
                "severity": AlertSeverity.EMERGENCY,
                "duration": "0m",
                "labels": {"team": "security", "service": "llm"},
            },
        }

    def create_rule_from_template(self, template_name: str,
                                  overrides: Optional[Dict] = None) -> Dict:
        """Return a new rule based on a template, with ``overrides`` applied.

        The template is deep-copied, so changing the returned rule (including
        its nested ``condition``/``labels``) never alters the template.

        Raises:
            ValueError: if ``template_name`` is not a known template.
        """
        if template_name not in self.rule_templates:
            raise ValueError(f"Template {template_name} not found")
        rule = copy.deepcopy(self.rule_templates[template_name])
        if overrides:
            rule.update(overrides)
        return rule

    def create_custom_rule(self, config: Dict) -> Dict:
        """Build a rule from ``config``, filling optional fields with defaults.

        Raises:
            ValueError: if ``name``, ``metric`` or ``condition`` is missing.
        """
        for field in ("name", "metric", "condition"):
            if field not in config:
                raise ValueError(f"Missing required field: {field}")
        return {
            "name": config["name"],
            "description": config.get("description", ""),
            "metric": config["metric"],
            "condition": config["condition"],
            "severity": config.get("severity", AlertSeverity.WARNING),
            "duration": config.get("duration", "5m"),
            "labels": config.get("labels", {}),
            "annotations": config.get("annotations", {}),
            "enabled": config.get("enabled", True),
        }

    def validate_rule(self, rule: Dict) -> Dict[str, Any]:
        """Check ``rule`` and return ``{"valid": bool, "errors": [str, ...]}``.

        Severity may be an ``AlertSeverity`` member or one of its string
        values; the rules produced by this class use enum members.
        """
        errors = []

        # Required fields.
        if not rule.get("name"):
            errors.append("Rule name is required")
        if not rule.get("metric"):
            errors.append("Metric name is required")
        if not rule.get("condition"):
            errors.append("Condition is required")

        # Condition shape; ``or {}`` tolerates an explicit None.
        condition = rule.get("condition") or {}
        if "operator" not in condition:
            errors.append("Condition operator is required")
        elif condition["operator"] not in self._VALID_OPERATORS:
            errors.append(f"Unsupported condition operator: {condition['operator']}")
        if "threshold" not in condition:
            errors.append("Condition threshold is required")

        # Severity.
        severity = rule.get("severity", "")
        if not isinstance(severity, AlertSeverity) and \
                severity not in [s.value for s in AlertSeverity]:
            errors.append(f"Invalid severity: {severity}")

        return {
            "valid": len(errors) == 0,
            "errors": errors,
        }

    def optimize_rule(self, rule: Dict) -> Dict:
        """Return a tuned copy of ``rule``; the input is left untouched.

        * ``latency``: threshold x1.2 and duration set to ``"10m"``.
        * ``error_rate``: threshold x0.8.
        * ``accuracy``: threshold x1.1.

        Other metrics are returned as an unmodified deep copy.
        """
        optimized = copy.deepcopy(rule)
        metric_name = optimized.get("metric", "")

        # (default threshold, scale factor) per metric.
        tuning = {
            "latency": (1.0, 1.2),
            "error_rate": (0.05, 0.8),
            "accuracy": (0.9, 1.1),
        }
        if metric_name not in tuning:
            return optimized

        default_threshold, factor = tuning[metric_name]
        condition = optimized.get("condition")
        if not isinstance(condition, dict):
            # Rule had no usable condition; start from an empty one.
            condition = {}
            optimized["condition"] = condition
        condition["threshold"] = condition.get("threshold", default_threshold) * factor
        if metric_name == "latency":
            optimized["duration"] = "10m"
        return optimized
告警路由
路由管理器
class AlertRouter:
    """Maps alerts to notification routes using prioritised matching rules."""

    # Matcher values starting with this prefix are treated as regex patterns.
    _REGEX_PREFIX = "regex:"

    def __init__(self):
        self.routing_rules = []
        self.route_cache = {}
        # Monotonic counter so ids stay unique after rules are removed.
        self._rule_seq = 0

    def _sort_rules(self) -> None:
        """Keep rules ordered by descending priority (stable for ties)."""
        self.routing_rules.sort(key=lambda r: r.get("priority", 0), reverse=True)

    def add_routing_rule(self, rule_config: Dict) -> str:
        """Register a routing rule and return its id.

        Recognised keys: ``name``, ``matchers``, ``routes``, ``priority``,
        ``enabled`` and ``continue`` (set to ``False`` to stop evaluating
        lower-priority rules once this one matches).
        """
        self._rule_seq += 1
        rule_id = f"route_{self._rule_seq}"
        rule = {
            "id": rule_id,
            "name": rule_config.get("name", ""),
            "matchers": rule_config.get("matchers", {}),
            "routes": rule_config.get("routes", []),
            "priority": rule_config.get("priority", 0),
            "enabled": rule_config.get("enabled", True),
            "continue": rule_config.get("continue", True),
        }
        self.routing_rules.append(rule)
        self._sort_rules()
        return rule_id

    def route_alert(self, alert: Dict) -> List[Dict]:
        """Return the de-duplicated routes of all rules matching ``alert``.

        Rules are visited in priority order; a matching rule whose
        ``continue`` flag is ``False`` ends the search. Routes are
        de-duplicated on (channel, recipients), keeping first occurrence.
        """
        collected = []
        for rule in self.routing_rules:
            if not rule.get("enabled", True):
                continue
            if not self._matches_rule(alert, rule):
                continue
            collected.extend(rule.get("routes", []))
            if rule.get("continue", True) is False:
                break

        unique_routes = []
        seen = set()
        for route in collected:
            route_key = (route.get("channel", ""), str(route.get("recipients", [])))
            if route_key in seen:
                continue
            seen.add(route_key)
            unique_routes.append(route)
        return unique_routes

    def _matches_rule(self, alert: Dict, rule: Dict) -> bool:
        """Return True when every matcher of ``rule`` accepts the alert's labels.

        A matcher value of the form ``"regex:<pattern>"`` is applied with
        ``re.match`` to the label (missing labels are treated as ``""``);
        any other value requires equality. Non-string labels and invalid
        patterns never match.
        """
        labels = alert.get("labels", {})
        for key, expected in rule.get("matchers", {}).items():
            if isinstance(expected, str) and expected.startswith(self._REGEX_PREFIX):
                actual = labels.get(key, "")
                if not isinstance(actual, str):
                    return False
                try:
                    if not re.match(expected[len(self._REGEX_PREFIX):], actual):
                        return False
                except re.error:
                    # A malformed pattern must not break routing of all alerts.
                    return False
            elif labels.get(key) != expected:
                return False
        return True

    def get_routes_for_alert(self, alert: Dict) -> List[Dict]:
        """Alias of :meth:`route_alert`."""
        return self.route_alert(alert)

    def update_routing_rule(self, rule_id: str, updates: Dict) -> bool:
        """Apply ``updates`` to the rule with ``rule_id``.

        Rules are re-sorted afterwards so a changed priority takes effect.
        Returns False when no such rule exists.
        """
        for rule in self.routing_rules:
            if rule["id"] == rule_id:
                rule.update(updates)
                self._sort_rules()
                return True
        return False

    def remove_routing_rule(self, rule_id: str) -> bool:
        """Remove the rule with ``rule_id``; return True if one was removed."""
        original_count = len(self.routing_rules)
        self.routing_rules = [r for r in self.routing_rules if r["id"] != rule_id]
        return len(self.routing_rules) < original_count

    def test_routing(self, alert: Dict) -> Dict[str, Any]:
        """Dry-run routing for ``alert`` and summarise the outcome."""
        routes = self.route_alert(alert)
        return {
            "alert": alert,
            "matched_routes": routes,
            "route_count": len(routes),
            "channels": [r.get("channel") for r in routes],
        }
class NotificationChannel:
    """A single notification target (email, slack, webhook or sms)."""

    def __init__(self, channel_type: str, name: str, config: Dict):
        self.channel_type = channel_type
        self.name = name
        self.config = config
        self.enabled = True

    def send(self, alert: Dict) -> bool:
        """Deliver ``alert`` through this channel.

        Returns False when the channel is disabled, its type is unknown, or
        delivery fails; exceptions from the sender are reported and swallowed.
        """
        if not self.enabled:
            return False
        senders = {
            "email": self._send_email,
            "slack": self._send_slack,
            "webhook": self._send_webhook,
            "sms": self._send_sms,
        }
        sender = senders.get(self.channel_type)
        if sender is None:
            return False
        try:
            return sender(alert)
        except Exception as e:
            print(f"Failed to send notification: {e}")
            return False

    def _send_email(self, alert: Dict) -> bool:
        """Placeholder email sender; only prints the rule name."""
        print(f"Sending email alert: {alert.get('rule_name', '')}")
        return True

    def _send_slack(self, alert: Dict) -> bool:
        """Placeholder Slack sender; only prints the rule name."""
        print(f"Sending Slack alert: {alert.get('rule_name', '')}")
        return True

    @staticmethod
    def _json_default(value: Any) -> Any:
        """Serialize the non-JSON types that appear in alerts.

        Enum members become their value and datetimes become ISO-8601
        strings; anything else is rejected as ``json`` would do itself.
        """
        if isinstance(value, Enum):
            return value.value
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")

    def _build_webhook_body(self, alert: Dict) -> str:
        """Return the JSON document posted to the webhook."""
        payload = {
            "alert": alert,
            "timestamp": datetime.now().isoformat(),
        }
        return json.dumps(payload, default=self._json_default)

    def _send_webhook(self, alert: Dict) -> bool:
        """POST the alert as JSON to ``config["url"]``.

        Returns True for any 2xx response, False when no URL is configured.
        """
        url = self.config.get("url", "")
        if not url:
            return False
        # Imported lazily so the other channel types work without requests.
        import requests
        response = requests.post(
            url,
            data=self._build_webhook_body(alert).encode("utf-8"),
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
        return 200 <= response.status_code < 300

    def _send_sms(self, alert: Dict) -> bool:
        """Placeholder SMS sender; only prints the rule name."""
        print(f"Sending SMS alert: {alert.get('rule_name', '')}")
        return True
告警响应
响应管理器
class AlertResponseManager:
    """Runs response playbooks, escalation policies and auto-remediation.

    The step, escalation and remediation handlers are simplified
    placeholders: they return result dicts without performing real actions.
    """

    def __init__(self):
        self.response_playbooks = []
        self.automation_rules = []
        self.escalation_policies = []

    def create_response_playbook(self, playbook_config: Dict) -> str:
        """Register a playbook built from ``playbook_config``; return its id."""
        playbook_id = f"playbook_{len(self.response_playbooks) + 1}"
        playbook = {
            "id": playbook_id,
            "name": playbook_config.get("name", ""),
            "description": playbook_config.get("description", ""),
            "trigger_conditions": playbook_config.get("trigger_conditions", {}),
            "steps": playbook_config.get("steps", []),
            "estimated_time": playbook_config.get("estimated_time", "30m"),
            "required_roles": playbook_config.get("required_roles", []),
            "created_at": datetime.now().isoformat(),
        }
        self.response_playbooks.append(playbook)
        return playbook_id

    def execute_playbook(self, playbook_id: str, alert: Dict) -> Dict[str, Any]:
        """Run the playbook's steps in order for ``alert``.

        Execution stops at the first step that raises; the returned record
        then has ``status == "failed"``. An unknown ``playbook_id`` yields
        ``{"success": False, "error": "Playbook not found"}``.
        """
        playbook = self._get_playbook(playbook_id)
        if not playbook:
            return {"success": False, "error": "Playbook not found"}

        started = datetime.now()
        execution = {
            # Microseconds keep ids distinct for executions in the same second.
            "execution_id": f"exec_{started.strftime('%Y%m%d_%H%M%S_%f')}",
            "playbook_id": playbook_id,
            "alert_id": alert.get("id", ""),
            "started_at": started.isoformat(),
            "completed_at": None,
            "status": "running",
            "steps_completed": [],
            "steps_failed": [],
        }

        for step in playbook.get("steps", []):
            try:
                result = self._execute_step(step, alert)
            except Exception as e:
                execution["steps_failed"].append({
                    "step": step,
                    "error": str(e),
                    "failed_at": datetime.now().isoformat(),
                })
                execution["status"] = "failed"
                break
            execution["steps_completed"].append({
                "step": step,
                "result": result,
                "completed_at": datetime.now().isoformat(),
            })

        if execution["status"] != "failed":
            execution["status"] = "completed"
        execution["completed_at"] = datetime.now().isoformat()
        return execution

    def _get_playbook(self, playbook_id: str) -> Optional[Dict]:
        """Return the playbook with ``playbook_id`` or None."""
        for playbook in self.response_playbooks:
            if playbook["id"] == playbook_id:
                return playbook
        return None

    def _execute_step(self, step: Dict, alert: Dict) -> Any:
        """Dispatch ``step`` by its ``type``; unknown types are skipped."""
        step_type = step.get("type", "")
        handlers = {
            "notify": self._execute_notify_step,
            "investigate": self._execute_investigate_step,
            "remediate": self._execute_remediate_step,
            "verify": self._execute_verify_step,
        }
        handler = handlers.get(step_type)
        if handler is None:
            return {"status": "skipped", "reason": f"Unknown step type: {step_type}"}
        return handler(step, alert)

    def _execute_notify_step(self, step: Dict, alert: Dict) -> Dict:
        """Placeholder notify step; reports how many recipients were listed."""
        recipients = step.get("recipients", [])
        message = step.get("message", f"Alert: {alert.get('rule_name', '')}")
        return {
            "status": "success",
            "recipients_notified": len(recipients),
            "message": message,
        }

    def _execute_investigate_step(self, step: Dict, alert: Dict) -> Dict:
        """Placeholder investigate step; returns empty findings."""
        return {
            "status": "success",
            "investigation_type": step.get("investigation_type", "basic"),
            "findings": [],
        }

    def _execute_remediate_step(self, step: Dict, alert: Dict) -> Dict:
        """Placeholder remediate step; echoes the configured action."""
        return {
            "status": "success",
            "action": step.get("action", ""),
            "result": "Remediation applied successfully",
        }

    def _execute_verify_step(self, step: Dict, alert: Dict) -> Dict:
        """Placeholder verify step; always reports verified."""
        return {
            "status": "success",
            "method": step.get("method", ""),
            "verified": True,
        }

    def add_escalation_policy(self, policy_config: Dict) -> str:
        """Register an escalation policy and return its id."""
        policy_id = f"escalation_{len(self.escalation_policies) + 1}"
        policy = {
            "id": policy_id,
            "name": policy_config.get("name", ""),
            "severities": policy_config.get("severities", []),
            "stages": policy_config.get("stages", []),
            "enabled": policy_config.get("enabled", True),
        }
        self.escalation_policies.append(policy)
        return policy_id

    @staticmethod
    def _severity_value(severity: Any) -> Any:
        """Return the enum value for enum members, the input otherwise."""
        return getattr(severity, "value", severity)

    def execute_escalation(self, alert: Dict) -> Dict[str, Any]:
        """Run the stages of the first enabled policy covering the alert.

        Severities are compared by value, so an enum member matches its
        string value. Returns ``{"status": "no_matching_policy"}`` when no
        policy applies, including when the alert has no severity.
        """
        severity = self._severity_value(alert.get("severity"))
        for policy in self.escalation_policies:
            if not policy.get("enabled", True):
                continue
            covered = [self._severity_value(s) for s in policy.get("severities", [])]
            if severity in covered:
                return self._run_escalation_stages(alert, policy.get("stages", []))
        return {"status": "no_matching_policy"}

    def _run_escalation_stages(self, alert: Dict, stages: List[Dict]) -> Dict[str, Any]:
        """Record every stage as executed, in order, without waiting.

        Simplified: ``wait_time`` is recorded but not honoured. ``escalated``
        becomes True once the final stage has been recorded.
        """
        execution = {
            "stages_executed": [],
            "current_stage": 0,
            "escalated": False,
        }
        last_index = len(stages) - 1
        for index, stage in enumerate(stages):
            execution["stages_executed"].append({
                "stage": index + 1,
                "wait_time": stage.get("wait_time", "5m"),
                "recipients": stage.get("recipients", []),
                "executed_at": datetime.now().isoformat(),
            })
            execution["current_stage"] = index + 1
            if index == last_index:
                execution["escalated"] = True
        return execution

    def auto_remediate(self, alert: Dict) -> Dict[str, Any]:
        """Apply every remediation rule for ``alert`` and collect the results.

        A rule that raises is reported with ``status == "failed"`` and does
        not stop the remaining rules.
        """
        results = []
        for rule in self._get_remediation_rules(alert):
            try:
                results.append(self._apply_remediation(rule, alert))
            except Exception as e:
                results.append({
                    "rule": rule.get("name", ""),
                    "status": "failed",
                    "error": str(e),
                })
        return {
            "alert_id": alert.get("id", ""),
            "remediation_results": results,
            "timestamp": datetime.now().isoformat(),
        }

    def _get_remediation_rules(self, alert: Dict) -> List[Dict]:
        """Return remediation rules for ``alert`` (placeholder: always empty)."""
        return []

    def _apply_remediation(self, rule: Dict, alert: Dict) -> Dict:
        """Placeholder remediation; reports success without acting."""
        return {
            "rule": rule.get("name", ""),
            "status": "success",
            "applied_at": datetime.now().isoformat(),
        }
告警分析与优化
告警分析器
class AlertAnalyzer:
    """Derives simple patterns and quality figures from a list of alerts."""

    def __init__(self):
        self.alert_metrics = {}

    def analyze_alert_patterns(self, alerts: List[Dict]) -> Dict[str, Any]:
        """Return time, frequency and severity patterns for ``alerts``.

        The result always has the keys ``patterns``, ``insights`` and
        ``recommendations``; all three are empty for empty input.
        """
        if not alerts:
            return {"patterns": [], "insights": [], "recommendations": []}

        patterns = [
            self._analyze_time_pattern(alerts),
            self._analyze_frequency_pattern(alerts),
            self._analyze_correlation(alerts),
        ]
        return {
            "patterns": patterns,
            "insights": self._generate_insights(patterns),
            "recommendations": self._generate_recommendations(patterns),
        }

    def _analyze_time_pattern(self, alerts: List[Dict]) -> Dict:
        """Count alerts per hour of day and pick the three busiest hours.

        Alerts whose ``fired_at`` is missing or not an ISO-8601 string are
        left out of the distribution.
        """
        hourly_distribution = {}
        for alert in alerts:
            try:
                fired_at = datetime.fromisoformat(alert.get("fired_at", ""))
            except (TypeError, ValueError):
                continue
            hourly_distribution[fired_at.hour] = hourly_distribution.get(fired_at.hour, 0) + 1

        peak_hours = sorted(hourly_distribution.items(), key=lambda x: x[1], reverse=True)[:3]
        return {
            "type": "time_pattern",
            "hourly_distribution": hourly_distribution,
            "peak_hours": peak_hours,
            "insight": f"高峰时段: {', '.join([f'{h}点' for h, _ in peak_hours])}"
        }

    def _analyze_frequency_pattern(self, alerts: List[Dict]) -> Dict:
        """Count alerts per rule id and pick the five most frequent rules."""
        rule_frequencies = {}
        for alert in alerts:
            rule_id = alert.get("rule_id", "")
            rule_frequencies[rule_id] = rule_frequencies.get(rule_id, 0) + 1

        top_rules = sorted(rule_frequencies.items(), key=lambda x: x[1], reverse=True)[:5]
        return {
            "type": "frequency_pattern",
            "rule_frequencies": rule_frequencies,
            "top_rules": top_rules,
            "insight": f"最频繁告警规则: {top_rules[0][0] if top_rules else '无'}"
        }

    def _analyze_correlation(self, alerts: List[Dict]) -> Dict:
        """Count alerts per severity (a simplified stand-in for correlation)."""
        severity_correlation = {}
        for alert in alerts:
            severity = alert.get("severity", "")
            severity_correlation[severity] = severity_correlation.get(severity, 0) + 1
        return {
            "type": "correlation",
            "severity_distribution": severity_correlation,
            "insight": "告警严重程度分布分析"
        }

    def _generate_insights(self, patterns: List[Dict]) -> List[str]:
        """Collect the non-empty ``insight`` text of each pattern."""
        return [pattern["insight"] for pattern in patterns if pattern.get("insight")]

    def _generate_recommendations(self, patterns: List[Dict]) -> List[str]:
        """Turn the top rule and the peak hour into recommendation strings."""
        recommendations = []
        for pattern in patterns:
            kind = pattern.get("type")
            if kind == "frequency_pattern":
                top_rules = pattern.get("top_rules", [])
                if top_rules:
                    recommendations.append(f"优化规则 {top_rules[0][0]} 以减少告警频率")
            elif kind == "time_pattern":
                peak_hours = pattern.get("peak_hours", [])
                if peak_hours:
                    recommendations.append(f"在高峰时段 {peak_hours[0][0]}点 增加监控")
        return recommendations

    def calculate_alert_quality(self, alerts: List[Dict],
                                assumed_missed: int = 10) -> Dict[str, float]:
        """Return rough quality figures for ``alerts``.

        Simplified: ``accuracy`` is the resolved share, ``precision`` the
        acknowledged share, and ``recall`` is
        ``resolved / (resolved + assumed_missed)`` where ``assumed_missed``
        is a placeholder for problems that raised no alert. Status may be an
        enum member or its string value.
        """
        if not alerts:
            return {"accuracy": 0, "precision": 0, "recall": 0, "f1_score": 0}

        # Alerts created elsewhere in this file store enum members as status.
        statuses = [getattr(a.get("status"), "value", a.get("status")) for a in alerts]
        total_alerts = len(alerts)
        resolved_alerts = statuses.count("resolved")
        acknowledged_alerts = statuses.count("acknowledged")

        accuracy = resolved_alerts / total_alerts
        precision = acknowledged_alerts / total_alerts
        recall_base = resolved_alerts + assumed_missed
        recall = resolved_alerts / recall_base if recall_base > 0 else 0
        return {
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1_score": 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        }
总结
LLM告警系统是确保模型稳定运行的关键组件。通过设计合理的告警规则、实现智能路由和自动化响应,组织可以及时发现和处理问题。结合告警分析和优化,可以持续改进告警质量,减少误报和漏报,提高运维效率。