← 返回首页
🧠

New Relic与LLM

📂 llm ⏱ 3 min 572 words

---
title: "New Relic与LLM"
description: "介绍如何使用New Relic平台监控LLM应用,包括APM、日志管理和AI监控功能。"
tags: ["New Relic", "LLM", "APM"]
category: "llm"
icon: "🧠"
---

New Relic与LLM

New Relic简介

New Relic是一个全栈可观测性平台,提供应用性能监控(APM)、基础设施监控、日志管理等功能。近年来,New Relic推出了专门针对AI/LLM应用的监控功能。

New Relic的核心优势:

- 全栈可观测性:在同一平台上覆盖应用性能监控(APM)、基础设施监控和日志管理
- 分布式追踪:跨服务关联请求,便于理解请求流转
- AI/LLM监控:针对AI/LLM应用的专用监控功能

安装与配置

Agent安装

# Linux
# NOTE(review): confirm this install-script URL against the official New Relic
# installation docs before piping it into a root shell.
curl -s https://raw.githubusercontent.com/newrelic/newrelic-agent/master/scripts/install.sh | sudo bash

# Or use a package manager
# Ubuntu/Debian
# NOTE(review): newrelic-php5 looks like the PHP agent package; the Python
# service configured later in this article presumably needs the Python agent
# instead (e.g. `pip install newrelic`) - confirm.
sudo apt-get install newrelic-php5

# Docker
# Starts a detached container named newrelic-infra from the
# newrelic/infrastructure image; the license key is passed via environment.
docker run -d \
  --name newrelic-infra \
  -e NEW_RELIC_LICENSE_KEY=<YOUR_LICENSE_KEY> \
  newrelic/infrastructure:latest

Python Agent配置

# Example newrelic.ini configuration
[newrelic]
# Replace the placeholder with your account's license key.
license_key = <YOUR_LICENSE_KEY>
# Name under which this application is reported.
app_name = LLM Service
# Feature switches enabled for this example.
distributed_tracing.enabled = true
transaction_tracer.enabled = true
error_collector.enabled = true

初始化Agent

import newrelic.agent

# Load the agent configuration from the ini file
newrelic.agent.initialize('newrelic.ini')

# Trace the function by decorating it
@newrelic.agent.function_trace()
def process_query(query):
    """Placeholder for the traced query-processing step."""
    # Process the query
    # NOTE(review): `result` is never assigned in this snippet, so calling the
    # function as written raises NameError; the real processing is elided.
    return result

AI监控功能

LLM监控配置

New Relic提供了专门的LLM监控功能:

import newrelic.agent


# Record an LLM request event
def track_llm_request(model: str, prompt: str, response: str, 
                      token_usage: dict, latency: float):
    """Record one LLM request as a New Relic custom event.

    The Python agent exposes no ``record_llm_event`` helper, so the request is
    reported through ``record_custom_event`` under the ``LLMRequest`` type.

    Args:
        model: Name of the model that served the request.
        prompt: Prompt text; only the first 1000 characters are recorded.
        response: Model output; only the first 1000 characters are recorded.
        token_usage: Mapping that may contain ``total_tokens``; ``None`` or a
            missing key is recorded as 0.
        latency: Request duration, recorded as-is under ``duration``.
    """
    usage = token_usage or {}
    newrelic.agent.record_custom_event(
        "LLMRequest",
        {
            "model": model,
            "input": prompt[:1000],  # truncate overly long input
            "output": response[:1000],
            "token_count": usage.get("total_tokens", 0),
            "duration": latency,
            "status": "success",
            # Kept as flat attributes rather than a nested "metadata" dict.
            # NOTE(review): custom event attributes are presumably limited to
            # scalar values - confirm against the agent API reference.
            "temperature": 0.7,
            "max_tokens": 2000,
        },
    )


# Record user feedback
def track_feedback(request_id: str, rating: int, feedback: str):
    """Attach end-user feedback to a previously traced LLM call.

    Args:
        request_id: Identifier of the LLM call being rated. It is passed to
            the agent as ``trace_id``.
            NOTE(review): presumably this must be the trace id of the
            transaction that made the LLM call - confirm with the caller.
        rating: Rating supplied by the user.
        feedback: Free-text feedback, sent as the event ``message``.
    """
    newrelic.agent.record_llm_feedback_event(
        trace_id=request_id,
        rating=rating,
        message=feedback,
    )

自动追踪LLM调用

import time
from functools import wraps

import newrelic.agent


def auto_trace_llm(func):
    """Decorate an LLM call so it is traced and measured in New Relic.

    The wrapped call runs inside a ``FunctionTrace`` named after the ``model``
    keyword argument ("unknown" when the keyword is absent). On success the
    wrapper records the ``Custom/LLM/Request/Duration`` metric in milliseconds
    and an ``LLMRequest`` custom event. On failure it reports the active
    exception to New Relic and re-raises it unchanged.

    Args:
        func: Callable performing the LLM request. Its result may expose
            ``usage.total_tokens``; when it does not, ``tokens_used`` is
            omitted from the event instead of raising.

    Returns:
        The wrapped callable, returning whatever ``func`` returns.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        with newrelic.agent.FunctionTrace(
            name=f"LLM/{kwargs.get('model', 'unknown')}",
            group="LLM",
            label=kwargs.get("model")
        ):
            # perf_counter is monotonic, so the measured latency cannot go
            # negative if the wall clock is adjusted mid-request.
            start_time = time.perf_counter()

            try:
                result = func(*args, **kwargs)
            except Exception:
                # Report the in-flight exception, then let the caller see it.
                newrelic.agent.notice_error()
                raise

            latency_ms = (time.perf_counter() - start_time) * 1000

            # Record to New Relic
            newrelic.agent.record_custom_metric(
                "Custom/LLM/Request/Duration",
                latency_ms
            )

            event = {
                "model": kwargs.get("model"),
                "latency_ms": latency_ms,
                "status": "success"
            }
            # Not every result carries token usage; only report it when present.
            usage = getattr(result, "usage", None)
            tokens_used = getattr(usage, "total_tokens", None)
            if tokens_used is not None:
                event["tokens_used"] = tokens_used

            newrelic.agent.record_custom_event("LLMRequest", event)

            return result

    return wrapper

自定义仪表盘

创建LLM监控仪表盘

import newrelic.agent

class LLMDashboard:
    """Helper that reports LLM request metrics to New Relic."""

    # Estimated price per token for known models.
    _PRICE_PER_TOKEN = {
        "gpt-4": 0.00003,
        "gpt-3.5-turbo": 0.000002
    }
    # Price per token applied to models missing from the table above.
    _DEFAULT_PRICE_PER_TOKEN = 0.00001

    def __init__(self):
        self.metrics = {}

    def record_metric(self, name: str, value: float, tags: dict = None):
        """Record ``Custom/LLM/<name>`` and, when tags are given, a tagged event.

        ``record_custom_metric`` accepts only a name and a value, so passing
        ``tags=`` to it raises TypeError. Tags are reported through a companion
        ``LLMMetric`` custom event that carries the metric name, the value and
        the tag attributes.

        Args:
            name: Metric name suffix appended to ``Custom/LLM/``.
            value: Numeric value to record.
            tags: Optional attributes (for example model or status).
        """
        metric_name = f"Custom/LLM/{name}"
        newrelic.agent.record_custom_metric(metric_name, value)

        if tags:
            # Tags are spread first so they cannot overwrite "metric"/"value".
            newrelic.agent.record_custom_event(
                "LLMMetric",
                {**tags, "metric": metric_name, "value": value}
            )

    def track_request(self, model: str, latency: float, 
                      tokens: int, success: bool):
        """Record count, duration, token usage and estimated cost of a request.

        Args:
            model: Name of the model that served the request.
            latency: Request latency; multiplied by 1000 before recording.
            tokens: Number of tokens consumed by the request.
            success: Whether the request succeeded.
        """
        status = "success" if success else "error"

        self.record_metric("Request/Count", 1, {"model": model, "status": status})
        self.record_metric("Request/Duration", latency * 1000, {"model": model})
        self.record_metric("Tokens/Used", tokens, {"model": model})

        # Estimate the cost (example pricing)
        cost = self.estimate_cost(model, tokens)
        self.record_metric("Cost/Estimated", cost, {"model": model})

    def estimate_cost(self, model: str, tokens: int) -> float:
        """Return ``tokens`` multiplied by the per-token price for ``model``.

        Models missing from the price table use the default price.
        """
        price = self._PRICE_PER_TOKEN.get(model, self._DEFAULT_PRICE_PER_TOKEN)
        return tokens * price

查询自定义指标

-- Request rate
-- NOTE(review): metric names containing '/' presumably need backtick quoting
-- or a metricTimesliceName filter to be valid NRQL - confirm against the NRQL
-- reference. Also confirm that 'llm-service' matches the reported app name
-- (the ini example uses "LLM Service").
SELECT rate(count(Custom/LLM/Request/Count), 1 minute) 
FROM Metric 
WHERE service.name = 'llm-service' 
FACET model

-- Average latency
SELECT average(Custom/LLM/Request/Duration) 
FROM Metric 
WHERE service.name = 'llm-service' 
FACET model TIMESERIES

-- Cost trend
SELECT sum(Custom/LLM/Cost/Estimated) 
FROM Metric 
WHERE service.name = 'llm-service' 
FACET model TIMESERIES

日志管理

结构化日志配置

import json
import logging
import newrelic.agent

class NewRelicLogger:
    """Emit LLM events as JSON log lines and as New Relic custom events."""

    def __init__(self, service_name: str):
        """Bind to the logger named ``service_name`` and attach its handler."""
        self.logger = logging.getLogger(service_name)
        self.setup_handler()

    def setup_handler(self):
        """Attach a message-only stream handler to the logger exactly once.

        ``logging.getLogger`` returns the same logger object for the same
        name, so without the guard each new ``NewRelicLogger`` for a service
        would add another handler and every line would be emitted repeatedly.
        """
        self.logger.setLevel(logging.INFO)

        # The marker attribute identifies a handler installed by this class.
        if any(getattr(h, "_nr_llm_handler", False) for h in self.logger.handlers):
            return

        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(message)s'))
        handler._nr_llm_handler = True
        self.logger.addHandler(handler)

    def log_llm_event(self, event_type: str, data: dict):
        """Log ``data`` as one JSON line and record it as a custom event.

        Args:
            event_type: Stored under ``eventType`` in the log line and used as
                the custom event type.
            data: Event attributes. They are merged into the log entry after
                ``eventType`` and ``timestamp``, so matching keys in ``data``
                override those two fields. Values must be JSON-serializable.
        """
        # Imported locally because the module-level imports of this snippet do
        # not include datetime.
        from datetime import datetime, timezone

        log_entry = {
            "eventType": event_type,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            **data
        }

        # Send to New Relic logs
        self.logger.info(json.dumps(log_entry))

        # Also record as a custom event
        newrelic.agent.record_custom_event(
            event_type,
            data
        )

NRQL查询日志

-- View LLM request logs
SELECT * 
FROM Log 
WHERE service.name = 'llm-service' 
AND eventType = 'LLMRequest'

-- Error log counts, grouped by message
SELECT count(*) 
FROM Log 
WHERE service.name = 'llm-service' 
AND level = 'ERROR' 
FACET message

告警配置

创建NRQL告警

-- Error-rate alert
SELECT percentage(count(*), WHERE error IS true) 
FROM Transaction 
WHERE service.name = 'llm-service' 
AND transactionType = 'WebTransaction'

-- Latency alert
SELECT average(duration) 
FROM Transaction 
WHERE service.name = 'llm-service' 
AND name LIKE 'LLM/%'

-- Cost alert
-- NOTE(review): as with the other metric queries, the '/'-separated metric
-- name presumably needs quoting to be valid NRQL - confirm.
SELECT sum(Custom/LLM/Cost/Estimated) 
FROM Metric 
WHERE service.name = 'llm-service' 
FACET model

通过API配置告警

import requests

def create_alert_policy(name: str, conditions: list):
    """Create an alert policy and attach one NRQL condition per entry.

    The previous version computed the new policy's id but never used it, so
    the conditions were posted without being tied to the policy. Each
    condition is now posted to the policy-scoped NRQL condition endpoint, and
    HTTP failures raise instead of being silently ignored.

    NOTE(review): the endpoint paths and payload shape follow the REST API v2
    alert endpoints as I recall them - verify against the current docs.

    Args:
        name: Name of the alert policy to create.
        conditions: Dicts with the keys ``name``, ``query`` (NRQL string) and
            ``threshold``.

    Returns:
        The id of the newly created policy.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        KeyError: If the policy response or a condition dict lacks an
            expected key.
    """
    base_url = "https://api.newrelic.com/v2"
    # Without a timeout a stalled connection would block the caller forever.
    timeout_seconds = 10
    headers = {
        "Api-Key": "<YOUR_API_KEY>",
        "Content-Type": "application/json"
    }

    # Create the alert policy
    policy_response = requests.post(
        f"{base_url}/alerts_policies.json",
        headers=headers,
        json={
            "policy": {
                "name": name,
                "incident_preference": "PER_CONDITION"
            }
        },
        timeout=timeout_seconds
    )
    policy_response.raise_for_status()

    policy_id = policy_response.json()["policy"]["id"]

    # Add the conditions to the policy that was just created
    for condition in conditions:
        condition_response = requests.post(
            f"{base_url}/alerts_nrql_conditions/policies/{policy_id}.json",
            headers=headers,
            json={
                "nrql_condition": {
                    "type": "static",
                    "name": condition["name"],
                    "enabled": True,
                    "nrql": {
                        "query": condition["query"]
                    },
                    "terms": [
                        {
                            "duration": "5",
                            "operator": "above",
                            "priority": "critical",
                            "threshold": condition["threshold"],
                            "time_function": "all"
                        }
                    ]
                }
            },
            timeout=timeout_seconds
        )
        condition_response.raise_for_status()

    return policy_id

服务地图

自动发现服务依赖

New Relic会自动发现和绘制服务依赖图:

# The service shows up in the New Relic service map automatically,
# correlated through distributed tracing data

@newrelic.agent.function_trace()
def llm_pipeline():
    """Run preprocess, retrieval and generation, each in its own trace.

    NOTE(review): `query` is neither a parameter nor defined in this snippet,
    so the first step raises NameError unless a global `query` exists;
    presumably the function should accept `query` as an argument - confirm.
    """
    # Step 1: preprocessing
    with newrelic.agent.FunctionTrace("LLM/Preprocess"):
        preprocessed = preprocess(query)
    
    # Step 2: retrieval
    with newrelic.agent.FunctionTrace("LLM/Retrieval"):
        documents = retrieve(preprocessed)
    
    # Step 3: generation
    with newrelic.agent.FunctionTrace("LLM/Generate"):
        response = generate(preprocessed, documents)
    
    return response

最佳实践

  1. 利用AI监控:使用New Relic专门的LLM监控功能
  2. 分布式追踪:启用分布式追踪理解请求流转
  3. 自定义事件:记录关键业务事件用于分析
  4. 告警优化:根据业务需求设置合理的告警阈值
  5. 成本优化:监控New Relic数据使用量,避免超支

通过New Relic,你可以获得LLM应用的全面可观测性,实现智能化的运维管理。