← 返回首页
🔗

Outbox模式

📂 architecture ⏱ 2 min 201 words

Outbox模式

Outbox模式概述

Outbox模式是解决微服务架构中"双写问题"的可靠事件发布机制。双写问题是指:在更新业务数据的同时发布事件到消息队列,如果其中一个操作失败,会导致数据不一致。Outbox模式通过将业务数据和事件写入同一个数据库事务,确保原子性,然后通过CDC(Change Data Capture)异步地将事件发布到消息队列。

Outbox模式的工作流程:首先,在业务事务中同时更新业务表和Outbox表;然后,CDC工具(如Debezium)监控Outbox表的变化;最后,CDC将捕获的事件发布到消息队列。这种模式确保了事件发布的可靠性和数据的一致性。

from sqlalchemy import Column, String, JSON, DateTime
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime

Base = declarative_base()

class OutboxEvent(Base):
    """Outbox事件表"""
    __tablename__ = "outbox_events"
    
    id = Column(String, primary_key=True)
    aggregate_type = Column(String, nullable=False)
    aggregate_id = Column(String, nullable=False)
    event_type = Column(String, nullable=False)
    payload = Column(JSON, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    published = Column(String, default="pending")  # pending, published, failed

class OrderService:
    def __init__(self, db_session):
        self.db = db_session
    
    def create_order(self, customer_id: str, items: list) -> str:
        """创建订单 - 使用Outbox模式发布事件"""
        # 开始事务
        order_id = generate_id()
        
        # 1. 写入业务数据
        order = Order(id=order_id, customer_id=customer_id, items=items)
        self.db.add(order)
        
        # 2. 写入Outbox事件(同一事务)
        event = OutboxEvent(
            id=generate_id(),
            aggregate_type="Order",
            aggregate_id=order_id,
            event_type="OrderCreated",
            payload={"order_id": order_id, "customer_id": customer_id}
        )
        self.db.add(event)
        
        # 3. 提交事务 - 保证原子性
        self.db.commit()
        
        return order_id

CDC事件捕获

CDC(Change Data Capture)是Outbox模式的核心组件。CDC通过监控数据库的事务日志(如MySQL的binlog、PostgreSQL的WAL)来捕获数据变更,将变更转换为事件并发布到消息队列。CDC的优势在于:对业务代码无侵入、低延迟、保证事件顺序。

Debezium是目前最流行的开源CDC工具,支持多种数据库和消息队列。Debezium作为Kafka Connect的连接器运行,可以实时捕获数据库变更并发布到Kafka。

# Debezium CDC配置示例
debezium:
  name: "order-service-connector"
  config:
    connector.class: "io.debezium.connector.postgresql.PostgresConnector"
    database.hostname: "postgres"
    database.port: "5432"
    database.user: "debezium"
    database.password: "${secrets:db-password}"
    database.dbname: "orders"
    database.server.name: "order-service"
    table.include.list: "public.outbox_events"
    transforms: "outbox"
    transforms.outbox.type: "io.debezium.transforms.outbox.EventRouter"
    transforms.outbox.table.field.event.key: "aggregate_id"
    transforms.outbox.table.field.event.type: "event_type"
    transforms.outbox.table.field.event.payload: "payload"
    transforms.outbox.route.topic.replacement: "events.${routedByValue}"

处理保证

Outbox模式提供以下处理保证:至少一次投递(事件可能被重复投递,消费者需要实现幂等性)、有序性(同一聚合的事件按顺序投递)、持久性(事件存储在数据库中,不会丢失)。

消费者端需要实现幂等性处理来应对重复投递。常用的方法包括:使用唯一消息ID进行去重、利用数据库唯一约束、使用Redis等缓存记录已处理的消息。

class IdempotentEventHandler:
    def __init__(self, db_session, redis_client):
        self.db = db_session
        self.redis = redis_client
    
    async def handle_event(self, event: dict) -> bool:
        """幂等性事件处理"""
        event_id = event["id"]
        
        # 检查是否已处理(Redis快速检查)
        if await self.redis.exists(f"processed:{event_id}"):
            return True
        
        # 处理事件
        await self.process(event)
        
        # 标记为已处理
        await self.redis.setex(f"processed:{event_id}", 86400, "1")
        
        # 持久化到数据库(防止Redis丢失)
        processed = ProcessedEvent(id=event_id, processed_at=datetime.utcnow())
        self.db.add(processed)
        self.db.commit()
        
        return True