流式响应
---
title: "流式响应"
description: "LLM流式响应技术,包括SSE和分块传输实现"
tags: ["流式响应", "SSE", "分块传输", "实时输出", "用户体验"]
category: "llm"
icon: "🧠"
---
流式响应
流式响应是LLM应用提升用户体验的关键技术。通过逐Token输出而不是等待完整响应,用户可以实时看到生成过程,显著减少感知延迟。SSE(Server-Sent Events)是实现LLM流式输出的主流方案。
SSE基础实现
服务端实现
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
# ASGI application instance; the SSE `/v1/chat/completions` route is registered on it below.
app = FastAPI()
async def generate_tokens(prompt: str, *, delay: float = 0.1):
    """Yield a canned reply as OpenAI-style streaming chunks in SSE framing.

    This is a mock generator: *prompt* is accepted so the signature matches a
    real model call, but it does not influence the output.

    Args:
        prompt: The user prompt (unused by this mock).
        delay: Seconds to sleep after each token, simulating generation latency.

    Yields:
        ``"data: <json>\\n\\n"`` strings: one per token, then a final chunk with
        ``finish_reason="stop"`` and a ``usage`` block, then ``data: [DONE]``.
    """
    tokens = ["你好", ",我是", "AI", "助手", ",", "很高兴", "为您", "服务", "。"]
    for token in tokens:
        chunk = {
            "choices": [{"delta": {"content": token}, "finish_reason": None}],
            "usage": None,
        }
        # ensure_ascii=False keeps CJK text readable on the wire.
        yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
        await asyncio.sleep(delay)
    finish_chunk = {
        "choices": [{"delta": {}, "finish_reason": "stop"}],
        # prompt_tokens is a fixed placeholder; completion_tokens is derived
        # from the tokens actually sent so the two cannot drift apart.
        "usage": {"prompt_tokens": 10, "completion_tokens": len(tokens)},
    }
    yield f"data: {json.dumps(finish_chunk, ensure_ascii=False)}\n\n"
    yield "data: [DONE]\n\n"
@app.post("/v1/chat/completions")
async def chat_completions(request: dict):
    """Stream a mock chat completion as Server-Sent Events.

    The body is taken as a plain dict in chat format. Only the ``content`` of
    the last entry in ``messages`` is forwarded to ``generate_tokens``.
    A missing, empty, or malformed ``messages`` falls back to an empty prompt
    instead of failing with a 500.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """
    messages = request.get("messages")
    # The old `request.get("messages", [{}])[-1]` raised IndexError for `[]`
    # and TypeError for an explicit null.
    last = messages[-1] if isinstance(messages, list) and messages else {}
    prompt = last.get("content", "") if isinstance(last, dict) else ""
    return StreamingResponse(
        generate_tokens(prompt),
        media_type="text/event-stream",
        headers={
            # The event stream must not be cached.
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            # Tells Nginx not to buffer the response, so tokens are flushed immediately.
            "X-Accel-Buffering": "no",
        },
    )
客户端实现
import httpx
import asyncio
import json
async def stream_chat(
    messages: list,
    model: str = "gpt-4",
    *,
    url: str = "http://localhost:8000/v1/chat/completions",
    timeout: float = 60.0,
):
    """POST *messages* to an SSE chat endpoint and print the reply as it streams.

    Each content delta is printed as soon as it arrives. The concatenated text
    is returned when the server sends ``data: [DONE]`` or closes the stream.

    Args:
        messages: Chat history as ``{"role": ..., "content": ...}`` dicts.
        model: Model name forwarded in the request body.
        url: Endpoint to POST to.
        timeout: httpx timeout in seconds.

    Returns:
        The full assistant reply as a string.

    Raises:
        httpx.HTTPStatusError: if the server answers with a 4xx/5xx status.
    """
    parts = []
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            url,
            json={"model": model, "messages": messages, "stream": True},
            timeout=timeout,
        ) as response:
            # Without this an error page is silently ignored and "" is returned.
            response.raise_for_status()
            async for line in response.aiter_lines():
                if not line.startswith("data:"):
                    continue  # blank separators, comments, event:/id: fields
                # SSE allows both "data:value" and "data: value"; drop one optional space.
                data = line[5:]
                if data.startswith(" "):
                    data = data[1:]
                if data == "[DONE]":
                    break
                chunk = json.loads(data)
                # Tolerate usage-only chunks (empty "choices") and a null delta.
                choices = chunk.get("choices") or [{}]
                content = (choices[0].get("delta") or {}).get("content")
                if content:
                    print(content, end="", flush=True)
                    parts.append(content)
            print()
    return "".join(parts)
# Demo driver: stream one single-turn question and echo the reply to stdout.
# NOTE(review): this runs at import time; guard it with
# `if __name__ == "__main__":` if this snippet is reused as a module.
messages = [
    {"role": "user", "content": "用Python写一个简单的HTTP服务器"},
]
asyncio.run(stream_chat(messages=messages))
分块传输实现
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
# ASGI application instance; the `/api/generate` route below is registered on it.
app = FastAPI()
async def chunked_generator(data: dict, *, chunk_size: int = 1024, delay: float = 0.01):
    """Serialize *data* to JSON and yield it in pieces of at most *chunk_size* characters.

    The pieces are raw payload only. When a streamed response has no
    Content-Length, the ASGI server (e.g. uvicorn) applies HTTP/1.1 chunked
    transfer-encoding itself. Writing ``<hex-size>\\r\\n...\\r\\n`` framing
    here would therefore double-frame the body, and the client would see the
    framing inside the JSON text. The old framing also used the character
    count as the chunk size, which is wrong for non-ASCII text because chunk
    sizes are counted in bytes.

    Args:
        data: JSON-serializable object to send.
        chunk_size: Maximum characters per yielded piece (must be >= 1).
        delay: Seconds to sleep after each piece, simulating incremental output.

    Yields:
        Consecutive slices of ``json.dumps(data, ensure_ascii=False)``.

    Raises:
        ValueError: if *chunk_size* is less than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")
    content = json.dumps(data, ensure_ascii=False)
    # Slice on characters (not bytes) so no piece splits a multi-byte UTF-8 character.
    for start in range(0, len(content), chunk_size):
        yield content[start:start + chunk_size]
        await asyncio.sleep(delay)
@app.post("/api/generate")
async def generate(request: Request):
    """Return a fixed JSON result, streamed in pieces by ``chunked_generator``.

    Returns:
        A ``StreamingResponse`` with media type ``application/json``.
    """
    # Parsing is kept so behavior on malformed bodies is unchanged.
    # NOTE(review): `body` is not used by this demo yet.
    body = await request.json()
    result = {"response": "这是一个分块传输的响应"}
    # No Content-Length is set, so the server chooses chunked framing itself.
    # Transfer-Encoding is a hop-by-hop header owned by the server; setting it
    # from the app is redundant on HTTP/1.1 and forbidden on HTTP/2.
    return StreamingResponse(
        chunked_generator(result),
        media_type="application/json",
    )
WebSocket流式
from fastapi import FastAPI, WebSocket
import asyncio
import json
# ASGI application instance; the `/ws/chat` WebSocket route below is registered on it.
app = FastAPI()
@app.websocket("/ws/chat")
async def websocket_chat(websocket: WebSocket):
    """Stream ten placeholder tokens back for every JSON text frame received.

    Each token is sent as ``{"token": "token_<i>", "index": <i>}``, followed by
    ``{"done": true}`` when the reply is complete. The loop runs until the
    client disconnects.
    """
    # Function-scope import so this snippet's import lines stay unchanged.
    from fastapi import WebSocketDisconnect

    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            # Parsed to validate the frame; a real handler would pass it to the model.
            message = json.loads(data)
            for i in range(10):
                chunk = {"token": f"token_{i}", "index": i}
                await websocket.send_text(json.dumps(chunk))
                await asyncio.sleep(0.1)
            await websocket.send_text(json.dumps({"done": True}))
    except WebSocketDisconnect:
        # Normal termination: the peer already closed the socket, so calling
        # close() on it is unnecessary and may raise.
        return
    except json.JSONDecodeError:
        # 1003 = "unsupported data" close code (RFC 6455 §7.4.1).
        await websocket.close(code=1003)
    # Any other error is no longer swallowed; it propagates to the ASGI server.
# Client example
async def ws_client(url: str = "ws://localhost:8000/ws/chat", message: str = "你好"):
    """Send one chat message over the WebSocket endpoint and print streamed tokens.

    Reads JSON frames until one carries a truthy ``"done"``, printing each
    ``token`` as it arrives.

    Args:
        url: WebSocket endpoint to connect to.
        message: Text sent as ``{"message": ...}``.

    Returns:
        The concatenated tokens as a string.
    """
    import websockets  # third-party; imported only when the client actually runs

    tokens = []
    async with websockets.connect(url) as ws:
        await ws.send(json.dumps({"message": message}))
        while True:
            data = json.loads(await ws.recv())
            if data.get("done"):
                break
            # .get(): a frame without "token" is treated as empty instead of raising KeyError.
            token = data.get("token", "")
            print(token, end="", flush=True)
            tokens.append(token)
    print()
    return "".join(tokens)
React前端集成
import { useState, useCallback } from 'react';
/**
 * React hook that posts a user message to the SSE chat endpoint and streams
 * the assistant reply into `messages` as content deltas arrive.
 *
 * @returns {{
 *   messages: Array<{role: string, content: string}>,
 *   sendMessage: (content: string) => Promise<void>,
 *   isStreaming: boolean
 * }}
 */
function useChatStream() {
  const [messages, setMessages] = useState([]);
  const [isStreaming, setIsStreaming] = useState(false);

  const sendMessage = useCallback(async (content) => {
    const userMessage = { role: 'user', content };
    setMessages(prev => [...prev, userMessage]);
    setIsStreaming(true);

    let assistantMessage = '';

    // Show the accumulated reply. The trailing assistant message is replaced
    // with a new object rather than mutated, so previous state stays untouched.
    const showAssistant = (text) => {
      setMessages(prev => {
        const last = prev[prev.length - 1];
        if (last?.role === 'assistant') {
          return [...prev.slice(0, -1), { ...last, content: text }];
        }
        return [...prev, { role: 'assistant', content: text }];
      });
    };

    // Handle one complete SSE line, e.g. "data: {...}" or "data: [DONE]".
    const handleLine = (rawLine) => {
      const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine;
      if (!line.startsWith('data:')) return;
      let payload = line.slice(5);
      // SSE strips exactly one optional space after "data:".
      if (payload.startsWith(' ')) payload = payload.slice(1);
      if (payload === '[DONE]') return;
      const data = JSON.parse(payload);
      const delta = data.choices?.[0]?.delta?.content;
      if (delta) {
        assistantMessage += delta;
        showAssistant(assistantMessage);
      }
    };

    try {
      const response = await fetch('/v1/chat/completions', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          messages: [...messages, userMessage],
          stream: true
        })
      });
      if (!response.ok || !response.body) {
        throw new Error(`Chat stream request failed: HTTP ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // { stream: true } keeps a multi-byte UTF-8 character that is split
        // across two network chunks intact instead of decoding it as U+FFFD.
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n');
        // A network chunk can end mid-line; carry the tail into the next read.
        buffer = lines.pop();
        lines.forEach(line => handleLine(line));
      }
      // Flush bytes the decoder held back, plus any final unterminated line.
      buffer += decoder.decode();
      if (buffer) handleLine(buffer);
    } finally {
      // Reset even if fetch or parsing throws, so the UI never stays "streaming".
      setIsStreaming(false);
    }
  }, [messages]);

  return { messages, sendMessage, isStreaming };
}
性能优化
- 缓冲控制:禁用Nginx缓冲,设置响应头 `X-Accel-Buffering: no`
- 超时设置:设置合理的流式超时时间
- 错误处理:处理连接中断和超时情况
- 资源释放:及时关闭连接释放资源
流式响应是现代LLM应用的标配,合理的实现能显著提升用户体验和系统性能。