API组合模式
API组合模式
API组合模式概述
API组合(API Composition)是一种在微服务架构中聚合多个服务数据的模式。在微服务架构下,一个完整的业务实体可能分散在多个服务中。API组合通过调用多个后端服务并组合它们的响应,为客户端提供统一的数据视图。
API组合的核心价值在于:减少客户端的网络请求次数、在服务端进行数据聚合(客户端无需了解微服务的划分)、提供定制化的数据格式、在聚合层进行数据过滤和转换。
from fastapi import FastAPI
from typing import Optional
import asyncio
app = FastAPI()
class APIComposer:
"""API组合器 - 聚合多个服务的数据"""
def __init__(self):
self.services = {}
def register_service(self, name: str, client):
self.services[name] = client
async def compose(self, plan: list[dict]) -> dict:
"""根据组合计划聚合数据"""
result = {}
for step in plan:
service = self.services[step["service"]]
method = getattr(service, step["method"])
# 准备参数
params = self.resolve_params(step.get("params", {}), result)
# 并行执行或顺序执行
if step.get("parallel"):
tasks = [method(**p) for p in params]
data = await asyncio.gather(*tasks)
else:
data = await method(**params)
# 映射结果
if step.get("map_to"):
result[step["map_to"]] = data
else:
result[step["service"]] = data
return result
def resolve_params(self, params: dict, context: dict) -> dict:
"""解析参数,支持从上下文中引用"""
resolved = {}
for key, value in params.items():
if isinstance(value, str) and value.startswith("$"):
# 引用上下文中的值
ref = value[1:].split(".")
resolved[key] = context
for r in ref:
resolved[key] = resolved[key][r]
else:
resolved[key] = value
return resolved
组合计划设计
组合计划(Composition Plan)定义了API组合的执行流程。一个好的组合计划应该清晰地描述:需要调用哪些服务、调用顺序是什么、参数如何传递、结果如何映射。
组合计划可以通过配置文件定义,也可以通过代码定义。配置方式更灵活,适合动态变化的场景;代码方式更直观,适合复杂的组合逻辑。
# 组合计划示例
order_detail_plan = [
{
"service": "order",
"method": "get_order",
"params": {"order_id": "$order_id"},
"map_to": "order"
},
{
"service": "customer",
"method": "get_customer",
"params": {"customer_id": "$order.customer_id"},
"map_to": "customer"
},
{
"service": "product",
"method": "batch_get_products",
"params": {"product_ids": "$order.product_ids"},
"parallel": True,
"map_to": "products"
},
{
"service": "review",
"method": "get_reviews",
"params": {"product_ids": "$order.product_ids"},
"map_to": "reviews"
}
]
@app.get("/api/orders/{order_id}/detail")
async def get_order_detail(order_id: str):
"""获取订单详情 - 使用API组合"""
composer = APIComposer()
composer.register_service("order", order_service)
composer.register_service("customer", customer_service)
composer.register_service("product", product_service)
composer.register_service("review", review_service)
data = await composer.compose(order_detail_plan)
# 组装最终响应
return {
"order": data["order"],
"customer": data["customer"],
"products": data["products"],
"reviews": data["reviews"]
}
错误处理与优化
API组合需要处理多个服务调用的错误。常用策略包括:部分失败处理(某些服务失败时返回部分数据)、降级策略(使用缓存数据或默认值)、超时控制(为每个服务调用设置合理的超时)。
性能优化方面,应该尽量并行调用无依赖关系的服务、使用批量API减少调用次数、利用缓存减少重复请求、在组合层进行数据过滤减少传输量。
class ResilientAPIComposer:
"""具有容错能力的API组合器"""
def __init__(self):
self.services = {}
self.cache = {}
async def compose_with_fallback(self, plan: list[dict]) -> dict:
"""带降级的API组合"""
result = {}
for step in plan:
try:
data = await self.call_service(step, result)
result[step.get("map_to", step["service"])] = data
except ServiceTimeoutError:
# 超时降级 - 使用缓存
cached = self.cache.get(f"{step['service']}:{step['method']}")
if cached:
result[step.get("map_to", step["service"])] = cached
else:
result[step.get("map_to", step["service"])] = None
except ServiceError as e:
# 服务错误 - 返回部分数据
result[step.get("map_to", step["service"])] = {
"error": str(e),
"partial": True
}
return result