← 返回首页
🧠

LLM API集成实践

📂 llm ⏱ 4 min 733 words

---
title: "LLM API集成实践"
description: "详细介绍大语言模型API集成的实践方法,包括REST、GraphQL和WebSocket协议的具体实现"
tags: ["LLM API", "REST", "GraphQL", "WebSocket", "API集成"]
category: "llm"
icon: "🧠"
---

LLM API集成实践

将大语言模型(LLM)集成到应用程序中,API是主要的接口方式。不同的API协议适用于不同的应用场景:REST API简单通用、GraphQL灵活高效、WebSocket支持实时通信。本文将详细介绍这三种协议在LLM集成中的具体实践。

REST API集成

REST API是最常见的LLM集成方式,具有简单、标准化、易于理解的优点。几乎所有LLM服务提供商都提供REST API接口。

import requests
import json
from typing import Dict, Any, Optional

class LLMRestClient:
    """REST client for an OpenAI-style LLM HTTP API.

    One ``requests.Session`` is kept per client so the auth headers are set
    once and sent with every call. All endpoints share the same status
    handling: only HTTP 200 is treated as success.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.openai.com/v1",
                 timeout: float = 60.0):
        """Create the client.

        Args:
            api_key: Secret sent as a ``Bearer`` token on every request.
            base_url: Root URL the endpoint paths are appended to.
            timeout: Seconds to wait for each HTTP request. ``requests`` does
                not time out by default, so an explicit value keeps a stalled
                server from blocking the caller forever.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        })

    def _post(self, path: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``payload`` as JSON to ``path`` and return the decoded body.

        Raises:
            Exception: ``"Rate limit exceeded"`` on HTTP 429, or
                ``"API error <status>: <body>"`` on any other non-200 status.
        """
        # Tolerate a trailing slash on base_url so the path is not doubled up.
        endpoint = f"{self.base_url.rstrip('/')}/{path}"
        response = self.session.post(endpoint, json=payload, timeout=self.timeout)

        if response.status_code == 200:
            return response.json()
        if response.status_code == 429:
            raise Exception("Rate limit exceeded")
        raise Exception(f"API error {response.status_code}: {response.text}")

    def chat_completion(self, messages: list, model: str = "gpt-4",
                       temperature: float = 0.7, max_tokens: int = 1000,
                       **kwargs) -> Dict[str, Any]:
        """Call the chat-completions endpoint.

        Args:
            messages: Chat messages, sent unchanged as the ``messages`` field.
            model: Model identifier.
            temperature: Sampling temperature.
            max_tokens: Upper bound on generated tokens.
            **kwargs: Extra fields merged into the request payload; they
                override the named arguments on key collision.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: On any non-200 response (see ``_post``).
        """
        payload = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
            **kwargs
        }
        return self._post("chat/completions", payload)

    def embedding(self, texts: list, model: str = "text-embedding-ada-002"):
        """Call the embeddings endpoint for ``texts``.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: On any non-200 response (see ``_post``).
        """
        payload = {
            "model": model,
            "input": texts
        }
        return self._post("embeddings", payload)

    def moderation(self, text: str):
        """Call the moderations endpoint for ``text``.

        Returns:
            The decoded JSON response body.

        Raises:
            Exception: On any non-200 response (see ``_post``).
        """
        return self._post("moderations", {"input": text})

# Usage example
client = LLMRestClient(api_key="your-api-key")

# Simple conversation: a system instruction followed by one user question
conversation = [
    {"role": "system", "content": "你是一个有帮助的助手。"},
    {"role": "user", "content": "解释REST API的优势"},
]
response = client.chat_completion(conversation)

# The reply text is the message content of the first choice
reply = response["choices"][0]["message"]["content"]
print(reply)

REST API的优势在于标准化程度高、工具支持完善、易于缓存和调试。但缺点是每次调用都是一次独立的请求-响应往返(即使通过Session复用了底层连接),且服务端无法主动推送数据,在高并发或需要流式输出的场景下可能成为性能瓶颈。

GraphQL API集成

GraphQL API提供了更灵活的数据查询能力,客户端可以精确指定需要的数据字段,减少网络传输和数据处理开销。

import requests
from typing import Dict, Any, List

class LLMGraphQLClient:
    """Client for an LLM service exposed through a GraphQL endpoint."""

    def __init__(self, api_key: str, endpoint: str, timeout: float = 60.0):
        """Create the client.

        Args:
            api_key: Secret sent as a ``Bearer`` token on every request.
            endpoint: Full URL of the GraphQL endpoint.
            timeout: Seconds to wait for each HTTP request. ``requests`` does
                not time out by default, so an explicit value keeps a stalled
                server from blocking the caller forever.
        """
        self.api_key = api_key
        self.endpoint = endpoint
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def execute_query(self, query: str, variables: Dict[str, Any] = None) -> Dict[str, Any]:
        """POST a GraphQL document and return the raw decoded response.

        Args:
            query: GraphQL query or mutation text.
            variables: Variable values for the document; omitted from the
                request when ``None`` or empty.

        Returns:
            The decoded JSON body, returned as-is so callers can inspect
            both ``data`` and ``errors``.

        Raises:
            requests.HTTPError: If the body is not JSON and the HTTP status
                indicates an error.
            ValueError: If the body is not JSON on a successful status.
        """
        payload = {"query": query}
        if variables:
            payload["variables"] = variables

        response = requests.post(
            self.endpoint,
            json=payload,
            headers=self.headers,
            timeout=self.timeout
        )

        try:
            return response.json()
        except ValueError:
            # Non-JSON body (e.g. a gateway error page): prefer reporting the
            # HTTP status; if the status was fine, re-raise the decode error.
            response.raise_for_status()
            raise

    @staticmethod
    def _extract_field(result: Dict[str, Any], field: str):
        """Return ``result["data"][field]``, tolerating a missing or null ``data``.

        GraphQL servers commonly answer a failed operation with
        ``{"data": null, "errors": [...]}``; chaining ``.get`` on that null
        would raise ``AttributeError`` and hide the real error.

        Raises:
            RuntimeError: If ``data`` is missing/null and ``errors`` is present.
        """
        data = result.get("data")
        if data is None:
            errors = result.get("errors")
            if errors:
                raise RuntimeError(f"GraphQL error: {errors}")
            return None
        return data.get(field)

    def chat_completion(self, messages: List[Dict], model: str = "gpt-4",
                       **kwargs) -> Dict[str, Any]:
        """Run the ``chatCompletion`` mutation.

        Args:
            messages: Chat messages passed as the ``$messages`` variable.
            model: Model identifier passed as ``$model``.
            **kwargs: Passed together as the ``$options`` variable.

        Returns:
            The ``chatCompletion`` object from the response, or ``None`` when
            the response carries no data and no errors.

        Raises:
            RuntimeError: If the response has errors and no data.
        """
        query = """
        mutation ChatCompletion($messages: [Message!]!, $model: String!, $options: CompletionOptions) {
            chatCompletion(messages: $messages, model: $model, options: $options) {
                id
                choices {
                    message {
                        role
                        content
                    }
                    finishReason
                }
                usage {
                    promptTokens
                    completionTokens
                    totalTokens
                }
            }
        }
        """

        variables = {
            "messages": messages,
            "model": model,
            "options": kwargs
        }

        result = self.execute_query(query, variables)
        return self._extract_field(result, "chatCompletion")

    def batch_completion(self, requests_list: List[Dict]) -> List[Dict]:
        """Run the ``batchCompletion`` query for several requests at once.

        Args:
            requests_list: Request objects passed as the ``$requests`` variable.

        Returns:
            The ``batchCompletion`` list from the response, or ``None`` when
            the response carries no data and no errors.

        Raises:
            RuntimeError: If the response has errors and no data.
        """
        query = """
        query BatchCompletion($requests: [CompletionRequest!]!) {
            batchCompletion(requests: $requests) {
                id
                result {
                    choices {
                        message {
                            content
                        }
                    }
                }
                error
            }
        }
        """

        variables = {"requests": requests_list}
        result = self.execute_query(query, variables)
        return self._extract_field(result, "batchCompletion")

# Usage example
client = LLMGraphQLClient(
    endpoint="https://api.example.com/graphql",
    api_key="your-api-key",
)

# Query exactly the fields that are needed
prompt_messages = [{"role": "user", "content": "介绍GraphQL的优势"}]
response = client.chat_completion(
    messages=prompt_messages,
    model="gpt-4",
    temperature=0.7,
)

print(response)

GraphQL的优势在于灵活性和效率。客户端可以避免数据的过度获取(over-fetching)和获取不足(under-fetching),特别适合复杂的前端应用。但GraphQL的学习曲线较陡,服务端实现也相对复杂。

WebSocket实时通信

WebSocket提供了全双工通信能力,适合需要实时交互的LLM应用场景,如流式对话、实时翻译等。

import asyncio
import websockets
import json
from typing import Callable, Optional

class LLMWebSocketClient:
    """WebSocket client that streams chat completions from an LLM service.

    Incoming messages are dispatched by their ``type`` field to the coroutine
    registered under that key in ``self.callbacks``. Handlers are keyed by
    message type only, so one in-flight chat request per client is supported.
    """

    def __init__(self, url: str, api_key: str):
        """Store connection settings; no network activity happens here."""
        self.url = url
        self.api_key = api_key
        self.websocket = None
        self.callbacks = {}
        # Strong reference to the receive loop task (see connect()).
        self._receive_task = None

    async def connect(self):
        """Open the WebSocket connection and start the receive loop."""
        headers = {"Authorization": f"Bearer {self.api_key}"}
        # NOTE(review): newer releases of the websockets library name this
        # keyword ``additional_headers`` -- confirm against the installed version.
        self.websocket = await websockets.connect(
            self.url,
            extra_headers=headers
        )

        # Keep a reference: the event loop holds tasks only weakly, so an
        # unreferenced task may be garbage-collected before it finishes.
        self._receive_task = asyncio.create_task(self._receive_messages())

    async def _receive_messages(self):
        """Read messages until the connection closes, dispatching by ``type``."""
        try:
            async for message in self.websocket:
                data = json.loads(message)
                msg_type = data.get("type")

                if msg_type in self.callbacks:
                    await self.callbacks[msg_type](data)
        except websockets.exceptions.ConnectionClosed:
            print("WebSocket connection closed")

    async def send_chat_request(self, messages: list, model: str = "gpt-4",
                              stream: bool = True, **kwargs) -> str:
        """Send a chat request and return the concatenated streamed reply.

        Args:
            messages: Chat messages, sent unchanged in the request payload.
            model: Model identifier.
            stream: Forwarded as the ``stream`` field of the payload.
            **kwargs: Extra fields merged into the request payload.

        Returns:
            All ``delta.content`` pieces received in ``chunk`` messages,
            joined in arrival order.

        Raises:
            RuntimeError: If ``connect()`` has not been called.
            Exception: ``"Request timed out"`` if no ``done`` message arrives
                within 60 seconds.
        """
        if self.websocket is None:
            raise RuntimeError("WebSocket is not connected; call connect() first")

        request_id = f"req_{id(messages)}"
        chunks = []
        done_event = asyncio.Event()

        async def handle_chunk(data):
            choices = data.get("choices")
            if choices:
                content = choices[0].get("delta", {}).get("content")
                # Skip deltas with no text (missing, None or empty content).
                if content:
                    chunks.append(content)

        async def handle_done(data):
            done_event.set()

        # Register BOTH handlers before sending: awaiting send() yields to the
        # event loop, so a fast reply could otherwise be dispatched while no
        # "done" handler exists and the request would stall until the timeout.
        self.callbacks["chunk"] = handle_chunk
        self.callbacks["done"] = handle_done

        payload = {
            "id": request_id,
            "type": "chat_completion",
            "messages": messages,
            "model": model,
            "stream": stream,
            **kwargs
        }

        try:
            await self.websocket.send(json.dumps(payload))
            try:
                await asyncio.wait_for(done_event.wait(), timeout=60)
            except asyncio.TimeoutError:
                raise Exception("Request timed out")
        finally:
            # Remove only the handlers this call installed, so stale closures
            # do not keep receiving messages after the request is over.
            if self.callbacks.get("chunk") is handle_chunk:
                del self.callbacks["chunk"]
            if self.callbacks.get("done") is handle_done:
                del self.callbacks["done"]

        return "".join(chunks)

    async def disconnect(self):
        """Close the connection (if open) and stop the receive loop."""
        if self.websocket is not None:
            await self.websocket.close()
            self.websocket = None

        task = self._receive_task
        self._receive_task = None
        if task is not None and not task.done():
            task.cancel()

# 使用示例
async def websocket_example():
    """Connect, run one streamed chat request, print the reply, disconnect."""
    client = LLMWebSocketClient(
        url="wss://api.example.com/ws",
        api_key="your-api-key"
    )

    await client.connect()

    # Always close the connection, even if the request fails or times out.
    try:
        # Streaming conversation
        response = await client.send_chat_request(
            messages=[{"role": "user", "content": "用WebSocket实现LLM集成"}],
            stream=True
        )

        print("响应:", response)
    finally:
        await client.disconnect()

# asyncio.run(websocket_example())

WebSocket的关键优势在于实时性和效率。对于流式输出,WebSocket避免了HTTP轮询的开销;对于长对话,WebSocket保持了连接状态,减少了重复认证的开销。

错误处理与重试机制

健壮的API集成需要完善的错误处理和重试机制。

import time
import random
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60,
                       exceptions=(Exception,)):
    """Build a decorator that retries a function with exponential backoff.

    Args:
        max_retries: Total number of attempts. Values below 1 are treated as
            1, so the wrapped function is always called at least once.
        base_delay: Delay in seconds before the first retry; it doubles on
            each further retry.
        max_delay: Upper bound in seconds for the delay before jitter.
        exceptions: Exception class or tuple of classes that trigger a retry.
            Anything else propagates immediately.

    Returns:
        A decorator. The wrapped function returns the first successful result
        and re-raises the last exception once all attempts are used up.
    """
    # Guard against max_retries <= 0, which would otherwise skip the call
    # entirely and make the wrapper return None.
    attempts = max(1, max_retries)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == attempts - 1:
                        # Bare raise keeps the original traceback intact.
                        raise

                    # Exponential backoff, capped at max_delay
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    # Up to 10% random jitter so concurrent clients do not
                    # all retry at the same instant
                    jitter = random.uniform(0, delay * 0.1)
                    sleep_time = delay + jitter

                    print(f"Retry {attempt + 1}/{max_retries} after {sleep_time:.2f}s")
                    time.sleep(sleep_time)
        return wrapper
    return decorator

class RobustLLMClient:
    """LLM client wrapper that adds retry and model-fallback helpers."""

    def __init__(self, api_key: str):
        """Wrap an ``LLMRestClient`` created from ``api_key``."""
        self.client = LLMRestClient(api_key)

    @staticmethod
    def _as_user_messages(prompt: str) -> list:
        """Return a fresh single-turn message list holding ``prompt``."""
        return [{"role": "user", "content": prompt}]

    @retry_with_backoff(max_retries=3)
    def generate_with_retry(self, prompt: str, **kwargs):
        """Send ``prompt`` as one user message, retried up to 3 attempts.

        Extra keyword arguments are forwarded to ``chat_completion``.
        """
        user_messages = self._as_user_messages(prompt)
        return self.client.chat_completion(messages=user_messages, **kwargs)

    def generate_with_fallback(self, prompt: str, primary_model: str = "gpt-4",
                              fallback_model: str = "gpt-3.5-turbo"):
        """Send ``prompt`` to ``primary_model``; on any error use ``fallback_model``.

        An exception raised by the fallback call propagates to the caller.
        """
        chat = self.client.chat_completion
        try:
            primary_result = chat(
                messages=self._as_user_messages(prompt),
                model=primary_model,
            )
            return primary_result
        except Exception as e:
            print(f"Primary model failed: {e}, using fallback")
            fallback_result = chat(
                messages=self._as_user_messages(prompt),
                model=fallback_model,
            )
            return fallback_result

在实际生产环境中,还需要考虑连接池管理、请求合并、结果缓存等优化措施。选择合适的集成模式和实现健壮的错误处理,是构建可靠LLM应用的关键。