← 返回首页
🧠

LLM链路追踪

📂 llm ⏱ 2 min 315 words

---
title: "LLM链路追踪"
description: "详细介绍大语言模型分布式链路追踪的实现方案,包括OpenTelemetry集成、Span设计、追踪可视化与性能分析。"
tags: ["链路追踪", "OpenTelemetry", "分布式追踪", "Span"]
category: "llm"
icon: "🧠"
---

LLM链路追踪

为什么需要链路追踪

在复杂的LLM应用中,一个用户请求往往经过多个服务组件:API网关、Prompt处理、模型推理、后处理、RAG检索等。当出现性能问题或错误时,传统的日志分析难以快速定位根因。链路追踪通过为每个请求分配唯一ID,记录完整的调用链路,使问题定位变得高效直观。

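链路追踪的核心概念

- Trace(链路):一次请求从入口到返回的完整调用路径,由共享同一个Trace ID的一组Span组成。
- Span(跨度):链路中的一个操作单元,记录操作名称、起止时间、属性(Attributes)以及与父Span的关系。
- Context(上下文):随请求在服务之间传播的Trace ID和Span ID,用于把各个服务产生的Span串联成同一条链路。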

OpenTelemetry集成

安装与配置

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Tracer configuration: finished spans are handed to a batching processor,
# which forwards them to the OTLP/gRPC exporter pointed at the collector.
otlp_exporter = OTLPSpanExporter(endpoint="http://collector:4317")
processor = BatchSpanProcessor(otlp_exporter)

provider = TracerProvider()
provider.add_span_processor(processor)

# Register the provider globally so trace.get_tracer() hands out tracers
# backed by it.
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("llm-inference")

LLM Span设计

import time
from opentelemetry import trace

class LLMTracer:
    """Trace one LLM inference as a parent span with a child span per stage.

    Args:
        tracer: Object exposing ``start_as_current_span`` (for example an
            OpenTelemetry tracer).
        client: Optional object exposing ``generate(tokens)``. When omitted,
            the ``model`` argument of :meth:`trace_inference` is used as the
            generator itself, which keeps the earlier calling convention
            (passing a model object) working.
    """

    def __init__(self, tracer, client=None):
        self.tracer = tracer
        self.client = client

    def trace_inference(self, model: str, prompt: str):
        """Run tokenization, generation and post-processing under spans.

        Args:
            model: Model name recorded as the ``llm.model`` span attribute.
            prompt: Prompt text; only its length is recorded on the span.

        Returns:
            Whatever ``postprocess`` returns for the generated response.

        Raises:
            TypeError: If no usable generator is available, i.e. no ``client``
                was configured and ``model`` has no ``generate`` method.

        NOTE(review): ``tokenize`` and ``postprocess`` are not defined in this
        snippet; they must be provided by the surrounding application.
        """
        # A model *name* (str) cannot generate text; the actual call has to go
        # through a client object. Fail before opening any span so the
        # misconfiguration is obvious.
        generator = self.client if self.client is not None else model
        if not hasattr(generator, "generate"):
            raise TypeError(
                "LLMTracer needs a client with a generate() method; "
                "pass client=... to LLMTracer()"
            )

        with self.tracer.start_as_current_span("llm.inference") as span:
            span.set_attribute("llm.model", model)
            span.set_attribute("llm.prompt_length", len(prompt))

            with self.tracer.start_as_current_span("llm.tokenization") as tok_span:
                tokens = tokenize(prompt)
                tok_span.set_attribute("llm.token_count", len(tokens))

            with self.tracer.start_as_current_span("llm.generation") as gen_span:
                # perf_counter is monotonic, so the measured duration cannot be
                # skewed by wall-clock adjustments the way time.time() can.
                started = time.perf_counter()
                response = generator.generate(tokens)
                elapsed_ms = (time.perf_counter() - started) * 1000
                # Assumes len(response) is the number of output tokens --
                # TODO confirm against the client's return type.
                gen_span.set_attribute("llm.output_tokens", len(response))
                gen_span.set_attribute("llm.generation_time_ms", elapsed_ms)

            with self.tracer.start_as_current_span("llm.postprocessing") as pp_span:
                processed = postprocess(response)
                pp_span.set_attribute("llm.processed_length", len(processed))

            span.set_attribute("llm.total_tokens", len(tokens) + len(response))

            return processed


# Usage example: `llm` stands for any client object exposing generate(tokens).
llm_tracer = LLMTracer(tracer, client=llm)
result = llm_tracer.trace_inference("gpt-4", "解释量子计算")

高级追踪场景

RAG系统追踪

def trace_rag_query(query: str):
    """Answer ``query`` with retrieval-augmented generation, tracing each stage.

    Emits a ``rag.query`` root span with a ``rag.retrieval`` child (plus one
    child span per retrieved document) and a ``rag.generation`` child.

    Relies on the module-level names ``tracer``, ``vector_store`` and ``llm``.
    Retrieved documents are expected to expose ``id``, ``score`` and
    ``content`` attributes, since those are read below.

    Args:
        query: The user question; also recorded as the ``rag.query`` attribute.

    Returns:
        The value returned by ``llm.generate`` for the assembled prompt.
    """
    # Time the request with a local monotonic clock. The SDK exposes a span's
    # start_time as integer nanoseconds since the epoch (and non-recording
    # spans do not expose it at all), so subtracting it from time.time()
    # seconds does not yield a latency.
    started = time.perf_counter()

    with tracer.start_as_current_span("rag.query") as root_span:
        root_span.set_attribute("rag.query", query)

        # Retrieval stage
        with tracer.start_as_current_span("rag.retrieval") as ret_span:
            docs = vector_store.search(query, top_k=5)
            ret_span.set_attribute("rag.retrieved_docs", len(docs))

            for i, doc in enumerate(docs):
                with tracer.start_as_current_span(f"rag.doc[{i}]") as doc_span:
                    doc_span.set_attribute("rag.doc.id", doc.id)
                    doc_span.set_attribute("rag.doc.score", doc.score)

        # Generation stage
        with tracer.start_as_current_span("rag.generation") as gen_span:
            context = "\n".join(d.content for d in docs)
            prompt = f"基于以下信息回答问题:\n{context}\n\n问题:{query}"
            response = llm.generate(prompt)
            gen_span.set_attribute("rag.response_length", len(response))

        root_span.set_attribute(
            "rag.total_latency_ms",
            (time.perf_counter() - started) * 1000,
        )

        return response

多模型链路追踪

def trace_chain_of_thought(problem: str):
    """Solve ``problem`` with a decompose / solve / summarize pipeline under spans.

    Emits a ``cot.pipeline`` root span with a ``cot.decompose`` child, one
    ``cot.solve[i]`` child per sub-problem and a ``cot.summarize`` child.

    Relies on the module-level names ``tracer``, ``small_model`` and
    ``large_model``; both models are called through ``generate(prompt)`` and
    their results are treated as strings below.

    Args:
        problem: The problem statement to decompose and solve.

    Returns:
        The final answer produced by ``large_model`` in the summarize step.
    """
    # Imported here so the function does not depend on a module-level
    # ``import json`` being present.
    import json

    with tracer.start_as_current_span("cot.pipeline") as root:
        steps = []

        # Step 1: decompose the problem
        with tracer.start_as_current_span("cot.decompose") as s1:
            decomposition = small_model.generate(
                f"将以下问题分解为子问题:{problem}"
            )
            steps.append(("decompose", decomposition))
            # Split once and skip blank lines so an empty line in the model
            # output does not trigger a pointless large-model call.
            subproblems = [
                line for line in decomposition.split("\n") if line.strip()
            ]
            s1.set_attribute("cot.subproblems", len(subproblems))

        # Step 2: solve each sub-problem in turn
        for i, subproblem in enumerate(subproblems):
            with tracer.start_as_current_span(f"cot.solve[{i}]") as s2:
                solution = large_model.generate(subproblem)
                steps.append((f"solve_{i}", solution))
                # Truncated to keep the attribute small.
                s2.set_attribute("cot.subproblem", subproblem[:100])

        # Step 3: summarize into the final answer
        with tracer.start_as_current_span("cot.summarize"):
            # ensure_ascii=False keeps non-ASCII text readable in the prompt
            # instead of escaping it to \uXXXX sequences.
            steps_json = json.dumps(steps, ensure_ascii=False)
            final = large_model.generate(
                f"综合以下推理结果给出最终答案:\n{steps_json}"
            )
            # The Python span method is set_attribute; setAttribute does not
            # exist on Python spans.
            root.set_attribute("cot.final_answer", final[:200])

        return final

追踪数据分析

Jaeger查询示例

from jaeger_client import Config

# 查询特定服务的慢请求
def find_slow_traces(
    service_name: str,
    min_duration_ms: int = 5000,
    base_url: str = "http://localhost:16686",
    timeout: float = 10.0,
):
    """Return up to 100 traces of a service that took at least ``min_duration_ms``.

    Queries the Jaeger query service over HTTP (``GET /api/traces``) rather
    than going through ``jaeger_client``: that name is never bound in this
    file (only ``Config`` is imported from it).

    Args:
        service_name: Service whose traces are searched.
        min_duration_ms: Minimum trace duration in milliseconds.
        base_url: Base URL of the Jaeger query service.
            NOTE(review): the default assumes a local Jaeger instance on its
            usual UI/query port -- adjust for the deployment.
        timeout: Socket timeout in seconds for the HTTP request.

    Returns:
        The list stored under ``data`` in the JSON response, or an empty list
        when that key is missing or empty.

    Raises:
        ValueError: If ``base_url`` is not an http(s) URL.
        urllib.error.URLError: If the query service cannot be reached.
    """
    import json
    from urllib.parse import urlencode
    from urllib.request import urlopen

    # urlopen also accepts file:// and other schemes; restrict to HTTP(S) so a
    # bad base_url cannot be used to read local resources.
    if not base_url.startswith(("http://", "https://")):
        raise ValueError(f"base_url must be an http(s) URL, got {base_url!r}")

    query = {
        "service": service_name,
        "minDuration": f"{min_duration_ms}ms",
        "limit": 100,
    }
    url = f"{base_url.rstrip('/')}/api/traces?{urlencode(query)}"

    with urlopen(url, timeout=timeout) as resp:
        payload = json.load(resp)

    return payload.get("data") or []

性能分析视图

通过追踪数据可以生成以下分析报告:

| 分析维度 | 关键指标 | 优化方向 |
| --- | --- | --- |
| 延迟分布 | P50/P95/P99 | 识别长尾延迟 |
| 瓶颈定位 | 各Span耗时占比 | 优化最慢环节 |
| 调用拓扑 | 服务间依赖关系 | 减少不必要的调用 |
| 错误传播 | 错误Span路径 | 优化错误处理 |

生产环境最佳实践

  1. 采样策略:生产环境使用动态采样;由于请求开始时无法预知是否出错,错误请求的100%采样需要借助尾部采样(tail-based sampling)实现
  2. 上下文传播:确保HTTP/gRPC请求头正确携带Trace ID
  3. Span粒度:平衡追踪详细度和性能开销
  4. 数据保留:设置合理的追踪数据保留周期
  5. 与日志关联:在日志中记录Trace ID,实现追踪与日志的关联

链路追踪为LLM系统的可观测性提供了关键能力,是构建高可靠LLM服务的必备基础设施。