LLM仪表板
---
title: "LLM仪表板"
description: "详细介绍LLM监控仪表板设计，包括监控面板、趋势分析、自定义看板等核心功能"
tags: ["LLM仪表板", "监控面板", "趋势分析", "自定义看板"]
category: "llm"
icon: "🧠"
---
LLM仪表板
LLM仪表板概述
LLM仪表板是实时监控和分析大语言模型性能、使用情况和业务影响的可视化界面。一个优秀的仪表板可以帮助运维团队快速发现问题、优化性能并做出数据驱动的决策。
仪表板架构设计
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json
class LLMDashboard:
    """In-memory model of an LLM monitoring dashboard.

    Holds the widget list, named data-source callables, and alert rules, and
    can render itself either as a data snapshot, a JSON config, or a static
    HTML page.
    """

    def __init__(self, dashboard_id: str, name: str):
        """Create an empty dashboard with the given identifier and display name."""
        self.dashboard_id = dashboard_id
        self.name = name
        self.widgets = []
        self.refresh_interval = 30  # seconds
        self.data_sources = {}
        self.alert_rules = []
        self.user_preferences = {}
        # Monotonic counters so that IDs are never reused after a removal.
        self._widget_seq = 0
        self._rule_seq = 0

    def add_widget(self, widget_config: Dict[str, Any]) -> str:
        """Append a widget built from *widget_config* and return its new ID.

        Missing config keys fall back to defaults. IDs have the form
        ``widget_<n>`` and are unique for the lifetime of the dashboard,
        even if earlier widgets have been removed.
        """
        # max(...) keeps the sequence ahead of the list even if ``widgets``
        # was populated externally with len-based IDs.
        self._widget_seq = max(self._widget_seq, len(self.widgets)) + 1
        widget_id = f"widget_{self._widget_seq}"
        widget = {
            "id": widget_id,
            "type": widget_config.get("type", "metric_card"),
            "title": widget_config.get("title", ""),
            "position": widget_config.get(
                "position", {"x": 0, "y": 0, "w": 1, "h": 1}
            ),
            "data_source": widget_config.get("data_source", ""),
            "refresh_rate": widget_config.get("refresh_rate", 60),
            "visualization": widget_config.get("visualization", {}),
            "filters": widget_config.get("filters", []),
            "created_at": datetime.now().isoformat(),
        }
        self.widgets.append(widget)
        return widget_id

    def update_widget(self, widget_id: str, updates: Dict[str, Any]) -> bool:
        """Merge *updates* into the widget with *widget_id*.

        Returns True if the widget was found (and stamps ``updated_at``),
        False otherwise. Keys in *updates* overwrite existing keys as-is.
        """
        for widget in self.widgets:
            if widget["id"] == widget_id:
                widget.update(updates)
                widget["updated_at"] = datetime.now().isoformat()
                return True
        return False

    def remove_widget(self, widget_id: str) -> bool:
        """Remove the widget with *widget_id*; return True if one was removed."""
        original_count = len(self.widgets)
        self.widgets = [w for w in self.widgets if w["id"] != widget_id]
        return len(self.widgets) < original_count

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Return a snapshot pairing every widget with freshly fetched data."""
        return {
            "dashboard_id": self.dashboard_id,
            "name": self.name,
            "widgets": [
                {"widget": widget, "data": self._fetch_widget_data(widget)}
                for widget in self.widgets
            ],
            "last_updated": datetime.now().isoformat(),
            "status": "active",
        }

    def _fetch_widget_data(self, widget: Dict) -> Any:
        """Call the widget's registered data source, or return a placeholder."""
        data_source = widget.get("data_source", "")
        if data_source in self.data_sources:
            return self.data_sources[data_source]()
        return {"value": 0, "trend": "stable"}

    def set_data_source(self, name: str, fetcher):
        """Register *fetcher* (a zero-argument callable) under *name*."""
        self.data_sources[name] = fetcher

    def add_alert_rule(self, rule_config: Dict) -> str:
        """Append an alert rule built from *rule_config* and return its ID."""
        self._rule_seq = max(self._rule_seq, len(self.alert_rules)) + 1
        rule_id = f"rule_{self._rule_seq}"
        rule = {
            "id": rule_id,
            "name": rule_config.get("name", ""),
            "condition": rule_config.get("condition", {}),
            "action": rule_config.get("action", {}),
            "enabled": rule_config.get("enabled", True),
            "created_at": datetime.now().isoformat(),
        }
        self.alert_rules.append(rule)
        return rule_id

    def export_dashboard(self, format: str = "json") -> bytes:
        """Serialize the dashboard.

        ``"json"`` yields indented JSON, ``"html"`` a static HTML page, and
        any other value compact JSON. The result is always encoded bytes.
        """
        if format == "html":
            return self._generate_html()

        dashboard_config = {
            "dashboard_id": self.dashboard_id,
            "name": self.name,
            "widgets": self.widgets,
            "alert_rules": self.alert_rules,
            "export_time": datetime.now().isoformat(),
        }
        if format == "json":
            return json.dumps(dashboard_config, indent=2).encode()
        return json.dumps(dashboard_config).encode()

    def _generate_html(self) -> bytes:
        """Render a static HTML page listing every widget title.

        The dashboard name and widget titles are HTML-escaped because they
        are caller-supplied text placed directly into markup.
        """
        from html import escape

        safe_name = escape(str(self.name))
        parts = [
            "\n<!DOCTYPE html>\n<html>\n<head>\n"
            f"<title>{safe_name}</title>\n"
            "<style>\n"
            "body { font-family: Arial, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; }\n"
            ".dashboard { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }\n"
            ".widget { background: white; border-radius: 8px; padding: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }\n"
            ".widget-title { font-size: 14px; color: #666; margin-bottom: 10px; }\n"
            ".widget-value { font-size: 24px; font-weight: bold; color: #333; }\n"
            ".trend-up { color: #4CAF50; }\n"
            ".trend-down { color: #f44336; }\n"
            "</style>\n</head>\n<body>\n"
            f"<h1>{safe_name}</h1>\n"
            '<div class="dashboard">\n'
        ]
        for widget in self.widgets:
            safe_title = escape(str(widget.get("title", "")))
            parts.append(
                '\n<div class="widget">\n'
                f'<div class="widget-title">{safe_title}</div>\n'
                '<div class="widget-value">--</div>\n'
                "</div>\n"
            )
        parts.append("\n</div>\n</body>\n</html>\n")
        # Join once instead of repeated string concatenation.
        return "".join(parts).encode()
监控面板设计
实时监控组件
class RealTimeMonitor:
    """Polls metrics once per second, buffers them, and computes summaries.

    The ``_get_*`` collectors are simplified placeholders that return random
    values.
    """

    # Maximum number of samples retained in ``metrics_buffer``.
    MAX_BUFFER_SIZE = 1000

    def __init__(self):
        """Create an idle monitor with an empty buffer and no callbacks."""
        self.metrics_buffer = []
        self.update_callbacks = []
        self.is_running = False

    def start_monitoring(self):
        """Start the monitoring loop.

        NOTE: this call blocks the current thread until ``is_running``
        becomes False (e.g. ``stop_monitoring`` invoked from a callback or
        another thread).
        """
        self.is_running = True
        self._monitoring_loop()

    def stop_monitoring(self):
        """Ask the monitoring loop to exit after its current iteration."""
        self.is_running = False

    def _monitoring_loop(self):
        """Collect, buffer, and broadcast metrics once per second."""
        import time  # hoisted out of the loop body

        while self.is_running:
            metrics = self._collect_real_time_metrics()
            self.metrics_buffer.append(metrics)

            # Keep only the most recent samples.
            if len(self.metrics_buffer) > self.MAX_BUFFER_SIZE:
                self.metrics_buffer = self.metrics_buffer[-self.MAX_BUFFER_SIZE:]

            for callback in self.update_callbacks:
                callback(metrics)

            time.sleep(1)

    def _collect_real_time_metrics(self) -> Dict[str, Any]:
        """Return one timestamped sample of all tracked metrics."""
        return {
            "timestamp": datetime.now().isoformat(),
            "requests_per_second": self._get_rps(),
            "average_latency": self._get_avg_latency(),
            "error_rate": self._get_error_rate(),
            "active_users": self._get_active_users(),
            "model_load": self._get_model_load(),
        }

    def _get_rps(self) -> float:
        """Return requests per second (placeholder: random in [100, 500])."""
        import random
        return random.uniform(100, 500)

    def _get_avg_latency(self) -> float:
        """Return average latency (placeholder: random in [0.2, 1.0])."""
        import random
        return random.uniform(0.2, 1.0)

    def _get_error_rate(self) -> float:
        """Return error rate (placeholder: random in [0, 0.05])."""
        import random
        return random.uniform(0, 0.05)

    def _get_active_users(self) -> int:
        """Return active user count (placeholder: random in [50, 200])."""
        import random
        return random.randint(50, 200)

    def _get_model_load(self) -> float:
        """Return model load (placeholder: random in [0.3, 0.8])."""
        import random
        return random.uniform(0.3, 0.8)

    def add_update_callback(self, callback):
        """Register *callback*; it is called with each new metrics sample."""
        self.update_callbacks.append(callback)

    def get_historical_metrics(self, duration_minutes: int = 60) -> List[Dict]:
        """Return buffered samples newer than *duration_minutes* ago."""
        cutoff_time = datetime.now() - timedelta(minutes=duration_minutes)
        return [
            m for m in self.metrics_buffer
            if datetime.fromisoformat(m["timestamp"]) > cutoff_time
        ]

    def calculate_statistics(self, metrics: List[Dict]) -> Dict[str, Any]:
        """Summarize RPS, latency, and error rate over *metrics*.

        Returns an empty dict when *metrics* is empty.
        """
        if not metrics:
            return {}

        rps_values = [m["requests_per_second"] for m in metrics]
        latency_values = [m["average_latency"] for m in metrics]
        error_rates = [m["error_rate"] for m in metrics]
        count = len(metrics)

        return {
            "rps": {
                "average": sum(rps_values) / count,
                "min": min(rps_values),
                "max": max(rps_values),
                "current": rps_values[-1],
            },
            "latency": {
                "average": sum(latency_values) / count,
                "min": min(latency_values),
                "max": max(latency_values),
                "p95": self._calculate_percentile(latency_values, 95),
            },
            "error_rate": {
                "average": sum(error_rates) / count,
                "max": max(error_rates),
                "trend": self._calculate_trend(error_rates),
            },
        }

    def _calculate_percentile(self, values: List[float], percentile: float) -> float:
        """Return the nearest-rank *percentile* (0-100) of *values*.

        Returns 0.0 for an empty list instead of raising.
        """
        import math

        if not values:
            return 0.0
        sorted_values = sorted(values)
        # Nearest-rank: the smallest value with at least `percentile` percent
        # of the data at or below it (1-based rank = ceil(p/100 * n)).
        rank = math.ceil(len(sorted_values) * percentile / 100)
        index = min(max(rank - 1, 0), len(sorted_values) - 1)
        return sorted_values[index]

    def _calculate_trend(self, values: List[float]) -> str:
        """Classify *values* as "increasing", "decreasing", or "stable".

        Compares the mean of the trailing window with the mean of the
        leading window using a +/-10% band. The window is 10 samples, or
        half the series when fewer than 20 samples exist, so the two windows
        never overlap.
        """
        if len(values) < 2:
            return "stable"

        window = min(10, len(values) // 2)
        recent_avg = sum(values[-window:]) / window
        earlier_avg = sum(values[:window]) / window

        if recent_avg > earlier_avg * 1.1:
            return "increasing"
        if recent_avg < earlier_avg * 0.9:
            return "decreasing"
        return "stable"
组件类型定义
class DashboardWidgetTypes:
    """Factory functions that return plain-dict descriptions of widgets."""

    @staticmethod
    def metric_card(title: str, value: Any, unit: str = "",
                    trend: str = "stable") -> Dict:
        """Describe a single-value metric card with a fixed card style."""
        return {
            "type": "metric_card",
            "title": title,
            "value": value,
            "unit": unit,
            "trend": trend,
            "style": {
                "background": "#ffffff",
                "border": "1px solid #e0e0e0",
                "borderRadius": "8px",
                "padding": "20px",
            },
        }

    @staticmethod
    def line_chart(title: str, data: List[Dict],
                   x_axis: str = "time", y_axis: str = "value") -> Dict:
        """Describe a line chart over *data* with the given axis keys."""
        return {
            "type": "line_chart",
            "title": title,
            "data": data,
            "x_axis": x_axis,
            "y_axis": y_axis,
            "options": {
                "grid": True,
                "legend": True,
                "tooltip": True,
                "animation": True,
            },
        }

    @staticmethod
    def bar_chart(title: str, categories: List[str],
                  values: List[float],
                  colors: Optional[List[str]] = None) -> Dict:
        """Describe a bar chart; a default palette is used when *colors* is empty."""
        return {
            "type": "bar_chart",
            "title": title,
            "categories": categories,
            "values": values,
            "colors": colors or ["#4CAF50", "#2196F3", "#FF9800", "#9C27B0"],
            "options": {
                "horizontal": False,
                "stacked": False,
                "showValues": True,
            },
        }

    @staticmethod
    def pie_chart(title: str, segments: List[Dict]) -> Dict:
        """Describe a pie chart made of *segments*."""
        return {
            "type": "pie_chart",
            "title": title,
            "segments": segments,
            "options": {
                "donut": False,
                "showLabels": True,
                "showPercentages": True,
            },
        }

    @staticmethod
    def gauge_chart(title: str, value: float, min_val: float = 0,
                    max_val: float = 100) -> Dict:
        """Describe a gauge with colour thresholds at 30%, 70%, and 90%.

        Thresholds are positioned relative to the ``[min_val, max_val]``
        span so they stay on the scale when the minimum is non-zero.
        """
        span = max_val - min_val
        return {
            "type": "gauge_chart",
            "title": title,
            "value": value,
            "min": min_val,
            "max": max_val,
            "thresholds": [
                {"value": min_val + span * 0.3, "color": "#4CAF50"},
                {"value": min_val + span * 0.7, "color": "#FF9800"},
                {"value": min_val + span * 0.9, "color": "#f44336"},
            ],
        }

    @staticmethod
    def heatmap(title: str, data: List[List[float]],
                x_labels: List[str], y_labels: List[str]) -> Dict:
        """Describe a heatmap over *data* with axis labels."""
        return {
            "type": "heatmap",
            "title": title,
            "data": data,
            "x_labels": x_labels,
            "y_labels": y_labels,
            "color_scale": ["#ffffff", "#4CAF50"],
            "options": {
                "showValues": True,
                "interactive": True,
            },
        }

    @staticmethod
    def table(title: str, columns: List[Dict], rows: List[List]) -> Dict:
        """Describe a sortable, filterable, paginated table."""
        return {
            "type": "table",
            "title": title,
            "columns": columns,
            "rows": rows,
            "options": {
                "sortable": True,
                "filterable": True,
                "pagination": True,
                "pageSize": 10,
            },
        }

    @staticmethod
    def alert_panel(title: str, alerts: List[Dict]) -> Dict:
        """Describe an alert panel listing *alerts*."""
        return {
            "type": "alert_panel",
            "title": title,
            "alerts": alerts,
            "options": {
                "maxAlerts": 10,
                "showTimestamp": True,
                "autoRefresh": True,
            },
        }
趋势分析
趋势分析器
class TrendAnalyzer:
    """Trend detection, naive forecasting, and z-score anomaly detection.

    Data points are dicts; the numeric sample is read from the ``"value"``
    key (default 0) and, for anomalies, the ``"timestamp"`` key.
    """

    def __init__(self):
        """Create an analyzer with no stored trends."""
        self.trend_data = {}
        self.analysis_methods = {
            "linear": self._linear_trend,
            "moving_average": self._moving_average_trend,
            "exponential": self._exponential_trend,
        }

    def analyze_trend(self, metric_name: str, data_points: List[Dict],
                      method: str = "linear") -> Dict[str, Any]:
        """Analyze *data_points* with *method* and remember the result.

        Raises:
            ValueError: if *method* is not a key of ``analysis_methods``.
        """
        if method not in self.analysis_methods:
            raise ValueError(f"Unknown method: {method}")

        trend_result = self.analysis_methods[method](data_points)
        self.trend_data[metric_name] = {
            "trend": trend_result,
            "analyzed_at": datetime.now().isoformat(),
            "data_points_count": len(data_points),
        }
        return trend_result

    def _linear_trend(self, data_points: List[Dict]) -> Dict[str, Any]:
        """Fit a least-squares line against the sample index.

        Returns slope, intercept, R-squared, a direction label (slope beyond
        +/-0.01), and a strength label (R-squared above 0.7 is "strong").
        """
        if len(data_points) < 2:
            return {"slope": 0, "intercept": 0, "r_squared": 0, "direction": "stable"}

        x_values = list(range(len(data_points)))
        y_values = [dp.get("value", 0) for dp in data_points]

        n = len(x_values)
        sum_x = sum(x_values)
        sum_y = sum(y_values)
        sum_xy = sum(x * y for x, y in zip(x_values, y_values))
        sum_x2 = sum(x * x for x in x_values)

        denominator = n * sum_x2 - sum_x * sum_x
        if denominator == 0:
            return {"slope": 0, "intercept": sum_y / n, "r_squared": 0, "direction": "stable"}

        slope = (n * sum_xy - sum_x * sum_y) / denominator
        intercept = (sum_y - slope * sum_x) / n

        # Coefficient of determination; 0 when every y is identical.
        y_mean = sum_y / n
        ss_total = sum((y - y_mean) ** 2 for y in y_values)
        ss_residual = sum(
            (y - (slope * x + intercept)) ** 2
            for x, y in zip(x_values, y_values)
        )
        r_squared = 1 - (ss_residual / ss_total) if ss_total > 0 else 0

        if slope > 0.01:
            direction = "increasing"
        elif slope < -0.01:
            direction = "decreasing"
        else:
            direction = "stable"

        return {
            "slope": slope,
            "intercept": intercept,
            "r_squared": r_squared,
            "direction": direction,
            "strength": "strong" if r_squared > 0.7 else "weak",
        }

    def _moving_average_trend(self, data_points: List[Dict],
                              window_size: int = 5) -> Dict[str, Any]:
        """Compute a simple moving average and classify its direction.

        Direction compares the mean of the last averages with the mean of
        the first ones (+/-5% band). Up to 3 averages are used on each side,
        fewer for short series so the two groups never overlap.
        """
        if len(data_points) < window_size:
            return {"moving_average": [], "direction": "stable"}

        values = [dp.get("value", 0) for dp in data_points]
        moving_average = [
            sum(values[i:i + window_size]) / window_size
            for i in range(len(values) - window_size + 1)
        ]

        direction = "stable"
        if len(moving_average) >= 2:
            span = min(3, len(moving_average) // 2)
            # Divide by the actual number of averaged items, not a fixed 3.
            recent_avg = sum(moving_average[-span:]) / span
            earlier_avg = sum(moving_average[:span]) / span
            if recent_avg > earlier_avg * 1.05:
                direction = "increasing"
            elif recent_avg < earlier_avg * 0.95:
                direction = "decreasing"

        return {
            "moving_average": moving_average,
            "window_size": window_size,
            "direction": direction,
        }

    def _exponential_trend(self, data_points: List[Dict],
                           alpha: float = 0.3) -> Dict[str, Any]:
        """Compute an exponentially weighted average and classify direction.

        Direction compares the last smoothed value with the first one using
        a +/-5% band.
        """
        if not data_points:
            return {"exponential_average": [], "direction": "stable"}

        values = [dp.get("value", 0) for dp in data_points]
        exponential_average = [values[0]]
        for value in values[1:]:
            exponential_average.append(
                alpha * value + (1 - alpha) * exponential_average[-1]
            )

        direction = "stable"
        if len(exponential_average) >= 2:
            recent = exponential_average[-1]
            earlier = exponential_average[0]
            if recent > earlier * 1.05:
                direction = "increasing"
            elif recent < earlier * 0.95:
                direction = "decreasing"

        return {
            "exponential_average": exponential_average,
            "alpha": alpha,
            "direction": direction,
        }

    def predict_future(self, metric_name: str, periods: int = 7) -> List[Dict]:
        """Extrapolate the stored trend for *metric_name* by *periods* steps.

        Uses the stored ``slope``/``intercept`` (both default to 0 when the
        stored trend has none, e.g. for non-linear methods). Extrapolation
        starts at the index following the last analyzed data point. Returns
        an empty list if the metric has not been analyzed.
        """
        if metric_name not in self.trend_data:
            return []

        stored = self.trend_data[metric_name]
        trend = stored["trend"]
        slope = trend.get("slope", 0)
        intercept = trend.get("intercept", 0)
        # analyze_trend stores the sample count, not the samples themselves.
        last_index = stored.get("data_points_count", 0)

        predictions = []
        for i in range(periods):
            predicted_value = slope * (last_index + i) + intercept
            # abs() keeps lower <= upper when the prediction is negative.
            margin = abs(predicted_value) * 0.1
            predictions.append({
                "period": i + 1,
                "predicted_value": predicted_value,
                "confidence_interval": {
                    "lower": predicted_value - margin,
                    "upper": predicted_value + margin,
                },
            })
        return predictions

    def detect_anomalies(self, data_points: List[Dict],
                         threshold: float = 2.0) -> List[Dict]:
        """Return points whose z-score magnitude exceeds *threshold*.

        Uses the population standard deviation. Fewer than 10 points yields
        an empty list; zero deviation yields no anomalies.
        """
        if len(data_points) < 10:
            return []

        values = [dp.get("value", 0) for dp in data_points]
        mean = sum(values) / len(values)
        std = (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5

        anomalies = []
        for i, (dp, value) in enumerate(zip(data_points, values)):
            z_score = (value - mean) / std if std > 0 else 0
            if abs(z_score) > threshold:
                anomalies.append({
                    "index": i,
                    "timestamp": dp.get("timestamp", ""),
                    "value": value,
                    "z_score": z_score,
                    "type": "high" if z_score > 0 else "low",
                })
        return anomalies
自定义看板
看板构建器
class DashboardBuilder:
    """Builds, saves, loads, and clones ``LLMDashboard`` instances."""

    def __init__(self):
        """Create the layout engine, widget factory, and theme manager."""
        self.layout_engine = LayoutEngine()
        self.widget_factory = WidgetFactory()
        self.theme_manager = ThemeManager()

    def create_custom_dashboard(self, config: Dict[str, Any]) -> LLMDashboard:
        """Build a dashboard from *config*.

        Recognized keys: ``id``, ``name``, ``theme``, ``widgets``, ``layout``.
        ``theme`` may be a theme name or an already-resolved theme dict;
        ``layout`` may be a layout type name or a layout dict carrying a
        ``"type"`` key. Accepting the dict forms lets saved or cloned
        configurations be fed back in.
        """
        dashboard = LLMDashboard(
            dashboard_id=config.get("id", "custom_dashboard"),
            name=config.get("name", "Custom Dashboard"),
        )

        theme = config.get("theme", "default")
        if isinstance(theme, dict):
            # A dict is unhashable and cannot be used as a theme-name lookup key.
            dashboard.theme = theme
        else:
            dashboard.theme = self.theme_manager.get_theme(theme)

        for widget_config in config.get("widgets", []):
            dashboard.add_widget(widget_config)

        layout = config.get("layout", "grid")
        if isinstance(layout, dict):
            layout = layout.get("type", "grid")
        # The layout is always recomputed so it refers to the new widget IDs.
        dashboard.layout = self.layout_engine.create_layout(layout, dashboard.widgets)
        return dashboard

    def save_dashboard_config(self, dashboard: LLMDashboard,
                              file_path: str):
        """Write the dashboard's configuration to *file_path* as JSON."""
        config = {
            "dashboard_id": dashboard.dashboard_id,
            "name": dashboard.name,
            "widgets": dashboard.widgets,
            "layout": getattr(dashboard, "layout", {}),
            "theme": getattr(dashboard, "theme", {}),
            "saved_at": datetime.now().isoformat(),
        }
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(config, f, indent=2)

    def load_dashboard_config(self, file_path: str) -> Dict[str, Any]:
        """Read and return the JSON configuration stored at *file_path*."""
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)

    def clone_dashboard(self, source_dashboard: LLMDashboard,
                        new_name: str) -> LLMDashboard:
        """Return a new dashboard with the source's widgets, layout type, and theme.

        Widgets are deep-copied so the clone shares no nested state with the
        source. The clone's widgets are re-added, so they receive fresh IDs
        and creation timestamps.
        """
        import copy

        config = {
            "id": f"{source_dashboard.dashboard_id}_clone",
            "name": new_name,
            "widgets": copy.deepcopy(source_dashboard.widgets),
        }
        # Only forward layout/theme when the source actually has them, so
        # the defaults apply otherwise.
        source_layout = getattr(source_dashboard, "layout", None)
        if source_layout:
            config["layout"] = source_layout
        source_theme = getattr(source_dashboard, "theme", None)
        if source_theme:
            config["theme"] = copy.deepcopy(source_theme)
        return self.create_custom_dashboard(config)
class LayoutEngine:
    """Computes widget placement for grid, freeform, or responsive layouts."""

    def __init__(self):
        """Record the supported layout type names."""
        self.layout_types = ["grid", "freeform", "responsive"]

    def create_layout(self, layout_type: str, widgets: List[Dict]) -> Dict:
        """Build a layout of *layout_type*; unknown types fall back to grid."""
        if layout_type == "freeform":
            return self._create_freeform_layout(widgets)
        if layout_type == "responsive":
            return self._create_responsive_layout(widgets)
        return self._create_grid_layout(widgets)

    def _create_grid_layout(self, widgets: List[Dict]) -> Dict:
        """Place widgets row by row in a 3-column grid of 1x1 cells."""
        columns = 3
        # Ceiling division: exactly as many rows as the widgets occupy.
        rows = (len(widgets) + columns - 1) // columns
        cells = [
            {
                "widget_id": widget["id"],
                "row": index // columns,
                "col": index % columns,
                "width": 1,
                "height": 1,
            }
            for index, widget in enumerate(widgets)
        ]
        return {
            "type": "grid",
            "columns": columns,
            "rows": rows,
            "cells": cells,
        }

    def _create_freeform_layout(self, widgets: List[Dict]) -> Dict:
        """Place widgets left to right on one row with a 20-unit gap.

        Each widget's ``width``/``height`` keys are used when present,
        otherwise 300 and 200.
        """
        positions = []
        x_offset = 0
        for widget in widgets:
            width = widget.get("width", 300)
            positions.append({
                "widget_id": widget["id"],
                "x": x_offset,
                "y": 0,
                "width": width,
                "height": widget.get("height", 200),
            })
            x_offset += width + 20
        return {"type": "freeform", "positions": positions}

    def _create_responsive_layout(self, widgets: List[Dict]) -> Dict:
        """Return breakpoint column counts together with the widget list."""
        return {
            "type": "responsive",
            "breakpoints": {
                "mobile": {"columns": 1},
                "tablet": {"columns": 2},
                "desktop": {"columns": 3},
            },
            "widgets": widgets,
        }
class WidgetFactory:
    """Looks up a widget builder by type name and invokes it."""

    def __init__(self):
        """Map each supported type name to its ``DashboardWidgetTypes`` builder."""
        supported = (
            "metric_card",
            "line_chart",
            "bar_chart",
            "pie_chart",
            "gauge_chart",
            "heatmap",
            "table",
            "alert_panel",
        )
        self.widget_types = {
            type_name: getattr(DashboardWidgetTypes, type_name)
            for type_name in supported
        }

    def create_widget(self, widget_type: str, **kwargs) -> Dict:
        """Build a widget of *widget_type*, forwarding *kwargs* to its builder.

        Raises:
            ValueError: if *widget_type* is not registered.
        """
        if widget_type not in self.widget_types:
            raise ValueError(f"Unknown widget type: {widget_type}")
        builder = self.widget_types[widget_type]
        return builder(**kwargs)
class ThemeManager:
    """Registry of named colour themes."""

    def __init__(self):
        """Register the built-in "default", "dark", and "blue" themes."""
        self.themes = {}
        self.themes["default"] = dict(
            background="#f5f5f5",
            widget_background="#ffffff",
            text_color="#333333",
            border_color="#e0e0e0",
            accent_color="#2196F3",
        )
        self.themes["dark"] = dict(
            background="#1a1a1a",
            widget_background="#2d2d2d",
            text_color="#ffffff",
            border_color="#404040",
            accent_color="#4CAF50",
        )
        self.themes["blue"] = dict(
            background="#e3f2fd",
            widget_background="#ffffff",
            text_color="#1565c0",
            border_color="#90caf9",
            accent_color="#1976d2",
        )

    def get_theme(self, theme_name: str) -> Dict:
        """Return the theme registered as *theme_name*, else the default theme."""
        fallback = self.themes["default"]
        return self.themes.get(theme_name, fallback)

    def add_theme(self, name: str, theme_config: Dict):
        """Register (or replace) the theme stored under *name*."""
        self.themes.update({name: theme_config})

    def list_themes(self) -> List[str]:
        """Return the registered theme names in insertion order."""
        return [*self.themes]
仪表板交互
交互管理器
class DashboardInteractionManager:
    """Tracks dashboard filters and time range and notifies event handlers.

    Changing filters fires ``"filter_changed"``; changing the time range
    fires ``"time_range_changed"``.
    """

    def __init__(self):
        """Start with no handlers, no filters, and an unset time range."""
        self.event_handlers = {}
        self.filter_state = {}
        self.time_range = {"start": None, "end": None}

    def register_event_handler(self, event_type: str, handler):
        """Register *handler* to be called with the event data of *event_type*."""
        self.event_handlers.setdefault(event_type, []).append(handler)

    def handle_event(self, event_type: str, event_data: Dict):
        """Invoke every handler registered for *event_type*, in order."""
        for handler in self.event_handlers.get(event_type, []):
            handler(event_data)

    def apply_filter(self, filter_name: str, filter_value: Any):
        """Set a filter value and notify filter-change handlers."""
        self.filter_state[filter_name] = filter_value
        self._notify_filter_change()

    def clear_filter(self, filter_name: str):
        """Remove one filter if present, then notify filter-change handlers."""
        self.filter_state.pop(filter_name, None)
        self._notify_filter_change()

    def clear_all_filters(self):
        """Remove every filter and notify filter-change handlers."""
        self.filter_state.clear()
        self._notify_filter_change()

    def set_time_range(self, start: datetime, end: datetime):
        """Set the active time range and notify time-range handlers."""
        self.time_range = {"start": start, "end": end}
        self._notify_time_range_change()

    def get_current_filters(self) -> Dict[str, Any]:
        """Return a shallow copy of the current filters."""
        return self.filter_state.copy()

    def get_current_time_range(self) -> Dict[str, datetime]:
        """Return a shallow copy of the current time range."""
        return self.time_range.copy()

    def _notify_filter_change(self):
        """Fire ``"filter_changed"`` with the current filter state."""
        self.handle_event("filter_changed", self.filter_state)

    def _notify_time_range_change(self):
        """Fire ``"time_range_changed"`` with the current time range."""
        self.handle_event("time_range_changed", self.time_range)

    def export_filters(self) -> str:
        """Serialize filters and time range to a JSON string.

        ``datetime`` bounds are written as ISO-8601 strings because JSON has
        no native datetime type.
        """
        serializable_range = {
            key: value.isoformat() if isinstance(value, datetime) else value
            for key, value in self.time_range.items()
        }
        return json.dumps({
            "filters": self.filter_state,
            "time_range": serializable_range,
        })

    def import_filters(self, filters_json: str):
        """Restore filters and time range from ``export_filters`` output.

        String time bounds are parsed back into ``datetime`` objects; a
        string that is not valid ISO-8601 is kept unchanged. No change
        events are fired.
        """
        data = json.loads(filters_json)
        self.filter_state = data.get("filters", {})
        raw_range = data.get("time_range", {"start": None, "end": None})
        self.time_range = {
            key: self._parse_time_bound(value)
            for key, value in raw_range.items()
        }

    @staticmethod
    def _parse_time_bound(value: Any) -> Any:
        """Convert an ISO-8601 string to ``datetime``; pass anything else through."""
        if not isinstance(value, str):
            return value
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            # Not an ISO timestamp: keep the raw value rather than lose it.
            return value
总结
LLM仪表板是监控和管理大语言模型的关键工具。通过设计合理的监控面板、实现趋势分析和提供自定义看板功能,组织可以实时了解模型性能,及时发现问题并做出优化决策。结合交互管理和可视化组件,仪表板能够为不同用户提供个性化的监控体验。