Saga模式
Saga模式
Saga模式概述
Saga是一种管理分布式事务的模式,最早由Hector Garcia-Molina和Kenneth Salem在1987年提出。在微服务架构下,传统的分布式事务(如两阶段提交)由于性能和可用性问题不再适用,Saga模式通过将长事务拆分为一系列本地事务来解决跨服务的数据一致性问题。
Saga的核心思想是:每个本地事务都有对应的补偿操作。当某个事务失败时,系统执行之前所有已完成事务的补偿操作,实现最终一致性。Saga不保证ACID,而是提供最终一致性和隔离性。
from enum import Enum
from typing import Callable
class SagaStep:
def __init__(self, name: str, action: Callable,
compensation: Callable):
self.name = name
self.action = action
self.compensation = compensation
self.executed = False
class Saga:
def __init__(self, name: str):
self.name = name
self.steps: list[SagaStep] = []
self.context = {}
def add_step(self, name: str, action: Callable,
compensation: Callable):
self.steps.append(SagaStep(name, action, compensation))
def execute(self) -> bool:
"""执行Saga,失败时回滚"""
executed_steps = []
for step in self.steps:
try:
result = step.action(self.context)
self.context[step.name] = result
step.executed = True
executed_steps.append(step)
except Exception as e:
print(f"Saga步骤 {step.name} 失败: {e}")
self.compensate(executed_steps)
return False
return True
def compensate(self, executed_steps: list[SagaStep]):
"""补偿已执行的步骤"""
for step in reversed(executed_steps):
try:
step.compensation(self.context)
print(f"补偿步骤 {step.name} 成功")
except Exception as e:
print(f"补偿步骤 {step.name} 失败: {e}")
# 记录日志,需要人工干预
编排式Saga
编排式Saga(Orchestration)通过一个中央协调器(Orchestrator)来管理整个Saga的执行流程。协调器负责:调用各个参与者执行本地事务、处理参与者的响应、在失败时调用补偿操作、维护Saga的全局状态。
编排式Saga的优势在于:流程清晰(所有逻辑集中在协调器)、易于理解和调试、便于监控和管理。缺点是:协调器可能成为单点、增加了系统的复杂度、协调器需要了解所有参与者的细节。
class OrderSagaOrchestrator:
def __init__(self):
self.saga = Saga("创建订单")
self.setup_steps()
def setup_steps(self):
self.saga.add_step(
name="create_order",
action=self.create_order,
compensation=self.cancel_order
)
self.saga.add_step(
name="reserve_inventory",
action=self.reserve_inventory,
compensation=self.release_inventory
)
self.saga.add_step(
name="process_payment",
action=self.process_payment,
compensation=self.refund_payment
)
def create_order(self, context):
order = Order.create(context["items"])
return order.id
def cancel_order(self, context):
Order.cancel(context["create_order"])
def reserve_inventory(self, context):
return InventoryService.reserve(context["items"])
def release_inventory(self, context):
InventoryService.release(context["reserve_inventory"])
def process_payment(self, context):
return PaymentService.charge(context["order_id"], context["amount"])
def refund_payment(self, context):
PaymentService.refund(context["process_payment"])
def execute(self, items: list, amount: float) -> bool:
self.saga.context = {"items": items, "amount": amount}
return self.saga.execute()
协调式Saga
协调式Saga(Choreography)没有中央协调器,每个参与者在完成自己的本地事务后发布事件,其他参与者监听这些事件来触发自己的事务。协调式Saga通过事件驱动的方式实现流程的流转。
协调式Saga的优势在于:去中心化(没有单点故障)、松耦合(参与者之间没有直接依赖)、易于扩展(添加新参与者只需监听事件)。缺点是:流程分散(难以看到完整的业务流程)、调试困难、可能产生循环依赖。