← 返回首页
🔗

API组合模式

📂 architecture ⏱ 2 min 297 words

API组合模式

API组合模式概述

API组合(API Composition)是一种在微服务架构中聚合多个服务数据的模式。在微服务架构下,一个完整的业务实体可能分散在多个服务中。API组合通过调用多个后端服务并组合它们的响应,为客户端提供统一的数据视图。

API组合的核心价值在于:减少客户端的网络请求次数、在服务端进行数据聚合(客户端无需了解微服务的划分)、提供定制化的数据格式、在聚合层进行数据过滤和转换。

from fastapi import FastAPI
from typing import Optional
import asyncio

app = FastAPI()

class APIComposer:
    """API组合器 - 聚合多个服务的数据"""
    def __init__(self):
        self.services = {}
    
    def register_service(self, name: str, client):
        self.services[name] = client
    
    async def compose(self, plan: list[dict]) -> dict:
        """根据组合计划聚合数据"""
        result = {}
        
        for step in plan:
            service = self.services[step["service"]]
            method = getattr(service, step["method"])
            
            # 准备参数
            params = self.resolve_params(step.get("params", {}), result)
            
            # 并行执行或顺序执行
            if step.get("parallel"):
                tasks = [method(**p) for p in params]
                data = await asyncio.gather(*tasks)
            else:
                data = await method(**params)
            
            # 映射结果
            if step.get("map_to"):
                result[step["map_to"]] = data
            else:
                result[step["service"]] = data
        
        return result
    
    def resolve_params(self, params: dict, context: dict) -> dict:
        """解析参数,支持从上下文中引用"""
        resolved = {}
        for key, value in params.items():
            if isinstance(value, str) and value.startswith("$"):
                # 引用上下文中的值
                ref = value[1:].split(".")
                resolved[key] = context
                for r in ref:
                    resolved[key] = resolved[key][r]
            else:
                resolved[key] = value
        return resolved

组合计划设计

组合计划(Composition Plan)定义了API组合的执行流程。一个好的组合计划应该清晰地描述:需要调用哪些服务、调用顺序是什么、参数如何传递、结果如何映射。

组合计划可以通过配置文件定义,也可以通过代码定义。配置方式更灵活,适合动态变化的场景;代码方式更直观,适合复杂的组合逻辑。

# 组合计划示例
order_detail_plan = [
    {
        "service": "order",
        "method": "get_order",
        "params": {"order_id": "$order_id"},
        "map_to": "order"
    },
    {
        "service": "customer",
        "method": "get_customer",
        "params": {"customer_id": "$order.customer_id"},
        "map_to": "customer"
    },
    {
        "service": "product",
        "method": "batch_get_products",
        "params": {"product_ids": "$order.product_ids"},
        "parallel": True,
        "map_to": "products"
    },
    {
        "service": "review",
        "method": "get_reviews",
        "params": {"product_ids": "$order.product_ids"},
        "map_to": "reviews"
    }
]

@app.get("/api/orders/{order_id}/detail")
async def get_order_detail(order_id: str):
    """获取订单详情 - 使用API组合"""
    composer = APIComposer()
    composer.register_service("order", order_service)
    composer.register_service("customer", customer_service)
    composer.register_service("product", product_service)
    composer.register_service("review", review_service)
    
    data = await composer.compose(order_detail_plan)
    
    # 组装最终响应
    return {
        "order": data["order"],
        "customer": data["customer"],
        "products": data["products"],
        "reviews": data["reviews"]
    }

错误处理与优化

API组合需要处理多个服务调用的错误。常用策略包括:部分失败处理(某些服务失败时返回部分数据)、降级策略(使用缓存数据或默认值)、超时控制(为每个服务调用设置合理的超时)。

性能优化方面,应该尽量并行调用无依赖关系的服务、使用批量API减少调用次数、利用缓存减少重复请求、在组合层进行数据过滤减少传输量。

class ResilientAPIComposer:
    """具有容错能力的API组合器"""
    def __init__(self):
        self.services = {}
        self.cache = {}
    
    async def compose_with_fallback(self, plan: list[dict]) -> dict:
        """带降级的API组合"""
        result = {}
        
        for step in plan:
            try:
                data = await self.call_service(step, result)
                result[step.get("map_to", step["service"])] = data
            except ServiceTimeoutError:
                # 超时降级 - 使用缓存
                cached = self.cache.get(f"{step['service']}:{step['method']}")
                if cached:
                    result[step.get("map_to", step["service"])] = cached
                else:
                    result[step.get("map_to", step["service"])] = None
            except ServiceError as e:
                # 服务错误 - 返回部分数据
                result[step.get("map_to", step["service"])] = {
                    "error": str(e),
                    "partial": True
                }
        
        return result