LLM API集成实践
---
title: "LLM API集成实践"
description: "详细介绍大语言模型API集成的实践方法,包括REST、GraphQL和WebSocket协议的具体实现"
tags: ["LLM API", "REST", "GraphQL", "WebSocket", "API集成"]
category: "llm"
icon: "🧠"
---
LLM API集成实践
将大语言模型(LLM)集成到应用程序中,API是主要的接口方式。不同的API协议适用于不同的应用场景:REST API简单通用、GraphQL灵活高效、WebSocket支持实时通信。本文将详细介绍这三种协议在LLM集成中的具体实践。
REST API集成
REST API是最常见的LLM集成方式,具有简单、标准化、易于理解的优点。几乎所有LLM服务提供商都提供REST API接口。
import requests
import json
from typing import Dict, Any, Optional
class LLMRestClient:
    """Minimal client for an OpenAI-style LLM REST API.

    Every call goes through a single ``requests.Session`` so the
    authorization header is configured once and reused for all requests.

    Args:
        api_key: Secret sent as a ``Bearer`` token on every request.
        base_url: Root URL of the API; endpoint paths are appended to it.
        timeout: Seconds to wait for a response, forwarded to ``requests``.
            ``None`` disables the timeout. A finite default keeps a stalled
            server from blocking the caller indefinitely.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.openai.com/v1",
                 timeout: Optional[float] = 30.0):
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        })

    def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` and return the decoded body.

        All public endpoints share this helper so they report failures the
        same way instead of handing an error body back as if it were a result.

        Raises:
            Exception: ``"Rate limit exceeded"`` on HTTP 429, or
                ``"API error <status>: <body>"`` on any other non-200 status.
        """
        # Strip a trailing slash so the joined URL never contains "//".
        url = f"{self.base_url.rstrip('/')}/{path}"
        response = self.session.post(url, json=payload, timeout=self.timeout)
        if response.status_code == 200:
            return response.json()
        if response.status_code == 429:
            raise Exception("Rate limit exceeded")
        raise Exception(f"API error {response.status_code}: {response.text}")

    def chat_completion(self, messages: list, model: str = "gpt-4",
                        temperature: float = 0.7, max_tokens: int = 1000,
                        **kwargs) -> Dict[str, Any]:
        """Request a chat completion.

        Args:
            messages: Conversation so far, as a list of message dicts.
            model: Model identifier sent in the request.
            temperature: Sampling temperature sent in the request.
            max_tokens: Token limit sent in the request.
            **kwargs: Extra fields merged into the request body; a key that
                matches one of the named fields overrides it.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: On HTTP 429 or any other non-200 status.
        """
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            **kwargs
        }
        return self._post("chat/completions", payload)

    def embedding(self, texts: list, model: str = "text-embedding-ada-002"):
        """Request embeddings for ``texts``.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: On HTTP 429 or any other non-200 status.
        """
        payload = {
            "model": model,
            "input": texts
        }
        return self._post("embeddings", payload)

    def moderation(self, text: str):
        """Submit ``text`` to the moderation endpoint.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: On HTTP 429 or any other non-200 status.
        """
        return self._post("moderations", {"input": text})
# Usage example: build a client, then run one short conversation.
client = LLMRestClient(api_key="your-api-key")

# Single-turn chat: a system prompt followed by the user's question.
response = client.chat_completion(
    messages=[
        {"role": "system", "content": "你是一个有帮助的助手。"},
        {"role": "user", "content": "解释REST API的优势"},
    ],
)
# Print only the text of the first returned choice.
print(response["choices"][0]["message"]["content"])
REST API的优势在于标准化程度高、工具支持完善、易于缓存和调试。但缺点是每次交互都是一次独立的请求-响应往返,流式输出和实时推送需要额外机制;如果不复用连接(例如不使用上面示例中的Session或连接池),在高并发场景下反复建立HTTP连接的开销可能成为性能瓶颈。
GraphQL API集成
GraphQL API提供了更灵活的数据查询能力,客户端可以精确指定需要的数据字段,减少网络传输和数据处理开销。
from typing import Any, Dict, List, Optional

import requests
class LLMGraphQLClient:
    """Client for an LLM service exposed through a GraphQL endpoint.

    Args:
        api_key: Secret sent as a ``Bearer`` token on every request.
        endpoint: Full URL of the GraphQL endpoint.
        timeout: Seconds to wait for the HTTP response, forwarded to
            ``requests``. ``None`` disables the timeout.
    """

    def __init__(self, api_key: str, endpoint: str, timeout: Optional[float] = 30.0):
        self.api_key = api_key
        self.endpoint = endpoint
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def execute_query(self, query: str,
                      variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Send one GraphQL operation and return the decoded JSON response.

        Args:
            query: GraphQL document (query or mutation) to execute.
            variables: Values for the document's variables; omitted from
                the request when empty or ``None``.

        Returns:
            The raw response envelope as decoded from JSON. It is returned
            unmodified, so callers can inspect both ``data`` and ``errors``.
        """
        payload: Dict[str, Any] = {"query": query}
        if variables:
            payload["variables"] = variables
        response = requests.post(
            self.endpoint,
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )
        return response.json()

    @staticmethod
    def _extract_field(result: Dict[str, Any], field: str) -> Any:
        """Return ``result["data"][field]``, surfacing GraphQL errors.

        A response may carry ``"data": null``; ``or {}`` guards against
        calling ``.get`` on ``None`` in that case.

        Raises:
            Exception: When the field is missing or null and the response
                lists ``errors``; the message joins the reported messages.
        """
        data = result.get("data") or {}
        value = data.get(field)
        errors = result.get("errors")
        if value is None and errors:
            details = "; ".join(
                str(err.get("message", err)) if isinstance(err, dict) else str(err)
                for err in errors
            )
            raise Exception(f"GraphQL error: {details}")
        return value

    def chat_completion(self, messages: List[Dict], model: str = "gpt-4",
                        **kwargs) -> Optional[Dict[str, Any]]:
        """Run a chat completion through the ``chatCompletion`` mutation.

        Args:
            messages: Conversation so far, as a list of message dicts.
            model: Model identifier passed as the ``$model`` variable.
            **kwargs: Passed as the ``$options`` variable.

        Returns:
            The ``chatCompletion`` payload, or ``None`` when the response
            contains neither that field nor any errors.

        Raises:
            Exception: When the server reports errors and no result.
        """
        query = """
        mutation ChatCompletion($messages: [Message!]!, $model: String!, $options: CompletionOptions) {
            chatCompletion(messages: $messages, model: $model, options: $options) {
                id
                choices {
                    message {
                        role
                        content
                    }
                    finishReason
                }
                usage {
                    promptTokens
                    completionTokens
                    totalTokens
                }
            }
        }
        """
        variables = {
            "messages": messages,
            "model": model,
            "options": kwargs
        }
        result = self.execute_query(query, variables)
        return self._extract_field(result, "chatCompletion")

    def batch_completion(self, requests_list: List[Dict]) -> Optional[List[Dict]]:
        """Run several completions in one ``batchCompletion`` query.

        Args:
            requests_list: Passed as the ``$requests`` variable.

        Returns:
            The ``batchCompletion`` list (each entry selects ``id``,
            ``result`` and ``error``), or ``None`` when the response
            contains neither that field nor any errors.

        Raises:
            Exception: When the server reports errors and no result.
        """
        query = """
        query BatchCompletion($requests: [CompletionRequest!]!) {
            batchCompletion(requests: $requests) {
                id
                result {
                    choices {
                        message {
                            content
                        }
                    }
                }
                error
            }
        }
        """
        variables = {"requests": requests_list}
        result = self.execute_query(query, variables)
        return self._extract_field(result, "batchCompletion")
# Usage example: point the client at a GraphQL endpoint.
client = LLMGraphQLClient(api_key="your-api-key", endpoint="https://api.example.com/graphql")

# The mutation selects only the fields the client needs; extra keyword
# arguments such as temperature are forwarded as completion options.
response = client.chat_completion(
    model="gpt-4",
    messages=[
        {"role": "user", "content": "介绍GraphQL的优势"},
    ],
    temperature=0.7,
)
print(response)
GraphQL的优势在于灵活性和效率。客户端可以避免过度获取或不足获取数据,特别适合复杂的前端应用。但GraphQL的学习曲线较陡,服务端实现也相对复杂。
WebSocket实时通信
WebSocket提供了全双工通信能力,适合需要实时交互的LLM应用场景,如流式对话、实时翻译等。
import asyncio
import websockets
import json
from typing import Callable, Optional
class LLMWebSocketClient:
    """Client for a streaming LLM service reachable over WebSocket.

    Incoming frames are decoded as JSON and dispatched to the callback
    registered under the frame's ``"type"`` value. Callbacks are keyed by
    message type only, so one client supports a single in-flight request.

    Args:
        url: WebSocket URL of the service.
        api_key: Secret sent as a ``Bearer`` token in the handshake headers.
        request_timeout: Seconds ``send_chat_request`` waits for the
            ``"done"`` frame before giving up.
    """

    def __init__(self, url: str, api_key: str, request_timeout: float = 60):
        self.url = url
        self.api_key = api_key
        self.request_timeout = request_timeout
        self.websocket = None
        self.callbacks = {}
        # Strong reference to the receiver task; the event loop itself only
        # keeps weak references, so an unreferenced task may be collected.
        self._receive_task = None
        # Per-client counter used to build unique request ids.
        self._request_seq = 0

    async def connect(self):
        """Open the WebSocket connection and start the receive loop."""
        headers = {"Authorization": f"Bearer {self.api_key}"}
        # NOTE(review): newer websockets releases appear to name this keyword
        # ``additional_headers`` in their asyncio client - confirm against
        # the installed version.
        self.websocket = await websockets.connect(
            self.url,
            extra_headers=headers
        )
        self._receive_task = asyncio.create_task(self._receive_messages())

    async def _receive_messages(self):
        """Read frames until the connection closes, dispatching by type."""
        try:
            async for message in self.websocket:
                data = json.loads(message)
                msg_type = data.get("type")
                # Frames with no registered handler are ignored.
                if msg_type in self.callbacks:
                    await self.callbacks[msg_type](data)
        except websockets.exceptions.ConnectionClosed:
            print("WebSocket connection closed")

    async def send_chat_request(self, messages: list, model: str = "gpt-4",
                                stream: bool = True, **kwargs) -> str:
        """Send a chat request and return the concatenated streamed text.

        Args:
            messages: Conversation so far, as a list of message dicts.
            model: Model identifier sent in the request.
            stream: Value of the request's ``"stream"`` field.
            **kwargs: Extra fields merged into the request; a key matching
                a built-in field overrides it.

        Returns:
            All ``choices[0].delta.content`` fragments received in
            ``"chunk"`` frames before the ``"done"`` frame, joined in order.

        Raises:
            RuntimeError: If called before ``connect()``.
            Exception: ``"Request timed out"`` when no ``"done"`` frame
                arrives within ``request_timeout`` seconds.
        """
        if self.websocket is None:
            raise RuntimeError("WebSocket is not connected; call connect() first")

        # id() values can be reused once an object is freed, so a counter
        # is used to keep request ids unique per client.
        self._request_seq += 1
        request_id = f"req_{self._request_seq}"

        chunks = []
        done_event = asyncio.Event()

        async def handle_chunk(data):
            choices = data.get("choices")
            if choices:
                content = choices[0].get("delta", {}).get("content")
                # Skip deltas that carry no text (missing, null or empty).
                if content:
                    chunks.append(content)

        async def handle_done(data):
            done_event.set()

        # Register both handlers BEFORE sending: the receive loop can run
        # while send() is awaited, and a reply with no handler is dropped.
        self.callbacks["chunk"] = handle_chunk
        self.callbacks["done"] = handle_done

        payload = {
            "id": request_id,
            "type": "chat_completion",
            "messages": messages,
            "model": model,
            "stream": stream,
            **kwargs
        }
        try:
            await self.websocket.send(json.dumps(payload))
            try:
                await asyncio.wait_for(done_event.wait(), timeout=self.request_timeout)
            except asyncio.TimeoutError as exc:
                raise Exception("Request timed out") from exc
        finally:
            # Remove this request's handlers so late frames cannot reach
            # stale closures; leave handlers someone else installed.
            for msg_type, handler in (("chunk", handle_chunk), ("done", handle_done)):
                if self.callbacks.get(msg_type) is handler:
                    del self.callbacks[msg_type]
        return "".join(chunks)

    async def disconnect(self):
        """Close the connection if one was opened."""
        if self.websocket:
            await self.websocket.close()
# Usage example
async def websocket_example():
    """Connect, run one streamed chat request, print the reply, disconnect."""
    client = LLMWebSocketClient(
        url="wss://api.example.com/ws",
        api_key="your-api-key"
    )
    await client.connect()
    try:
        # Streamed conversation
        response = await client.send_chat_request(
            messages=[{"role": "user", "content": "用WebSocket实现LLM集成"}],
            stream=True
        )
        print("响应:", response)
    finally:
        # Close the socket even when the request fails or times out.
        await client.disconnect()


# Uncomment to run the example:
# asyncio.run(websocket_example())
WebSocket的关键优势在于实时性和效率。对于流式输出,WebSocket避免了HTTP轮询的开销;对于长对话,WebSocket保持了连接状态,减少了重复认证的开销。
错误处理与重试机制
健壮的API集成需要完善的错误处理和重试机制。
import time
import random
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60, exceptions=(Exception,)):
    """Build a decorator that retries a function with exponential backoff.

    Args:
        max_retries: Total number of attempts, including the first call.
            Values below 1 are treated as 1 so the wrapped function is
            always called at least once.
        base_delay: Delay in seconds before the first retry; it doubles
            on each further retry.
        max_delay: Upper bound in seconds for the delay before jitter.
        exceptions: Exception class or tuple of classes that trigger a
            retry. Anything else propagates immediately.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result, or re-raises the last exception once attempts run out.
    """
    # Guard against max_retries <= 0, which would otherwise skip the call
    # entirely and silently return None.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts - 1:
                        # Bare raise keeps the original traceback intact.
                        raise
                    # Exponential backoff, capped at max_delay.
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    # Up to 10% random jitter so that concurrent callers
                    # do not all retry at the same instant.
                    jitter = random.uniform(0, delay * 0.1)
                    sleep_time = delay + jitter
                    print(f"Retry {attempt + 1}/{attempts} after {sleep_time:.2f}s")
                    time.sleep(sleep_time)
        return wrapper
    return decorator
class RobustLLMClient:
    """LLM client wrapper that adds retry and model-fallback strategies.

    Args:
        api_key: Key used to construct the underlying ``LLMRestClient``.
    """

    def __init__(self, api_key: str):
        self.client = LLMRestClient(api_key)

    @retry_with_backoff(max_retries=3)
    def generate_with_retry(self, prompt: str, **kwargs):
        """Send ``prompt`` as a single user message, retrying on failure.

        Args:
            prompt: Text of the user message.
            **kwargs: Forwarded to ``LLMRestClient.chat_completion``.

        Returns:
            Whatever ``chat_completion`` returns.
        """
        return self.client.chat_completion(
            messages=[{"role": "user", "content": prompt}],
            **kwargs
        )

    def generate_with_fallback(self, prompt: str, primary_model: str = "gpt-4",
                               fallback_model: str = "gpt-3.5-turbo", **kwargs):
        """Send ``prompt`` to the primary model, falling back on failure.

        Args:
            prompt: Text of the user message.
            primary_model: Model tried first.
            fallback_model: Model tried once if the primary call raises.
            **kwargs: Generation options forwarded unchanged to both
                attempts, matching ``generate_with_retry``.

        Returns:
            Whatever ``chat_completion`` returns for the call that succeeded.

        Raises:
            Exception: Whatever the fallback call raises; the primary
                failure is only printed.
        """
        messages = [{"role": "user", "content": prompt}]
        try:
            return self.client.chat_completion(
                messages=messages,
                model=primary_model,
                **kwargs
            )
        except Exception as e:
            print(f"Primary model failed: {e}, using fallback")
            return self.client.chat_completion(
                messages=messages,
                model=fallback_model,
                **kwargs
            )
在实际生产环境中,还需要考虑连接池管理、请求合并、结果缓存等优化措施。选择合适的集成模式和实现健壮的错误处理,是构建可靠LLM应用的关键。