← 返回首页
🧠

使用分析:LLM服务使用数据的收集与分析

📂 llm ⏱ 4 min 781 words

---
title: "使用分析:LLM服务使用数据的收集与分析"
description: "全面介绍LLM服务使用数据的收集方法、分析维度、可视化展示及基于数据的优化决策实践"
tags: ["使用分析", "数据分析", "LLM监控", "性能分析"]
category: "llm"
icon: "🧠"
---

使用分析:LLM服务使用数据的收集与分析

为什么要进行使用分析

LLM应用上线后,需要通过数据分析了解服务使用情况、发现性能瓶颈、优化用户体验。系统化的使用分析能帮助团队做出数据驱动的决策。

数据收集框架

核心指标定义

import logging
import uuid
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime
from datetime import timedelta
from typing import Optional

@dataclass
class LLMUsageEvent:
    """One recorded LLM request with its performance, token and quality data.

    Every field has a default, so an event can be created empty and filled in
    afterwards. New fields are only ever appended, which keeps the positional
    order of the constructor arguments stable.
    """

    # Identity: a random UUID4 string and the naive local creation time.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=datetime.now)

    # Basic information
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    feature: str = "chat"
    model: str = "gpt-4o"

    # Performance metrics
    request_latency_ms: float = 0
    first_token_latency_ms: float = 0
    tokens_per_second: float = 0

    # Token usage
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0

    # Quality metrics
    finish_reason: str = "stop"
    error_type: Optional[str] = None

    # Context
    input_length: int = 0
    conversation_turns: int = 0
    has_tools: bool = False

    # Cost of the request. The analytics code in this file sums ``event._cost``,
    # so the attribute has to exist on every event; without it those sums
    # raise AttributeError.
    # NOTE(review): the unit/currency is not defined anywhere in this file.
    _cost: float = 0.0

数据收集器

import asyncio
from collections import deque

class UsageDataCollector:
    """Buffer usage events in memory and write them to storage in batches.

    A flush happens as soon as the buffer holds ``batch_size`` events and,
    once :meth:`start` has been called, every ``flush_interval`` seconds.
    A batch whose write fails is put back into the buffer, so delivery is
    at-least-once: a write that failed part-way is retried in full.
    """

    def __init__(self, batch_size: int = 100, flush_interval: int = 60):
        """Create an idle collector.

        Args:
            batch_size: Buffer size that triggers an immediate flush.
            flush_interval: Seconds between periodic flushes.
        """
        self.buffer = deque()
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._running = False
        # The event loop only keeps weak references to tasks, so the periodic
        # task must be referenced here or it may be garbage-collected mid-run.
        self._flush_task = None
        self._logger = logging.getLogger(__name__)

    async def start(self):
        """Start the periodic flush task; a second call while running is a no-op."""
        if self._flush_task is not None and not self._flush_task.done():
            return
        self._running = True
        self._flush_task = asyncio.create_task(self._flush_loop())

    async def stop(self):
        """Stop the periodic flush task and write out whatever is still buffered."""
        self._running = False
        task, self._flush_task = self._flush_task, None
        if task is not None:
            # The task is normally asleep; cancel it instead of waiting out
            # the remainder of the interval.
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        await self._flush()

    async def collect(self, event: "LLMUsageEvent"):
        """Buffer ``event`` and flush immediately once the batch size is reached.

        Raises:
            Exception: whatever ``_write_to_storage`` raises; the events stay
                buffered in that case.
        """
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            await self._flush()

    async def _flush_loop(self):
        """Flush every ``flush_interval`` seconds until the collector is stopped."""
        while self._running:
            await asyncio.sleep(self.flush_interval)
            try:
                await self._flush()
            except Exception:
                # One failed write must not end periodic flushing; the batch
                # is back in the buffer and is retried on the next round.
                self._logger.exception("periodic flush failed; will retry")

    async def _flush(self):
        """Write all buffered events as one batch; do nothing when empty."""
        if not self.buffer:
            return

        # Snapshot and clear without an intervening await, so events collected
        # while the write is in flight land in the next batch.
        events = list(self.buffer)
        self.buffer.clear()

        try:
            # Bulk write to storage.
            await self._write_to_storage(events)
        except BaseException:
            # Restore the batch at the front, in its original order, so a
            # failed or cancelled write does not drop events.
            self.buffer.extendleft(reversed(events))
            raise
        self._logger.info(f"已写入 {len(events)} 条使用记录")

    async def _write_to_storage(self, events: "list[LLMUsageEvent]"):
        """Persist one batch; placeholder for a database or warehouse write."""
        pass

分析维度

1. 用户维度分析

class UserAnalytics:
    """Per-user usage statistics computed from stored usage events."""

    def __init__(self, storage):
        """Keep the storage backend; it must offer an awaitable ``query``."""
        self.storage = storage

    async def get_user_stats(self, user_id: str, days: int = 30) -> dict:
        """Summarise one user's activity over the last ``days`` days.

        Args:
            user_id: User whose events are queried.
            days: Size of the look-back window, counted from now.

        Returns:
            A dict with request/token/cost totals, average latency, the most
            used features, the most active hours and the error rate. A user
            without events gets zero for every number and empty lists.
        """
        events = await self.storage.query(
            user_id=user_id,
            start_date=datetime.now() - timedelta(days=days),
        )
        request_count = len(events)
        total_latency = sum(e.request_latency_ms for e in events)

        return {
            "user_id": user_id,
            "total_requests": request_count,
            "total_tokens": sum(e.total_tokens for e in events),
            # Events that carry no cost attribute count as zero cost.
            "total_cost": sum(getattr(e, "_cost", 0) for e in events),
            # A user with no events in the window must not divide by zero.
            "avg_latency": total_latency / request_count if request_count else 0,
            "favorite_features": self._top_features(events),
            "active_hours": self._active_hours(events),
            "error_rate": self._error_rate(events),
        }

    def _top_features(self, events, top_n=5):
        """Return up to ``top_n`` ``(feature, count)`` pairs, most used first."""
        return Counter(e.feature for e in events).most_common(top_n)

    def _active_hours(self, events, top_n=5):
        """Return up to ``top_n`` ``(hour_of_day, count)`` pairs, busiest first.

        The hour is taken from each event's ``timestamp``.
        """
        return Counter(e.timestamp.hour for e in events).most_common(top_n)

    def _error_rate(self, events):
        """Return the fraction of events with an ``error_type``; 0 when empty."""
        if not events:
            return 0
        return sum(1 for e in events if e.error_type) / len(events)

2. 模型性能分析

class ModelPerformanceAnalytics:
    """Side-by-side performance figures for every model seen in the events."""

    def __init__(self, storage):
        self.storage = storage

    async def compare_models(self, days: int = 7) -> dict:
        """Aggregate request count, latency, throughput and error rate per model.

        Args:
            days: Forwarded to ``storage.query`` as the ``days`` keyword.

        Returns:
            A dict keyed by model name, in order of first appearance. Each
            value holds ``requests``, ``avg_latency_ms``, ``p95_latency_ms``,
            ``avg_tokens_per_sec`` and ``error_rate``.
        """
        per_model = {}
        for record in await self.storage.query(days=days):
            bucket = per_model.setdefault(
                record.model,
                {"requests": 0, "latencies": [], "token_rates": [], "errors": 0},
            )
            bucket["requests"] += 1
            bucket["latencies"].append(record.request_latency_ms)
            bucket["token_rates"].append(record.tokens_per_second)
            bucket["errors"] += 1 if record.error_type else 0

        def summarise(bucket):
            # Each bucket holds at least one sample, so the divisions are safe.
            latencies = bucket["latencies"]
            rates = bucket["token_rates"]
            return {
                "requests": bucket["requests"],
                "avg_latency_ms": sum(latencies) / len(latencies),
                "p95_latency_ms": self._percentile(latencies, 95),
                "avg_tokens_per_sec": sum(rates) / len(rates),
                "error_rate": bucket["errors"] / bucket["requests"],
            }

        return {name: summarise(bucket) for name, bucket in per_model.items()}

    def _percentile(self, data, percentile):
        """Return the element at position ``int(n * percentile / 100)`` of the
        sorted data, clamped to the last element."""
        ordered = sorted(data)
        last = len(ordered) - 1
        position = int(len(ordered) * percentile / 100)
        return ordered[position if position < last else last]

3. 功能使用分析

class FeatureAnalytics:
    """Usage figures broken down by product feature."""

    def __init__(self, storage):
        self.storage = storage

    async def get_feature_breakdown(self, days: int = 30) -> dict:
        """Summarise requests, users, tokens, latency and errors per feature.

        Args:
            days: Forwarded to ``storage.query`` as the ``days`` keyword.

        Returns:
            A dict keyed by feature name, in order of first appearance. Each
            value holds ``requests``, ``unique_users``,
            ``avg_tokens_per_request``, ``avg_latency_ms`` and ``error_rate``,
            all plain numbers.
        """
        # First pass: bucket the events by feature.
        grouped = {}
        for record in await self.storage.query(days=days):
            grouped.setdefault(record.feature, []).append(record)

        # Second pass: reduce each bucket to serialisable numbers. A bucket
        # always holds at least one event, so the divisions are safe.
        breakdown = {}
        for name, records in grouped.items():
            request_count = len(records)
            # Events without a user id do not count towards unique users.
            users = {r.user_id for r in records if r.user_id}
            failures = sum(1 for r in records if r.error_type)
            breakdown[name] = {
                "requests": request_count,
                "unique_users": len(users),
                "avg_tokens_per_request": sum(r.total_tokens for r in records) / request_count,
                "avg_latency_ms": sum(r.request_latency_ms for r in records) / request_count,
                "error_rate": failures / request_count,
            }

        return breakdown

实时监控仪表板

class RealtimeDashboard:
    """Assemble the numbers behind a live usage dashboard from stored events."""

    def __init__(self, storage):
        """Keep the storage backend; it must offer an awaitable ``query``."""
        self.storage = storage
        self.metrics_cache = {}

    async def get_dashboard_data(self) -> dict:
        """Collect last-hour, last-24-hour and 7-day trend figures.

        Returns:
            A dict with three sections: ``realtime`` (last hour), ``daily``
            (last 24 hours) and ``trends`` (per-day totals for 7 days).
        """
        now = datetime.now()

        # Events from the last hour.
        recent_events = await self.storage.query(
            start_date=now - timedelta(hours=1)
        )

        # Events from the last 24 hours.
        daily_events = await self.storage.query(
            start_date=now - timedelta(hours=24)
        )

        return {
            "realtime": {
                "requests_last_hour": len(recent_events),
                "active_users": len({e.user_id for e in recent_events if e.user_id}),
                "avg_latency_ms": self._avg(recent_events, "request_latency_ms"),
                "error_rate": self._error_rate(recent_events),
            },
            "daily": {
                "total_requests": len(daily_events),
                "total_tokens": sum(e.total_tokens for e in daily_events),
                # Events that carry no cost attribute count as zero cost.
                "total_cost": sum(getattr(e, "_cost", 0) for e in daily_events),
                "top_models": self._top_models(daily_events),
                "peak_hours": self._peak_hours(daily_events),
            },
            "trends": await self._calculate_trends(days=7),
        }

    def _avg(self, events, field_name):
        """Return the mean of attribute ``field_name`` over events; 0 when empty."""
        if not events:
            return 0
        return sum(getattr(e, field_name) for e in events) / len(events)

    def _error_rate(self, events):
        """Return the fraction of events with an ``error_type``; 0 when empty."""
        if not events:
            return 0
        errors = sum(1 for e in events if e.error_type)
        return errors / len(events)

    def _top_models(self, events, top_n=5):
        """Return up to ``top_n`` ``(model, count)`` pairs, most requested first."""
        return Counter(e.model for e in events).most_common(top_n)

    def _peak_hours(self, events, top_n=3):
        """Return up to ``top_n`` ``(hour_of_day, count)`` pairs, busiest first.

        The hour is taken from each event's ``timestamp``.
        """
        return Counter(e.timestamp.hour for e in events).most_common(top_n)

    async def _calculate_trends(self, days: int = 7) -> list:
        """Return per-day request and token totals for the last ``days`` days.

        Each entry is ``{"date": ISO date, "requests": n, "total_tokens": n}``,
        ordered by date. Days without events are omitted.
        """
        events = await self.storage.query(
            start_date=datetime.now() - timedelta(days=days)
        )
        per_day = {}
        for e in events:
            totals = per_day.setdefault(
                e.timestamp.date().isoformat(),
                {"requests": 0, "total_tokens": 0},
            )
            totals["requests"] += 1
            totals["total_tokens"] += e.total_tokens
        # ISO dates sort chronologically as plain strings.
        return [{"date": day, **totals} for day, totals in sorted(per_day.items())]

异常检测

class AnomalyDetector:
    """Flag recent metric values that deviate strongly from a 30-day baseline.

    A value is anomalous when its z-score against the baseline exceeds
    ``sensitivity`` in absolute terms.
    """

    def __init__(self, storage, sensitivity: float = 2.0):
        """Create a detector.

        Args:
            storage: Backend offering an awaitable ``query``.
            sensitivity: Absolute z-score above which a value is reported.
        """
        self.storage = storage
        self.sensitivity = sensitivity
        # Baseline used by the most recent detection, kept for callers that
        # inspect it.
        self.baseline = None
        # One baseline per metric: a single shared baseline would score a
        # second metric against the first metric's mean and deviation.
        self._baselines = {}

    async def detect_anomalies(self, metric: str, hours: int = 24) -> list:
        """Return the anomalous values of ``metric`` from the last ``hours`` hours.

        Args:
            metric: Name of the event attribute to examine.
            hours: Size of the look-back window for recent values.

        Returns:
            A list of dicts with ``timestamp``, ``value``, ``z_score`` and
            ``type`` (``"high"`` or ``"low"``). Empty when there is no history
            to compare against.
        """
        baseline = self._baselines.get(metric)
        if baseline is None:
            baseline = await self._calculate_baseline(metric)
            # An empty history is not cached, so a later call can pick up data.
            if baseline.get("count", 1):
                self._baselines[metric] = baseline
        self.baseline = baseline

        if not baseline.get("count", 1):
            return []

        mean = baseline["mean"]
        std = baseline["std"]
        anomalies = []

        for timestamp, value in await self._get_recent_values(metric, hours):
            deviation = value - mean
            if std:
                z_score = deviation / std
            elif deviation:
                # Constant history: any departure from it is infinitely many
                # standard deviations away; keep the sign for high/low.
                z_score = float("inf") if deviation > 0 else float("-inf")
            else:
                z_score = 0.0

            if abs(z_score) > self.sensitivity:
                anomalies.append({
                    "timestamp": timestamp,
                    "value": value,
                    "z_score": z_score,
                    "type": "high" if z_score > 0 else "low",
                })

        return anomalies

    async def _calculate_baseline(self, metric):
        """Return mean, population standard deviation and sample count of
        ``metric`` over the last 30 days of events."""
        historical = await self.storage.query(days=30)
        values = [getattr(e, metric) for e in historical]
        if not values:
            return {"mean": 0.0, "std": 0.0, "count": 0}

        # Compute the mean once instead of once per element.
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return {
            "mean": mean,
            "std": variance ** 0.5,
            "count": len(values),
        }

    async def _get_recent_values(self, metric, hours):
        """Return ``(timestamp, value)`` pairs of ``metric`` for the last ``hours`` hours.

        NOTE(review): assumes ``storage.query`` accepts ``start_date`` the way
        the other analytics classes in this file call it - confirm.
        """
        events = await self.storage.query(
            start_date=datetime.now() - timedelta(hours=hours)
        )
        return [(e.timestamp, getattr(e, metric)) for e in events]

报表生成

class UsageReportGenerator:
    """Render analytics results as a Markdown usage report."""

    def __init__(self, analytics):
        self.analytics = analytics

    async def generate_weekly_report(self) -> str:
        """Build the weekly report text from the last 7 days of analytics.

        Calls ``get_user_stats``, ``compare_models`` and
        ``get_feature_breakdown`` on ``self.analytics``, each with ``days=7``.

        Returns:
            The report as one string: an overview section followed by one
            line per model.
        """
        overview = await self.analytics.get_user_stats(days=7)
        per_model = await self.analytics.compare_models(days=7)
        # Still requested, but the result is not rendered in the report.
        await self.analytics.get_feature_breakdown(days=7)

        header = f"""
# LLM使用周报

## 总体概览
- 总请求数: {overview['total_requests']}
- 总Token数: {overview['total_tokens']}
- 平均延迟: {overview['avg_latency']:.0f}ms

## 模型性能
"""
        model_lines = [
            f"- {name}: {figures['requests']}次请求, "
            f"P95延迟 {figures['p95_latency_ms']:.0f}ms, "
            f"错误率 {figures['error_rate']:.1%}\n"
            for name, figures in per_model.items()
        ]
        return header + "".join(model_lines)

总结

系统化的使用分析是LLM应用持续优化的基础。通过多维度的数据收集、实时监控、异常检测和报表生成,团队可以做出基于数据的决策,不断提升服务质量和用户体验。