← 返回首页
🧠

LLM仪表板

📂 llm ⏱ 11 min 2108 words

---
title: "LLM仪表板"
description: "详细介绍LLM监控仪表板设计,包括监控面板、趋势分析、自定义看板等核心功能"
tags: ["LLM仪表板", "监控面板", "趋势分析", "自定义看板"]
category: "llm"
icon: "🧠"
---

LLM仪表板

LLM仪表板概述

LLM仪表板是实时监控和分析大语言模型性能、使用情况和业务影响的可视化界面。一个优秀的仪表板可以帮助运维团队快速发现问题、优化性能并做出数据驱动的决策。

仪表板架构设计

from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import json

class LLMDashboard:
    """Container for the widgets, data sources and alert rules of one dashboard.

    Widgets and alert rules are stored as plain dicts in insertion order.
    Data sources are zero-argument callables registered by name and invoked
    whenever dashboard data is requested.
    """

    def __init__(self, dashboard_id: str, name: str):
        self.dashboard_id = dashboard_id
        self.name = name
        self.widgets = []
        self.refresh_interval = 30  # seconds
        self.data_sources = {}
        self.alert_rules = []
        self.user_preferences = {}

    @staticmethod
    def _next_free_id(prefix: str, items: List[Dict[str, Any]]) -> str:
        """Return the first ``<prefix>_<n>`` id, n >= len(items) + 1, not in use.

        Starting at ``len(items) + 1`` keeps the historical numbering when
        nothing has been removed; the scan only advances when that id is
        still held by a surviving item (which happens after a removal).
        """
        taken = {item.get("id") for item in items}
        seq = len(items) + 1
        candidate = f"{prefix}_{seq}"
        while candidate in taken:
            seq += 1
            candidate = f"{prefix}_{seq}"
        return candidate

    def add_widget(self, widget_config: Dict[str, Any]) -> str:
        """Create a widget from ``widget_config`` and return its new id.

        Missing keys fall back to defaults (type ``metric_card``, a 1x1
        position at the origin, a 60 second refresh rate, ...).
        """
        widget_id = self._next_free_id("widget", self.widgets)

        widget = {
            "id": widget_id,
            "type": widget_config.get("type", "metric_card"),
            "title": widget_config.get("title", ""),
            "position": widget_config.get("position", {"x": 0, "y": 0, "w": 1, "h": 1}),
            "data_source": widget_config.get("data_source", ""),
            "refresh_rate": widget_config.get("refresh_rate", 60),
            "visualization": widget_config.get("visualization", {}),
            "filters": widget_config.get("filters", []),
            "created_at": datetime.now().isoformat()
        }

        self.widgets.append(widget)
        return widget_id

    def update_widget(self, widget_id: str, updates: Dict[str, Any]) -> bool:
        """Merge ``updates`` into the widget with ``widget_id``.

        Returns True when the widget was found (and stamps ``updated_at``),
        False otherwise.
        """
        for widget in self.widgets:
            if widget["id"] == widget_id:
                widget.update(updates)
                widget["updated_at"] = datetime.now().isoformat()
                return True
        return False

    def remove_widget(self, widget_id: str) -> bool:
        """Drop the widget with ``widget_id``; return True if one was removed."""
        original_count = len(self.widgets)
        self.widgets = [w for w in self.widgets if w["id"] != widget_id]
        return len(self.widgets) < original_count

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Return dashboard metadata plus every widget paired with fresh data."""
        return {
            "dashboard_id": self.dashboard_id,
            "name": self.name,
            "widgets": [
                {"widget": widget, "data": self._fetch_widget_data(widget)}
                for widget in self.widgets
            ],
            "last_updated": datetime.now().isoformat(),
            "status": "active"
        }

    def _fetch_widget_data(self, widget: Dict) -> Any:
        """Call the widget's registered data source, or return a neutral placeholder."""
        data_source = widget.get("data_source", "")

        if data_source in self.data_sources:
            return self.data_sources[data_source]()

        return {"value": 0, "trend": "stable"}

    def set_data_source(self, name: str, fetcher):
        """Register ``fetcher`` (a zero-argument callable) under ``name``."""
        self.data_sources[name] = fetcher

    def add_alert_rule(self, rule_config: Dict) -> str:
        """Create an alert rule from ``rule_config`` and return its new id."""
        rule_id = self._next_free_id("rule", self.alert_rules)

        rule = {
            "id": rule_id,
            "name": rule_config.get("name", ""),
            "condition": rule_config.get("condition", {}),
            "action": rule_config.get("action", {}),
            "enabled": rule_config.get("enabled", True),
            "created_at": datetime.now().isoformat()
        }

        self.alert_rules.append(rule)
        return rule_id

    def export_dashboard(self, format: str = "json") -> bytes:
        """Serialize the dashboard configuration.

        ``"json"`` yields indented JSON, ``"html"`` a static HTML page; any
        other value falls back to compact JSON.
        """
        if format == "html":
            return self._generate_html()

        dashboard_config = {
            "dashboard_id": self.dashboard_id,
            "name": self.name,
            "widgets": self.widgets,
            "alert_rules": self.alert_rules,
            "export_time": datetime.now().isoformat()
        }

        if format == "json":
            return json.dumps(dashboard_config, indent=2).encode()
        return json.dumps(dashboard_config).encode()

    def _generate_html(self) -> bytes:
        """Render a static HTML page with one placeholder card per widget.

        The dashboard name and widget titles are HTML-escaped because they
        come from caller-supplied configuration.
        """
        # Function-scope import so the block does not depend on the file's
        # top-level import list.
        from html import escape

        safe_name = escape(str(self.name))
        parts = [f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>{safe_name}</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 0; padding: 20px; background: #f5f5f5; }}
                .dashboard {{ display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 20px; }}
                .widget {{ background: white; border-radius: 8px; padding: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }}
                .widget-title {{ font-size: 14px; color: #666; margin-bottom: 10px; }}
                .widget-value {{ font-size: 24px; font-weight: bold; color: #333; }}
                .trend-up {{ color: #4CAF50; }}
                .trend-down {{ color: #f44336; }}
            </style>
        </head>
        <body>
            <h1>{safe_name}</h1>
            <div class="dashboard">
        """]

        for widget in self.widgets:
            safe_title = escape(str(widget.get('title', '')))
            parts.append(f"""
                <div class="widget">
                    <div class="widget-title">{safe_title}</div>
                    <div class="widget-value">--</div>
                </div>
            """)

        parts.append("""
            </div>
        </body>
        </html>
        """)

        return "".join(parts).encode()

监控面板设计

实时监控组件

class RealTimeMonitor:
    """Polls metrics once per second into a bounded in-memory buffer.

    ``start_monitoring`` runs the polling loop in the calling thread and
    only returns after ``stop_monitoring`` has been called (from a callback
    or from another thread).
    """

    # Maximum number of samples retained in ``metrics_buffer``.
    MAX_BUFFER_SIZE = 1000

    def __init__(self):
        self.metrics_buffer = []
        self.update_callbacks = []
        self.is_running = False

    def start_monitoring(self):
        """Mark the monitor as running and enter the (blocking) polling loop."""
        self.is_running = True
        self._monitoring_loop()

    def stop_monitoring(self):
        """Ask the polling loop to exit."""
        self.is_running = False

    def _monitoring_loop(self):
        """Collect, buffer and publish one sample per second until stopped."""
        # Function-scope import, executed once instead of on every iteration.
        import time

        while self.is_running:
            metrics = self._collect_real_time_metrics()
            self.metrics_buffer.append(metrics)

            # Keep only the most recent samples.
            if len(self.metrics_buffer) > self.MAX_BUFFER_SIZE:
                self.metrics_buffer = self.metrics_buffer[-self.MAX_BUFFER_SIZE:]

            for callback in self.update_callbacks:
                callback(metrics)

            # A callback may have requested a stop; do not sleep another
            # second before honouring it.
            if not self.is_running:
                break

            time.sleep(1)

    def _collect_real_time_metrics(self) -> Dict[str, Any]:
        """Return one timestamped sample of all tracked metrics."""
        return {
            "timestamp": datetime.now().isoformat(),
            "requests_per_second": self._get_rps(),
            "average_latency": self._get_avg_latency(),
            "error_rate": self._get_error_rate(),
            "active_users": self._get_active_users(),
            "model_load": self._get_model_load()
        }

    def _get_rps(self) -> float:
        """Requests per second (simplified: random placeholder value)."""
        import random
        return random.uniform(100, 500)

    def _get_avg_latency(self) -> float:
        """Average latency (random placeholder value)."""
        import random
        return random.uniform(0.2, 1.0)

    def _get_error_rate(self) -> float:
        """Error rate (random placeholder value)."""
        import random
        return random.uniform(0, 0.05)

    def _get_active_users(self) -> int:
        """Active user count (random placeholder value)."""
        import random
        return random.randint(50, 200)

    def _get_model_load(self) -> float:
        """Model load (random placeholder value)."""
        import random
        return random.uniform(0.3, 0.8)

    def add_update_callback(self, callback):
        """Register ``callback``; it is called with each new sample dict."""
        self.update_callbacks.append(callback)

    def get_historical_metrics(self, duration_minutes: int = 60) -> List[Dict]:
        """Return buffered samples newer than ``duration_minutes`` ago."""
        cutoff_time = datetime.now() - timedelta(minutes=duration_minutes)

        return [
            m for m in self.metrics_buffer
            if datetime.fromisoformat(m["timestamp"]) > cutoff_time
        ]

    def calculate_statistics(self, metrics: List[Dict]) -> Dict[str, Any]:
        """Summarize RPS, latency and error rate over ``metrics``.

        Returns an empty dict when ``metrics`` is empty.
        """
        if not metrics:
            return {}

        rps_values = [m["requests_per_second"] for m in metrics]
        latency_values = [m["average_latency"] for m in metrics]
        error_rates = [m["error_rate"] for m in metrics]

        return {
            "rps": {
                "average": sum(rps_values) / len(rps_values),
                "min": min(rps_values),
                "max": max(rps_values),
                "current": rps_values[-1]
            },
            "latency": {
                "average": sum(latency_values) / len(latency_values),
                "min": min(latency_values),
                "max": max(latency_values),
                "p95": self._calculate_percentile(latency_values, 95)
            },
            "error_rate": {
                "average": sum(error_rates) / len(error_rates),
                "max": max(error_rates),
                "trend": self._calculate_trend(error_rates)
            }
        }

    def _calculate_percentile(self, values: List[float], percentile: float) -> float:
        """Return the element at position ``int(n * percentile / 100)`` of the sorted values.

        The position is clamped to the valid index range; an empty input
        yields 0.0 instead of raising ``IndexError``.
        """
        if not values:
            return 0.0

        sorted_values = sorted(values)
        index = int(len(sorted_values) * percentile / 100)
        index = max(0, min(index, len(sorted_values) - 1))
        return sorted_values[index]

    def _calculate_trend(self, values: List[float]) -> str:
        """Compare the mean of the last 10 values with the mean of the first 10.

        A change of more than 10% is reported as "increasing" or
        "decreasing"; anything else (or fewer than two values) is "stable".
        """
        if len(values) < 2:
            return "stable"

        recent = values[-10:]
        earlier = values[:10]
        recent_avg = sum(recent) / len(recent)
        earlier_avg = sum(earlier) / len(earlier)

        if recent_avg > earlier_avg * 1.1:
            return "increasing"
        if recent_avg < earlier_avg * 0.9:
            return "decreasing"
        return "stable"

组件类型定义

class DashboardWidgetTypes:
    """Factory helpers that build the config dict for each widget type."""

    @staticmethod
    def metric_card(title: str, value: Any, unit: str = "",
                   trend: str = "stable") -> Dict:
        """Build a single-value metric card."""
        return {
            "type": "metric_card",
            "title": title,
            "value": value,
            "unit": unit,
            "trend": trend,
            "style": {
                "background": "#ffffff",
                "border": "1px solid #e0e0e0",
                "borderRadius": "8px",
                "padding": "20px"
            }
        }

    @staticmethod
    def line_chart(title: str, data: List[Dict],
                  x_axis: str = "time", y_axis: str = "value") -> Dict:
        """Build a line chart over ``data`` using the given axis field names."""
        return {
            "type": "line_chart",
            "title": title,
            "data": data,
            "x_axis": x_axis,
            "y_axis": y_axis,
            "options": {
                "grid": True,
                "legend": True,
                "tooltip": True,
                "animation": True
            }
        }

    @staticmethod
    def bar_chart(title: str, categories: List[str],
                 values: List[float], colors: Optional[List[str]] = None) -> Dict:
        """Build a bar chart; a default palette is used when ``colors`` is empty or None."""
        return {
            "type": "bar_chart",
            "title": title,
            "categories": categories,
            "values": values,
            "colors": colors or ["#4CAF50", "#2196F3", "#FF9800", "#9C27B0"],
            "options": {
                "horizontal": False,
                "stacked": False,
                "showValues": True
            }
        }

    @staticmethod
    def pie_chart(title: str, segments: List[Dict]) -> Dict:
        """Build a pie chart from ``segments``."""
        return {
            "type": "pie_chart",
            "title": title,
            "segments": segments,
            "options": {
                "donut": False,
                "showLabels": True,
                "showPercentages": True
            }
        }

    @staticmethod
    def gauge_chart(title: str, value: float, min_val: float = 0,
                   max_val: float = 100) -> Dict:
        """Build a gauge with colour thresholds at 30%, 70% and 90% of its range.

        Thresholds are positioned relative to ``[min_val, max_val]`` so they
        stay on the scale when ``min_val`` is not zero. With the default
        ``min_val=0`` the values are the same as ``max_val * fraction``.
        """
        span = max_val - min_val
        return {
            "type": "gauge_chart",
            "title": title,
            "value": value,
            "min": min_val,
            "max": max_val,
            "thresholds": [
                {"value": min_val + span * 0.3, "color": "#4CAF50"},
                {"value": min_val + span * 0.7, "color": "#FF9800"},
                {"value": min_val + span * 0.9, "color": "#f44336"}
            ]
        }

    @staticmethod
    def heatmap(title: str, data: List[List[float]],
               x_labels: List[str], y_labels: List[str]) -> Dict:
        """Build a heatmap from a 2-D ``data`` grid and its axis labels."""
        return {
            "type": "heatmap",
            "title": title,
            "data": data,
            "x_labels": x_labels,
            "y_labels": y_labels,
            "color_scale": ["#ffffff", "#4CAF50"],
            "options": {
                "showValues": True,
                "interactive": True
            }
        }

    @staticmethod
    def table(title: str, columns: List[Dict], rows: List[List]) -> Dict:
        """Build a sortable, filterable, paginated table (page size 10)."""
        return {
            "type": "table",
            "title": title,
            "columns": columns,
            "rows": rows,
            "options": {
                "sortable": True,
                "filterable": True,
                "pagination": True,
                "pageSize": 10
            }
        }

    @staticmethod
    def alert_panel(title: str, alerts: List[Dict]) -> Dict:
        """Build an alert panel configured to show at most 10 alerts."""
        return {
            "type": "alert_panel",
            "title": title,
            "alerts": alerts,
            "options": {
                "maxAlerts": 10,
                "showTimestamp": True,
                "autoRefresh": True
            }
        }

趋势分析

趋势分析器

class TrendAnalyzer:
    """Trend analysis over series of ``{"value": ...}`` data points.

    Supported methods are ``"linear"`` (least-squares fit),
    ``"moving_average"`` and ``"exponential"`` (exponential smoothing).
    The most recent result per metric is kept in ``trend_data``.
    """

    def __init__(self):
        self.trend_data = {}
        self.analysis_methods = {
            "linear": self._linear_trend,
            "moving_average": self._moving_average_trend,
            "exponential": self._exponential_trend
        }

    def analyze_trend(self, metric_name: str, data_points: List[Dict],
                     method: str = "linear") -> Dict[str, Any]:
        """Run ``method`` on ``data_points`` and remember the result.

        Raises:
            ValueError: if ``method`` is not a registered analysis method.
        """
        if method not in self.analysis_methods:
            raise ValueError(f"Unknown method: {method}")

        trend_result = self.analysis_methods[method](data_points)

        # Stored so predict_future can extrapolate past the analysed series.
        self.trend_data[metric_name] = {
            "trend": trend_result,
            "analyzed_at": datetime.now().isoformat(),
            "data_points_count": len(data_points)
        }

        return trend_result

    def _linear_trend(self, data_points: List[Dict]) -> Dict[str, Any]:
        """Least-squares line through the values, using the point index as x.

        Slopes within +/-0.01 are reported as "stable"; the fit is "strong"
        when R^2 exceeds 0.7.
        """
        if len(data_points) < 2:
            return {"slope": 0, "intercept": 0, "r_squared": 0, "direction": "stable"}

        x_values = list(range(len(data_points)))
        y_values = [dp.get("value", 0) for dp in data_points]

        n = len(x_values)
        sum_x = sum(x_values)
        sum_y = sum(y_values)
        sum_xy = sum(x * y for x, y in zip(x_values, y_values))
        sum_x2 = sum(x * x for x in x_values)

        denominator = n * sum_x2 - sum_x * sum_x
        if denominator == 0:
            return {"slope": 0, "intercept": sum_y / n, "r_squared": 0, "direction": "stable"}

        slope = (n * sum_xy - sum_x * sum_y) / denominator
        intercept = (sum_y - slope * sum_x) / n

        # Coefficient of determination (R^2).
        y_mean = sum_y / n
        ss_total = sum((y - y_mean) ** 2 for y in y_values)
        ss_residual = sum((y - (slope * x + intercept)) ** 2 for x, y in zip(x_values, y_values))
        r_squared = 1 - (ss_residual / ss_total) if ss_total > 0 else 0

        if slope > 0.01:
            direction = "increasing"
        elif slope < -0.01:
            direction = "decreasing"
        else:
            direction = "stable"

        return {
            "slope": slope,
            "intercept": intercept,
            "r_squared": r_squared,
            "direction": direction,
            "strength": "strong" if r_squared > 0.7 else "weak"
        }

    def _moving_average_trend(self, data_points: List[Dict],
                            window_size: int = 5) -> Dict[str, Any]:
        """Simple moving average plus a direction from its first/last averages.

        The direction compares the mean of the last (up to) three averages
        with the mean of the first (up to) three, using a 5% band.
        """
        if len(data_points) < window_size:
            return {"moving_average": [], "direction": "stable"}

        values = [dp.get("value", 0) for dp in data_points]
        moving_average = [
            sum(values[i:i + window_size]) / window_size
            for i in range(len(values) - window_size + 1)
        ]

        direction = "stable"
        if len(moving_average) >= 2:
            recent = moving_average[-3:]
            earlier = moving_average[:3]
            # Divide by the real slice length: with only two averages a
            # fixed divisor of 3 would understate both means.
            recent_avg = sum(recent) / len(recent)
            earlier_avg = sum(earlier) / len(earlier)

            if recent_avg > earlier_avg * 1.05:
                direction = "increasing"
            elif recent_avg < earlier_avg * 0.95:
                direction = "decreasing"

        return {
            "moving_average": moving_average,
            "window_size": window_size,
            "direction": direction
        }

    def _exponential_trend(self, data_points: List[Dict],
                          alpha: float = 0.3) -> Dict[str, Any]:
        """Exponential smoothing; direction compares last vs first smoothed value (5% band)."""
        if not data_points:
            return {"exponential_average": [], "direction": "stable"}

        values = [dp.get("value", 0) for dp in data_points]
        exponential_average = [values[0]]

        for value in values[1:]:
            exponential_average.append(alpha * value + (1 - alpha) * exponential_average[-1])

        direction = "stable"
        if len(exponential_average) >= 2:
            recent = exponential_average[-1]
            earlier = exponential_average[0]

            if recent > earlier * 1.05:
                direction = "increasing"
            elif recent < earlier * 0.95:
                direction = "decreasing"

        return {
            "exponential_average": exponential_average,
            "alpha": alpha,
            "direction": direction
        }

    def predict_future(self, metric_name: str, periods: int = 7) -> List[Dict]:
        """Extrapolate the stored trend ``periods`` steps past the analysed data.

        Uses the stored ``slope``/``intercept`` (both default to 0 when the
        stored trend has none, e.g. for non-linear methods). Returns an
        empty list when ``metric_name`` has not been analysed. The
        confidence interval is +/-10% of the predicted value, with
        ``lower <= upper`` also for negative predictions.
        """
        if metric_name not in self.trend_data:
            return []

        record = self.trend_data[metric_name]
        trend = record["trend"]
        slope = trend.get("slope", 0)
        intercept = trend.get("intercept", 0)

        # The fit used x = 0..count-1, so the first future point is x = count.
        last_index = record.get("data_points_count", 0)

        predictions = []
        for i in range(periods):
            predicted_value = slope * (last_index + i) + intercept
            lower, upper = sorted((predicted_value * 0.9, predicted_value * 1.1))
            predictions.append({
                "period": i + 1,
                "predicted_value": predicted_value,
                "confidence_interval": {
                    "lower": lower,
                    "upper": upper
                }
            })

        return predictions

    def detect_anomalies(self, data_points: List[Dict],
                        threshold: float = 2.0) -> List[Dict]:
        """Flag points whose z-score magnitude exceeds ``threshold``.

        Uses the population standard deviation. Returns an empty list for
        fewer than 10 points.
        """
        if len(data_points) < 10:
            return []

        values = [dp.get("value", 0) for dp in data_points]
        mean = sum(values) / len(values)
        std = (sum((x - mean) ** 2 for x in values) / len(values)) ** 0.5

        anomalies = []
        for i, (dp, value) in enumerate(zip(data_points, values)):
            # A constant series has std == 0; treat every point as normal.
            z_score = (value - mean) / std if std > 0 else 0
            if abs(z_score) > threshold:
                anomalies.append({
                    "index": i,
                    "timestamp": dp.get("timestamp", ""),
                    "value": value,
                    "z_score": z_score,
                    "type": "high" if z_score > 0 else "low"
                })

        return anomalies

自定义看板

看板构建器

class DashboardBuilder:
    """Creates, saves, loads and clones ``LLMDashboard`` instances."""

    def __init__(self):
        self.layout_engine = LayoutEngine()
        self.widget_factory = WidgetFactory()
        self.theme_manager = ThemeManager()

    def create_custom_dashboard(self, config: Dict[str, Any]) -> LLMDashboard:
        """Build a dashboard from ``config``.

        Recognised keys: ``id`` (or ``dashboard_id``, as written by
        ``save_dashboard_config``), ``name``, ``widgets``, ``theme`` and
        ``layout``. ``theme`` may be a theme name or an already-resolved
        theme dict; ``layout`` may be a layout type name or a previously
        generated layout dict, in which case its ``"type"`` is reused and
        the layout is regenerated for the new widgets.
        """
        dashboard = LLMDashboard(
            dashboard_id=config.get("id", config.get("dashboard_id", "custom_dashboard")),
            name=config.get("name", "Custom Dashboard")
        )

        # A dict cannot be used as a theme *name* (it is unhashable), so
        # resolved themes are taken as-is; an empty dict means "default".
        theme = config.get("theme", "default")
        if isinstance(theme, dict):
            dashboard.theme = dict(theme) if theme else self.theme_manager.get_theme("default")
        else:
            dashboard.theme = self.theme_manager.get_theme(theme)

        for widget_config in config.get("widgets", []):
            dashboard.add_widget(widget_config)

        layout = config.get("layout", "grid")
        if isinstance(layout, dict):
            layout = layout.get("type", "grid")
        dashboard.layout = self.layout_engine.create_layout(layout, dashboard.widgets)

        return dashboard

    def save_dashboard_config(self, dashboard: LLMDashboard,
                            file_path: str):
        """Write the dashboard configuration to ``file_path`` as indented JSON."""
        config = {
            "dashboard_id": dashboard.dashboard_id,
            "name": dashboard.name,
            "widgets": dashboard.widgets,
            "layout": getattr(dashboard, "layout", {}),
            "theme": getattr(dashboard, "theme", {}),
            "saved_at": datetime.now().isoformat()
        }

        with open(file_path, 'w', encoding="utf-8") as f:
            json.dump(config, f, indent=2)

    def load_dashboard_config(self, file_path: str) -> Dict[str, Any]:
        """Read a JSON configuration previously written by ``save_dashboard_config``."""
        with open(file_path, 'r', encoding="utf-8") as f:
            return json.load(f)

    def clone_dashboard(self, source_dashboard: LLMDashboard,
                       new_name: str) -> LLMDashboard:
        """Return an independent copy of ``source_dashboard`` named ``new_name``.

        Widget configs are deep-copied so nested values (position, filters,
        ...) are not shared with the source. Widget ids are re-assigned by
        the new dashboard.
        """
        # Function-scope import: `copy` is not among the file's top-level imports.
        import copy

        config = {
            "id": f"{source_dashboard.dashboard_id}_clone",
            "name": new_name,
            "widgets": copy.deepcopy(source_dashboard.widgets),
            "layout": getattr(source_dashboard, "layout", {}),
            "theme": getattr(source_dashboard, "theme", {})
        }

        return self.create_custom_dashboard(config)

class LayoutEngine:
    """Computes widget placement for grid, freeform and responsive layouts."""

    def __init__(self):
        self.layout_types = ["grid", "freeform", "responsive"]

    def create_layout(self, layout_type: str, widgets: List[Dict]) -> Dict:
        """Build a layout of ``layout_type``; unknown types fall back to a grid."""
        if layout_type == "freeform":
            return self._create_freeform_layout(widgets)
        if layout_type == "responsive":
            return self._create_responsive_layout(widgets)
        return self._create_grid_layout(widgets)

    def _create_grid_layout(self, widgets: List[Dict]) -> Dict:
        """Place widgets row by row in a three-column grid of 1x1 cells."""
        columns = 3
        # Ceiling division: an exact multiple of `columns` must not add an
        # extra empty row. An empty dashboard still reports one row.
        rows = max(1, (len(widgets) + columns - 1) // columns)

        cells = []
        for i, widget in enumerate(widgets):
            row, col = divmod(i, columns)
            cells.append({
                "widget_id": widget["id"],
                "row": row,
                "col": col,
                "width": 1,
                "height": 1
            })

        return {
            "type": "grid",
            "columns": columns,
            "rows": rows,
            "cells": cells
        }

    def _create_freeform_layout(self, widgets: List[Dict]) -> Dict:
        """Lay widgets out left to right on one line with a 20-unit gap.

        Width and height come from each widget's ``width``/``height`` keys,
        defaulting to 300 and 200.
        """
        positions = []
        x_offset = 0
        for widget in widgets:
            width = widget.get("width", 300)
            positions.append({
                "widget_id": widget["id"],
                "x": x_offset,
                "y": 0,
                "width": width,
                "height": widget.get("height", 200)
            })
            x_offset += width + 20

        return {
            "type": "freeform",
            "positions": positions
        }

    def _create_responsive_layout(self, widgets: List[Dict]) -> Dict:
        """Return breakpoint column counts (1/2/3) together with the widgets."""
        return {
            "type": "responsive",
            "breakpoints": {
                "mobile": {"columns": 1},
                "tablet": {"columns": 2},
                "desktop": {"columns": 3}
            },
            "widgets": widgets
        }

class WidgetFactory:
    """Looks up widget builders by type name and invokes them."""

    def __init__(self):
        # Every supported type name maps to the DashboardWidgetTypes builder
        # of the same name.
        supported = (
            "metric_card",
            "line_chart",
            "bar_chart",
            "pie_chart",
            "gauge_chart",
            "heatmap",
            "table",
            "alert_panel",
        )
        self.widget_types = {
            type_name: getattr(DashboardWidgetTypes, type_name)
            for type_name in supported
        }

    def create_widget(self, widget_type: str, **kwargs) -> Dict:
        """Build a widget config by forwarding ``kwargs`` to the registered builder.

        Raises:
            ValueError: if ``widget_type`` is not registered.
        """
        if widget_type not in self.widget_types:
            raise ValueError(f"Unknown widget type: {widget_type}")

        builder = self.widget_types[widget_type]
        return builder(**kwargs)

class ThemeManager:
    """Registry of named colour themes with a built-in "default" fallback."""

    def __init__(self):
        # Each built-in palette lists its colours in the order given by
        # `colour_keys`.
        colour_keys = (
            "background",
            "widget_background",
            "text_color",
            "border_color",
            "accent_color",
        )
        built_in = (
            ("default", ("#f5f5f5", "#ffffff", "#333333", "#e0e0e0", "#2196F3")),
            ("dark", ("#1a1a1a", "#2d2d2d", "#ffffff", "#404040", "#4CAF50")),
            ("blue", ("#e3f2fd", "#ffffff", "#1565c0", "#90caf9", "#1976d2")),
        )
        self.themes = {
            theme_name: dict(zip(colour_keys, colours))
            for theme_name, colours in built_in
        }

    def get_theme(self, theme_name: str) -> Dict:
        """Return the theme registered as ``theme_name``, else the default theme."""
        fallback = self.themes["default"]
        return self.themes.get(theme_name, fallback)

    def add_theme(self, name: str, theme_config: Dict):
        """Register ``theme_config`` under ``name``, replacing any existing entry."""
        self.themes[name] = theme_config

    def list_themes(self) -> List[str]:
        """Return the registered theme names in registration order."""
        return [theme_name for theme_name in self.themes]

仪表板交互

交互管理器

class DashboardInteractionManager:
    """Tracks filter and time-range state and dispatches change events.

    Handlers registered for ``"filter_changed"`` and
    ``"time_range_changed"`` are called with the current state dict
    whenever the corresponding state is modified through this class.
    """

    def __init__(self):
        self.event_handlers = {}
        self.filter_state = {}
        self.time_range = {"start": None, "end": None}

    def register_event_handler(self, event_type: str, handler):
        """Append ``handler`` to the handlers called for ``event_type``."""
        self.event_handlers.setdefault(event_type, []).append(handler)

    def handle_event(self, event_type: str, event_data: Dict):
        """Call every handler registered for ``event_type`` with ``event_data``."""
        for handler in self.event_handlers.get(event_type, []):
            handler(event_data)

    def apply_filter(self, filter_name: str, filter_value: Any):
        """Set a filter and notify ``filter_changed`` handlers."""
        self.filter_state[filter_name] = filter_value
        self._notify_filter_change()

    def clear_filter(self, filter_name: str):
        """Remove one filter; handlers are notified only if it existed."""
        if filter_name in self.filter_state:
            del self.filter_state[filter_name]
            self._notify_filter_change()

    def clear_all_filters(self):
        """Remove every filter and notify ``filter_changed`` handlers."""
        self.filter_state.clear()
        self._notify_filter_change()

    def set_time_range(self, start: datetime, end: datetime):
        """Set the active time range and notify ``time_range_changed`` handlers."""
        self.time_range = {"start": start, "end": end}
        self._notify_time_range_change()

    def get_current_filters(self) -> Dict[str, Any]:
        """Return a shallow copy of the active filters."""
        return self.filter_state.copy()

    def get_current_time_range(self) -> Dict[str, datetime]:
        """Return a shallow copy of the active time range."""
        return self.time_range.copy()

    def _notify_filter_change(self):
        """Dispatch ``filter_changed`` with the current filter state."""
        self.handle_event("filter_changed", self.filter_state)

    def _notify_time_range_change(self):
        """Dispatch ``time_range_changed`` with the current time range."""
        self.handle_event("time_range_changed", self.time_range)

    @staticmethod
    def _serialize_time(value: Any) -> Any:
        """Convert a ``datetime`` to its ISO-8601 string; pass anything else through."""
        return value.isoformat() if isinstance(value, datetime) else value

    @staticmethod
    def _parse_time(value: Any) -> Any:
        """Convert an ISO-8601 string back to ``datetime``; pass anything else through."""
        if isinstance(value, str):
            try:
                return datetime.fromisoformat(value)
            except ValueError:
                # Not an ISO timestamp: keep the raw value rather than fail.
                return value
        return value

    def export_filters(self) -> str:
        """Serialize filters and time range to a JSON string.

        ``datetime`` bounds are written as ISO-8601 strings, since
        ``json.dumps`` cannot encode ``datetime`` objects directly.
        """
        return json.dumps({
            "filters": self.filter_state,
            "time_range": {
                bound: self._serialize_time(moment)
                for bound, moment in self.time_range.items()
            }
        })

    def import_filters(self, filters_json: str):
        """Restore state from a string produced by ``export_filters``.

        ISO-8601 time-range bounds are converted back to ``datetime``. No
        change events are dispatched.
        """
        data = json.loads(filters_json)
        self.filter_state = data.get("filters", {})

        time_range = data.get("time_range", {"start": None, "end": None})
        if isinstance(time_range, dict):
            time_range = {
                bound: self._parse_time(moment)
                for bound, moment in time_range.items()
            }
        self.time_range = time_range

总结

LLM仪表板是监控和管理大语言模型的关键工具。通过设计合理的监控面板、实现趋势分析和提供自定义看板功能,组织可以实时了解模型性能,及时发现问题并做出优化决策。结合交互管理和可视化组件,仪表板能够为不同用户提供个性化的监控体验。