Outbox模式
Outbox模式
Outbox模式概述
Outbox模式是解决微服务架构中"双写问题"的可靠事件发布机制。双写问题是指:在更新业务数据的同时发布事件到消息队列,如果其中一个操作失败,会导致数据不一致。Outbox模式通过将业务数据和事件写入同一个数据库事务,确保原子性,然后通过CDC(Change Data Capture)异步地将事件发布到消息队列。
Outbox模式的工作流程:首先,在业务事务中同时更新业务表和Outbox表;然后,CDC工具(如Debezium)监控Outbox表的变化;最后,CDC将捕获的事件发布到消息队列。这种模式确保了事件发布的可靠性和数据的一致性。
from sqlalchemy import Column, String, JSON, DateTime
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
Base = declarative_base()
class OutboxEvent(Base):
"""Outbox事件表"""
__tablename__ = "outbox_events"
id = Column(String, primary_key=True)
aggregate_type = Column(String, nullable=False)
aggregate_id = Column(String, nullable=False)
event_type = Column(String, nullable=False)
payload = Column(JSON, nullable=False)
created_at = Column(DateTime, default=datetime.utcnow)
published = Column(String, default="pending") # pending, published, failed
class OrderService:
def __init__(self, db_session):
self.db = db_session
def create_order(self, customer_id: str, items: list) -> str:
"""创建订单 - 使用Outbox模式发布事件"""
# 开始事务
order_id = generate_id()
# 1. 写入业务数据
order = Order(id=order_id, customer_id=customer_id, items=items)
self.db.add(order)
# 2. 写入Outbox事件(同一事务)
event = OutboxEvent(
id=generate_id(),
aggregate_type="Order",
aggregate_id=order_id,
event_type="OrderCreated",
payload={"order_id": order_id, "customer_id": customer_id}
)
self.db.add(event)
# 3. 提交事务 - 保证原子性
self.db.commit()
return order_id
CDC事件捕获
CDC(Change Data Capture)是Outbox模式的核心组件。CDC通过监控数据库的事务日志(如MySQL的binlog、PostgreSQL的WAL)来捕获数据变更,将变更转换为事件并发布到消息队列。CDC的优势在于:对业务代码无侵入、低延迟、保证事件顺序。
Debezium是目前最流行的开源CDC工具,支持多种数据库和消息队列。Debezium作为Kafka Connect的连接器运行,可以实时捕获数据库变更并发布到Kafka。
# Debezium CDC配置示例
debezium:
name: "order-service-connector"
config:
connector.class: "io.debezium.connector.postgresql.PostgresConnector"
database.hostname: "postgres"
database.port: "5432"
database.user: "debezium"
database.password: "${secrets:db-password}"
database.dbname: "orders"
database.server.name: "order-service"
table.include.list: "public.outbox_events"
transforms: "outbox"
transforms.outbox.type: "io.debezium.transforms.outbox.EventRouter"
transforms.outbox.table.field.event.key: "aggregate_id"
transforms.outbox.table.field.event.type: "event_type"
transforms.outbox.table.field.event.payload: "payload"
transforms.outbox.route.topic.replacement: "events.${routedByValue}"
处理保证
Outbox模式提供以下处理保证:至少一次投递(事件可能被重复投递,消费者需要实现幂等性)、有序性(同一聚合的事件按顺序投递)、持久性(事件存储在数据库中,不会丢失)。
消费者端需要实现幂等性处理来应对重复投递。常用的方法包括:使用唯一消息ID进行去重、利用数据库唯一约束、使用Redis等缓存记录已处理的消息。
class IdempotentEventHandler:
def __init__(self, db_session, redis_client):
self.db = db_session
self.redis = redis_client
async def handle_event(self, event: dict) -> bool:
"""幂等性事件处理"""
event_id = event["id"]
# 检查是否已处理(Redis快速检查)
if await self.redis.exists(f"processed:{event_id}"):
return True
# 处理事件
await self.process(event)
# 标记为已处理
await self.redis.setex(f"processed:{event_id}", 86400, "1")
# 持久化到数据库(防止Redis丢失)
processed = ProcessedEvent(id=event_id, processed_at=datetime.utcnow())
self.db.add(processed)
self.db.commit()
return True