← 返回首页
🧠

流式响应

📂 llm ⏱ 3 min 532 words

---
title: "流式响应"
description: "LLM流式响应技术,包括SSE和分块传输实现"
tags: ["流式响应", "SSE", "分块传输", "实时输出", "用户体验"]
category: "llm"
icon: "🧠"
---

流式响应

流式响应是LLM应用提升用户体验的关键技术。通过逐Token输出而不是等待完整响应,用户可以实时看到生成过程,显著减少感知延迟。SSE(Server-Sent Events)是实现LLM流式输出的主流方案。

SSE基础实现

服务端实现

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

# Application instance; the POST route for /v1/chat/completions below registers on it.
app = FastAPI()

async def generate_tokens(prompt: str, delay: float = 0.1):
    """Yield a canned chat completion as Server-Sent Events frames.

    Every yielded string is one SSE event: a ``data: `` prefix, a payload,
    and a terminating blank line. The sequence is one JSON chunk per token,
    then a JSON chunk carrying ``finish_reason="stop"`` and usage counts,
    then the literal ``[DONE]`` sentinel.

    Args:
        prompt: The user prompt. Accepted for interface compatibility; the
            demo output is fixed and does not depend on it.
        delay: Seconds to pause after each token frame to simulate
            generation latency. Defaults to 0.1; pass 0 to disable.

    Yields:
        str: SSE-formatted frames, ready to be written to the response body.
    """
    tokens = ["你好", ",我是", "AI", "助手", ",", "很高兴", "为您", "服务", "。"]
    for token in tokens:
        chunk = {
            "choices": [{"delta": {"content": token}, "finish_reason": None}],
            "usage": None,
        }
        # ensure_ascii=False keeps CJK text readable on the wire instead of
        # expanding it to \uXXXX escapes.
        yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
        await asyncio.sleep(delay)

    finish_chunk = {
        "choices": [{"delta": {}, "finish_reason": "stop"}],
        "usage": {"prompt_tokens": 10, "completion_tokens": 9},
    }
    # Serialized with the same options as the token chunks for consistency.
    yield f"data: {json.dumps(finish_chunk, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"

@app.post("/v1/chat/completions")
async def chat_completions(request: dict):
    """Stream a chat completion for the last message in the request.

    Args:
        request: Decoded JSON request body. The ``content`` of the last entry
            in its ``messages`` list is used as the prompt; a missing or
            empty ``messages`` list results in an empty prompt.

    Returns:
        StreamingResponse: A ``text/event-stream`` response fed by
        ``generate_tokens``.
    """
    # `or [{}]` covers both a missing key and an explicit empty list; indexing
    # an empty list with [-1] would raise IndexError.
    messages = request.get("messages") or [{}]
    prompt = messages[-1].get("content", "")

    return StreamingResponse(
        generate_tokens(prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Asks an Nginx reverse proxy not to buffer the stream.
            "X-Accel-Buffering": "no",
        },
    )

客户端实现

import httpx
import asyncio
import json

async def stream_chat(messages: list, model: str = "gpt-4"):
    """Stream a chat completion from the local server and echo it to stdout.

    Sends a streaming POST to ``http://localhost:8000/v1/chat/completions``,
    prints each content delta as it arrives, and stops at the ``[DONE]``
    sentinel or when the stream ends.

    Args:
        messages: Chat messages to send as the ``messages`` field.
        model: Model name to send as the ``model`` field.

    Returns:
        str: The concatenation of all received content deltas.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    pieces = []
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/v1/chat/completions",
            json={"model": model, "messages": messages, "stream": True},
            timeout=60.0,
        ) as response:
            # Without this check an error response has no "data: " lines and
            # the function would silently return an empty string.
            response.raise_for_status()

            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = line[6:]
                if data == "[DONE]":
                    break

                # Skip chunks that carry no choices instead of failing on [0].
                choices = json.loads(data).get("choices") or []
                if not choices:
                    continue

                content = (choices[0].get("delta") or {}).get("content", "")
                if content:
                    print(content, end="", flush=True)
                    pieces.append(content)

            print()
            return "".join(pieces)

# Example invocation: send a single user message and run the streaming client to completion.
messages = [{"role": "user", "content": "用Python写一个简单的HTTP服务器"}]
asyncio.run(stream_chat(messages))

分块传输实现

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json

# Application instance; the POST route for /api/generate below registers on it.
app = FastAPI()

async def chunked_generator(data: dict, chunk_size: int = 1024, delay: float = 0.01):
    """Serialize ``data`` to JSON and yield it as chunked-encoding style frames.

    Each frame is the chunk size in hexadecimal, CRLF, the chunk text, CRLF.
    A final zero-length frame marks the end.

    NOTE(review): these frames are written into the response body by hand.
    If the HTTP server already applies chunked transfer encoding to streamed
    responses, the client will see this framing as part of the payload;
    confirm against the server in use.

    Args:
        data: JSON-serializable mapping to send.
        chunk_size: Maximum number of characters per frame. Must be >= 1.
        delay: Seconds to pause after each data frame. Pass 0 to disable.

    Yields:
        str: One frame per slice of the JSON text, then the terminating frame.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")

    content = json.dumps(data, ensure_ascii=False)

    for start in range(0, len(content), chunk_size):
        chunk = content[start:start + chunk_size]
        # The size prefix counts bytes on the wire, not characters; with
        # ensure_ascii=False a non-ASCII character takes several UTF-8 bytes.
        size = len(chunk.encode("utf-8"))
        yield f"{size:x}\r\n{chunk}\r\n"
        await asyncio.sleep(delay)
    yield "0\r\n\r\n"

@app.post("/api/generate")
async def generate(request: Request):
    """Answer with a fixed JSON payload streamed through ``chunked_generator``.

    Args:
        request: Incoming request whose JSON body is read and decoded.

    Returns:
        StreamingResponse: An ``application/json`` response carrying an
        explicit ``Transfer-Encoding: chunked`` header.
    """
    # The decoded body is not used further; awaiting it still reads and
    # parses the request payload.
    await request.json()

    payload = {"response": "这是一个分块传输的响应"}
    extra_headers = {"Transfer-Encoding": "chunked"}

    return StreamingResponse(
        chunked_generator(payload),
        media_type="application/json",
        headers=extra_headers,
    )

WebSocket流式

from fastapi import FastAPI, WebSocket
import asyncio
import json

# Application instance; the WebSocket route for /ws/chat below registers on it.
app = FastAPI()

@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """Serve a demo token stream over a WebSocket connection.

    For every text message received, the message is parsed as JSON and ten
    placeholder tokens (``token_0`` .. ``token_9``) are sent as JSON objects,
    followed by a ``{"done": true}`` marker. The loop then waits for the next
    message.

    Args:
        websocket: The connection to accept and serve.
    """
    # Imported locally so this block does not depend on the file-level imports.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            # The parsed message is not used by the demo; parsing still
            # rejects payloads that are not valid JSON.
            json.loads(data)

            for i in range(10):
                chunk = {"token": f"token_{i}", "index": i}
                await websocket.send_text(json.dumps(chunk))
                await asyncio.sleep(0.1)

            await websocket.send_text(json.dumps({"done": True}))
    except WebSocketDisconnect:
        # The peer has already closed the connection, so there is nothing
        # left to close; attempting to close again can itself fail.
        return
    except Exception:
        # Any other failure (e.g. malformed JSON): close the connection.
        await websocket.close()

# Client example
async def ws_client():
    """Send one greeting to the local WebSocket endpoint and print the reply.

    Tokens are printed as they arrive, without separators, until a message
    with a truthy ``done`` field is received.
    """
    import websockets

    greeting = json.dumps({"message": "你好"})
    async with websockets.connect("ws://localhost:8000/ws/chat") as ws:
        await ws.send(greeting)

        payload = json.loads(await ws.recv())
        while not payload.get("done"):
            print(payload["token"], end="", flush=True)
            payload = json.loads(await ws.recv())

React前端集成

import { useState, useCallback } from 'react';

/**
 * React hook that sends chat messages to `/v1/chat/completions` and streams
 * the assistant reply into state as it arrives.
 *
 * @returns {{
 *   messages: Array<{role: string, content: string}>,
 *   sendMessage: (content: string) => Promise<void>,
 *   isStreaming: boolean
 * }} The conversation so far, a function that sends a user message, and a
 *   flag that is true while a reply is being received.
 *   `sendMessage` rejects if the request fails or the server answers with a
 *   non-2xx status.
 */
function useChatStream() {
  const [messages, setMessages] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (content) => {
    const userMessage = { role: 'user', content };
    setMessages(prev => [...prev, userMessage]);
    setIsStreaming(true);

    try {
      const response = await fetch('/v1/chat/completions', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: [...messages, userMessage],
          stream: true
        })
      });

      if (!response.ok || !response.body) {
        throw new Error(`Chat request failed with status ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let assistantMessage = '';
      // Holds the trailing partial line until the rest of it arrives.
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // stream: true keeps a multi-byte character that is split across
        // two network chunks intact instead of decoding it as garbage.
        buffer += decoder.decode(value, { stream: true });

        // Network chunks do not align with SSE lines: only complete lines
        // are parsed, the unfinished tail stays in the buffer.
        const lines = buffer.split('\n');
        buffer = lines.pop();

        for (const rawLine of lines) {
          const line = rawLine.trimEnd();
          if (!line.startsWith('data: ') || line === 'data: [DONE]') continue;

          const data = JSON.parse(line.slice(6));
          const delta = data.choices?.[0]?.delta?.content;
          if (!delta) continue;

          assistantMessage += delta;
          // Snapshot the text so the updater does not read a later value.
          const snapshot = assistantMessage;
          setMessages(prev => {
            const assistant = { role: 'assistant', content: snapshot };
            const last = prev[prev.length - 1];
            // Build a new array and object rather than mutating the
            // previous state in place.
            return last?.role === 'assistant'
              ? [...prev.slice(0, -1), assistant]
              : [...prev, assistant];
          });
        }
      }
    } finally {
      // Reset the flag even when the request or parsing fails.
      setIsStreaming(false);
    }
  }, [messages]);

  return { messages, sendMessage, isStreaming };
}

性能优化

  1. 缓冲控制:禁用Nginx缓冲,设置X-Accel-Buffering: no
  2. 超时设置:设置合理的流式超时时间
  3. 错误处理:处理连接中断和超时情况
  4. 资源释放:及时关闭连接释放资源

流式响应是现代LLM应用的标配,合理的实现能显著提升用户体验和系统性能。