使用分析:LLM服务使用数据的收集与分析
---
title: "使用分析:LLM服务使用数据的收集与分析"
description: "全面介绍LLM服务使用数据的收集方法、分析维度、可视化展示及基于数据的优化决策实践"
tags: ["使用分析", "数据分析", "LLM监控", "性能分析"]
category: "llm"
icon: "🧠"
---
使用分析:LLM服务使用数据的收集与分析
为什么要进行使用分析
LLM应用上线后,需要通过数据分析了解服务使用情况、发现性能瓶颈、优化用户体验。系统化的使用分析能帮助团队做出数据驱动的决策。
数据收集框架
核心指标定义
import logging
import math
import uuid
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional
def _new_event_id() -> str:
    """Return a freshly generated UUID4 rendered as a string."""
    return str(uuid.uuid4())


@dataclass
class LLMUsageEvent:
    """A single recorded LLM request.

    Every field has a default, so an event can be constructed with no
    arguments and populated afterwards. ``event_id`` and ``timestamp`` are
    generated per instance at construction time; ``timestamp`` comes from
    ``datetime.now()`` and is therefore a naive local-time value.
    """

    event_id: str = field(default_factory=_new_event_id)
    timestamp: datetime = field(default_factory=datetime.now)

    # Basic information
    user_id: Optional[str] = None
    session_id: Optional[str] = None
    feature: str = "chat"
    model: str = "gpt-4o"

    # Performance metrics
    request_latency_ms: float = 0
    first_token_latency_ms: float = 0
    tokens_per_second: float = 0

    # Token usage
    input_tokens: int = 0
    output_tokens: int = 0
    total_tokens: int = 0

    # Quality indicators
    finish_reason: str = "stop"
    error_type: Optional[str] = None  # None is treated as "no error" by the analytics code

    # Request context
    input_length: int = 0
    conversation_turns: int = 0
    has_tools: bool = False
数据收集器
import asyncio
from collections import deque
logger = logging.getLogger(__name__)


class UsageDataCollector:
    """Buffer usage events in memory and write them to storage in batches.

    A batch is written when the buffer reaches ``batch_size`` events (checked
    on every ``collect``) and, once ``start`` has been called, every
    ``flush_interval`` seconds by a background task.

    Args:
        batch_size: Buffer length at which ``collect`` triggers a flush.
        flush_interval: Seconds the background loop sleeps between flushes.
    """

    def __init__(self, batch_size: int = 100, flush_interval: int = 60):
        self.buffer = deque()
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._running = False
        # Strong reference to the background task: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # mid-execution, and stop() needs the handle to cancel it.
        self._flush_task = None

    async def start(self):
        """Start the periodic flush loop; a no-op if it is already running."""
        if self._running:
            return
        self._running = True
        self._flush_task = asyncio.create_task(self._flush_loop())

    async def stop(self):
        """Stop the periodic loop and write whatever is still buffered.

        Raises:
            Exception: Whatever ``_write_to_storage`` raises during the final
                flush; the events stay in the buffer in that case.
        """
        self._running = False
        task, self._flush_task = self._flush_task, None
        if task is not None:
            task.cancel()
            # return_exceptions=True absorbs the child's CancelledError
            # without swallowing a cancellation aimed at stop() itself.
            await asyncio.gather(task, return_exceptions=True)
        await self._flush()

    async def collect(self, event: "LLMUsageEvent"):
        """Append ``event`` to the buffer and flush once it is full.

        Raises:
            Exception: Whatever ``_write_to_storage`` raises when this call
                triggers a flush; the events stay in the buffer.
        """
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            await self._flush()

    async def _flush_loop(self):
        """Flush every ``flush_interval`` seconds while the collector runs."""
        while self._running:
            await asyncio.sleep(self.flush_interval)
            try:
                await self._flush()
            except Exception:
                # One failed write must not end periodic flushing; the events
                # were put back by _flush and are retried on the next tick.
                logger.exception("Periodic usage flush failed; events kept for retry")

    async def _flush(self):
        """Write all buffered events as one batch.

        The buffer is drained before the write so events collected while the
        write is in flight go into the next batch. If the write fails or is
        cancelled, the drained events are put back at the front of the buffer
        in their original order and the exception is re-raised.
        """
        if not self.buffer:
            return
        events = list(self.buffer)
        self.buffer.clear()
        try:
            # Batch write to storage
            await self._write_to_storage(events)
        except BaseException:
            # BaseException on purpose: a cancellation mid-write should not
            # drop the batch either. The exception is always re-raised.
            self.buffer.extendleft(reversed(events))
            raise
        logger.info(f"已写入 {len(events)} 条使用记录")

    async def _write_to_storage(self, events: "list[LLMUsageEvent]"):
        """Persist a batch of events; a placeholder meant to be overridden."""
        # Write to a database or data warehouse
        pass
分析维度
1. 用户维度分析
class UserAnalytics:
    """Per-user usage statistics computed from stored usage events.

    Args:
        storage: Object exposing an awaitable ``query(user_id=..., start_date=...)``
            that returns a sized collection of event objects.
    """

    def __init__(self, storage):
        self.storage = storage

    async def get_user_stats(self, user_id: str, days: int = 30) -> dict:
        """Summarise one user's activity over the last ``days`` days.

        Returns:
            A dict with the request count, token and cost totals, average
            request latency, most used features, most active hours and error
            rate. ``avg_latency`` and ``error_rate`` are 0 when the user has
            no events in the window.
        """
        events = await self.storage.query(
            user_id=user_id,
            start_date=datetime.now() - timedelta(days=days),
        )
        request_count = len(events)
        latency_sum = sum(e.request_latency_ms for e in events)
        return {
            "user_id": user_id,
            "total_requests": request_count,
            "total_tokens": sum(e.total_tokens for e in events),
            # NOTE(review): events are not guaranteed to carry a ``_cost``
            # attribute (the event dataclass in this file does not define
            # one); such events contribute 0 instead of raising.
            "total_cost": sum(getattr(e, "_cost", 0) for e in events),
            "avg_latency": latency_sum / request_count if request_count else 0,
            "favorite_features": self._top_features(events),
            "active_hours": self._active_hours(events),
            "error_rate": self._error_rate(events),
        }

    def _top_features(self, events, top_n=5):
        """Return up to ``top_n`` ``(feature, count)`` pairs, most used first."""
        return Counter(e.feature for e in events).most_common(top_n)

    def _active_hours(self, events, top_n=5):
        """Return up to ``top_n`` ``(hour, count)`` pairs, busiest hour first.

        The hour is ``event.timestamp.hour`` (0-23).
        """
        return Counter(e.timestamp.hour for e in events).most_common(top_n)

    def _error_rate(self, events):
        """Return the share of events with a truthy ``error_type``; 0 if empty."""
        if not events:
            return 0
        failed = sum(1 for e in events if e.error_type)
        return failed / len(events)
2. 模型性能分析
class ModelPerformanceAnalytics:
def __init__(self, storage):
self.storage = storage
async def compare_models(self, days: int = 7) -> dict:
events = await self.storage.query(days=days)
models = {}
for event in events:
if event.model not in models:
models[event.model] = {
"requests": 0,
"latencies": [],
"token_rates": [],
"errors": 0,
}
stats = models[event.model]
stats["requests"] += 1
stats["latencies"].append(event.request_latency_ms)
stats["token_rates"].append(event.tokens_per_second)
if event.error_type:
stats["errors"] += 1
result = {}
for model, stats in models.items():
result[model] = {
"requests": stats["requests"],
"avg_latency_ms": sum(stats["latencies"]) / len(stats["latencies"]),
"p95_latency_ms": self._percentile(stats["latencies"], 95),
"avg_tokens_per_sec": sum(stats["token_rates"]) / len(stats["token_rates"]),
"error_rate": stats["errors"] / stats["requests"],
}
return result
def _percentile(self, data, percentile):
sorted_data = sorted(data)
index = int(len(sorted_data) * percentile / 100)
return sorted_data[min(index, len(sorted_data) - 1)]
3. 功能使用分析
class FeatureAnalytics:
    """Usage statistics grouped by product feature.

    Args:
        storage: Object exposing an awaitable ``query(days=...)`` that returns
            an iterable of event objects.
    """

    def __init__(self, storage):
        self.storage = storage

    async def get_feature_breakdown(self, days: int = 30) -> dict:
        """Aggregate the last ``days`` days of events by ``event.feature``.

        Returns:
            A dict mapping each feature to its request count, number of
            distinct users, average tokens per request, average request
            latency and error rate.
        """
        rows = await self.storage.query(days=days)

        buckets = {}
        for row in rows:
            name = row.feature
            bucket = buckets.get(name)
            if bucket is None:
                bucket = buckets[name] = {
                    "count": 0,
                    "total_tokens": 0,
                    "total_latency": 0,
                    "errors": 0,
                    "unique_users": set(),
                }

            bucket["count"] += 1
            bucket["total_tokens"] += row.total_tokens
            bucket["total_latency"] += row.request_latency_ms
            if row.error_type:
                bucket["errors"] += 1
            # Events without a user id do not count toward unique users.
            if row.user_id:
                bucket["unique_users"].add(row.user_id)

        # Collapse the working buckets (which hold sets) into plain numbers
        # so the result is serialisable.
        return {
            name: {
                "requests": bucket["count"],
                "unique_users": len(bucket["unique_users"]),
                "avg_tokens_per_request": bucket["total_tokens"] / bucket["count"],
                "avg_latency_ms": bucket["total_latency"] / bucket["count"],
                "error_rate": bucket["errors"] / bucket["count"],
            }
            for name, bucket in buckets.items()
        }
实时监控仪表板
class RealtimeDashboard:
    """Assemble the data shown on the real-time monitoring dashboard.

    Args:
        storage: Object exposing an awaitable ``query(start_date=...)`` that
            returns a sized collection of event objects.
    """

    def __init__(self, storage):
        self.storage = storage
        self.metrics_cache = {}

    async def get_dashboard_data(self) -> dict:
        """Return the last-hour, last-24-hour and 7-day trend sections.

        Returns:
            A dict with three keys: ``realtime`` (last hour), ``daily`` (last
            24 hours) and ``trends`` (per-day figures, see
            ``_calculate_trends``). Averages and rates are 0 when a window
            has no events.
        """
        now = datetime.now()

        # Last hour
        recent_events = await self.storage.query(
            start_date=now - timedelta(hours=1)
        )
        # Last 24 hours
        daily_events = await self.storage.query(
            start_date=now - timedelta(hours=24)
        )

        return {
            "realtime": {
                "requests_last_hour": len(recent_events),
                "active_users": len({e.user_id for e in recent_events if e.user_id}),
                "avg_latency_ms": self._avg(recent_events, "request_latency_ms"),
                "error_rate": self._error_rate(recent_events),
            },
            "daily": {
                "total_requests": len(daily_events),
                "total_tokens": sum(e.total_tokens for e in daily_events),
                # NOTE(review): events are not guaranteed to carry a ``_cost``
                # attribute (the event dataclass in this file does not define
                # one); such events contribute 0 instead of raising.
                "total_cost": sum(getattr(e, "_cost", 0) for e in daily_events),
                "top_models": self._top_models(daily_events),
                "peak_hours": self._peak_hours(daily_events),
            },
            "trends": await self._calculate_trends(days=7),
        }

    def _avg(self, events, field_name):
        """Return the mean of attribute ``field_name`` over ``events``; 0 if empty."""
        if not events:
            return 0
        return sum(getattr(e, field_name) for e in events) / len(events)

    def _error_rate(self, events):
        """Return the share of events with a truthy ``error_type``; 0 if empty."""
        if not events:
            return 0
        errors = sum(1 for e in events if e.error_type)
        return errors / len(events)

    def _top_models(self, events, top_n=5):
        """Return up to ``top_n`` ``(model, request_count)`` pairs, busiest first."""
        return Counter(e.model for e in events).most_common(top_n)

    def _peak_hours(self, events, top_n=3):
        """Return up to ``top_n`` ``(hour, request_count)`` pairs, busiest first.

        The hour is ``event.timestamp.hour`` (0-23).
        """
        return Counter(e.timestamp.hour for e in events).most_common(top_n)

    async def _calculate_trends(self, days=7):
        """Return per-day request, token and error-rate figures.

        Queries events whose start date is within the last ``days`` days and
        groups them by ``event.timestamp.date()``.

        Returns:
            A dict keyed by ISO date string in chronological order; each value
            has ``requests``, ``total_tokens`` and ``error_rate``. Days
            without events are absent.
        """
        events = await self.storage.query(
            start_date=datetime.now() - timedelta(days=days)
        )
        per_day = {}
        for e in events:
            day = per_day.setdefault(
                e.timestamp.date().isoformat(),
                {"requests": 0, "total_tokens": 0, "errors": 0},
            )
            day["requests"] += 1
            day["total_tokens"] += e.total_tokens
            if e.error_type:
                day["errors"] += 1
        # ISO date strings sort chronologically.
        return {
            date: {
                "requests": day["requests"],
                "total_tokens": day["total_tokens"],
                "error_rate": day["errors"] / day["requests"],
            }
            for date, day in sorted(per_day.items())
        }
异常检测
class AnomalyDetector:
    """Flag recent metric values that deviate strongly from a 30-day baseline.

    Args:
        storage: Object exposing an awaitable ``query`` accepting ``days=...``
            or ``start_date=...`` and returning an iterable of event objects.
        sensitivity: Absolute z-score above which a value is reported.
    """

    def __init__(self, storage, sensitivity: float = 2.0):
        self.storage = storage
        self.sensitivity = sensitivity
        # Baseline used by the most recent detect_anomalies call.
        self.baseline = None
        # One cached baseline per metric, so scoring a second metric never
        # reuses the mean/std computed for the first one.
        self._baselines = {}

    async def detect_anomalies(self, metric: str, hours: int = 24) -> list:
        """Return the recent values of ``metric`` whose z-score is extreme.

        Args:
            metric: Name of the numeric event attribute to score.
            hours: Size of the recent window, in hours.

        Returns:
            A list of dicts with ``timestamp``, ``value``, ``z_score`` and
            ``type`` (``"high"`` or ``"low"``). Empty when there is no history
            to build a baseline from. When the baseline has zero spread, any
            value different from the mean is reported with an infinite
            z-score.
        """
        baseline = self._baselines.get(metric)
        if baseline is None:
            baseline = await self._calculate_baseline(metric)
            if baseline is None:
                return []
            self._baselines[metric] = baseline
        self.baseline = baseline

        mean = baseline["mean"]
        std = baseline["std"]
        recent = await self._get_recent_values(metric, hours)

        anomalies = []
        for timestamp, value in recent:
            deviation = value - mean
            if std:
                z_score = deviation / std
            elif deviation:
                # Zero spread: the ratio is undefined, use its signed limit.
                z_score = math.copysign(math.inf, deviation)
            else:
                z_score = 0.0
            if abs(z_score) > self.sensitivity:
                anomalies.append({
                    "timestamp": timestamp,
                    "value": value,
                    "z_score": z_score,
                    "type": "high" if z_score > 0 else "low",
                })
        return anomalies

    async def _calculate_baseline(self, metric):
        """Return ``{"mean", "std"}`` of ``metric`` over the last 30 days.

        ``std`` is the population standard deviation. Returns ``None`` when
        the query yields no events.
        """
        historical = await self.storage.query(days=30)
        values = [getattr(e, metric) for e in historical]
        if not values:
            return None
        # Compute the mean once; recomputing it per element is quadratic.
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return {"mean": mean, "std": variance ** 0.5}

    async def _get_recent_values(self, metric, hours):
        """Return ``(timestamp, value)`` pairs of ``metric`` for the last ``hours`` hours.

        NOTE(review): assumes ``storage.query(start_date=...)`` filters by
        event time, as the other analytics classes in this file use it.
        """
        events = await self.storage.query(
            start_date=datetime.now() - timedelta(hours=hours)
        )
        return [(e.timestamp, getattr(e, metric)) for e in events]
报表生成
class UsageReportGenerator:
    """Render usage statistics as a Markdown report.

    Args:
        analytics: Object exposing awaitable ``get_user_stats``,
            ``compare_models`` and ``get_feature_breakdown`` methods, each
            accepting a ``days`` keyword.
    """

    def __init__(self, analytics):
        self.analytics = analytics

    async def generate_weekly_report(self) -> str:
        """Return the weekly report text for the last 7 days.

        The report has an overview section (requests, tokens, average
        latency) followed by one line per model with its request count, P95
        latency and error rate.
        """
        overview = await self.analytics.get_user_stats(days=7)
        per_model = await self.analytics.compare_models(days=7)
        # The feature breakdown is requested but its result is not rendered.
        await self.analytics.get_feature_breakdown(days=7)

        header = (
            "\n"
            "# LLM使用周报\n"
            "## 总体概览\n"
            f"- 总请求数: {overview['total_requests']}\n"
            f"- 总Token数: {overview['total_tokens']}\n"
            f"- 平均延迟: {overview['avg_latency']:.0f}ms\n"
            "## 模型性能\n"
        )
        model_lines = [
            f"- {model}: {stats['requests']}次请求, "
            f"P95延迟 {stats['p95_latency_ms']:.0f}ms, "
            f"错误率 {stats['error_rate']:.1%}\n"
            for model, stats in per_model.items()
        ]
        return header + "".join(model_lines)
总结
系统化的使用分析是LLM应用持续优化的基础。通过多维度的数据收集、实时监控、异常检测和报表生成,团队可以做出基于数据的决策,不断提升服务质量和用户体验。