分布式事务:2PC/TCC/Saga
什么是分布式事务
分布式事务是指在分布式系统中,跨多个服务或数据库的事务操作。由于网络延迟、节点故障等原因,确保分布式事务的一致性是一个复杂的挑战。
两阶段提交(2PC)
两阶段提交是一种强一致性的分布式事务协议,通过协调者和参与者的协作来保证事务的原子性。
协议流程
协调者               参与者1                 参与者2
|                       |                       |
|---- 1. Prepare ------>|                       |
|---- 1. Prepare ------------------------------>|
|                       |                       |
|<--- 2. Vote(Yes) -----|                       |
|<--- 2. Vote(Yes) -----------------------------|
|                       |                       |
|---- 3. Commit ------->|                       |
|---- 3. Commit ------------------------------->|
|                       |                       |
|<--- 4. ACK -----------|                       |
|<--- 4. ACK -----------------------------------|
实现
// Coordinator implementation
/**
 * Coordinator side of two-phase commit (2PC).
 *
 * <p>Phase 1 asks every participant to prepare; phase 2 tells them to commit or roll back.
 * Phase-2 calls that fail are handed to {@code logFailedCommit} / {@code logFailedRollback}
 * for a later retry, so participants must treat repeated commit/rollback calls as no-ops.
 */
@Component
public class TwoPhaseCommitCoordinator {
    private final TransactionLogRepository transactionLogRepository;
    private final ParticipantClient participantClient;

    /**
     * Drives one distributed transaction through both 2PC phases.
     *
     * <p>The global decision (COMMITTED or ROLLED_BACK) is written to the transaction log
     * <em>before</em> any phase-2 message is sent. That log write is the commit point of 2PC:
     * if the coordinator crashes during phase 2, recovery re-sends the logged decision instead
     * of having to guess it. The status therefore records the decision, not phase-2 completion.
     *
     * <p>Deliberately not {@code @Transactional}: a single local transaction around the whole
     * method would keep every log write invisible until the method returns, so a crash in the
     * middle of the protocol would leave no record at all. NOTE(review): this assumes each
     * {@code transactionLogRepository.save} commits on its own (the Spring Data default) — confirm.
     *
     * @param transaction the transaction to run; its participants are contacted in list order
     */
    public void executeDistributedTransaction(DistributedTransaction transaction) {
        // 1. Record that the transaction has entered the prepare phase
        TransactionLog log = new TransactionLog(
                transaction.getId(),
                TransactionStatus.PREPARE,
                transaction.getParticipants()
        );
        transactionLogRepository.save(log);

        // 2. Prepare phase. `contacted` receives every participant a Prepare was sent to,
        //    including one that voted No or failed, because its local state is unknown.
        List<Participant> contacted = new ArrayList<>();
        boolean allPrepared = prepareAllParticipants(transaction, contacted);

        // 3. Make the decision durable first, then execute phase 2
        if (allPrepared) {
            log.setStatus(TransactionStatus.COMMITTED);
            transactionLogRepository.save(log);
            commitAllParticipants(transaction);
        } else {
            log.setStatus(TransactionStatus.ROLLED_BACK);
            transactionLogRepository.save(log);
            // Participants after the failing one never received Prepare and hold nothing
            rollbackParticipants(transaction, contacted);
        }
    }

    /**
     * Sends Prepare to each participant in order and stops at the first No vote or error.
     *
     * @param transaction the transaction being prepared
     * @param contacted   output list; each participant is appended before its Prepare is sent
     * @return {@code true} only if every participant voted Yes
     */
    private boolean prepareAllParticipants(DistributedTransaction transaction,
                                           List<Participant> contacted) {
        for (Participant participant : transaction.getParticipants()) {
            contacted.add(participant);
            try {
                boolean prepared = participantClient.prepare(
                        participant.getServiceName(),
                        transaction.getId()
                );
                if (!prepared) {
                    return false;
                }
            } catch (Exception e) {
                // A timeout or transport error counts as a No vote. The participant may still
                // have prepared, which is why it was added to `contacted` before the call.
                return false;
            }
        }
        return true;
    }

    /** Sends Commit to every participant; failures are recorded for a later retry. */
    private void commitAllParticipants(DistributedTransaction transaction) {
        for (Participant participant : transaction.getParticipants()) {
            try {
                participantClient.commit(
                        participant.getServiceName(),
                        transaction.getId()
                );
            } catch (Exception e) {
                // Record the failure; the commit is retried later
                logFailedCommit(transaction.getId(), participant.getServiceName());
            }
        }
    }

    /** Sends Rollback to the given participants; failures are recorded for a later retry. */
    private void rollbackParticipants(DistributedTransaction transaction,
                                      List<Participant> participants) {
        for (Participant participant : participants) {
            try {
                participantClient.rollback(
                        participant.getServiceName(),
                        transaction.getId()
                );
            } catch (Exception e) {
                // Record the failure; the rollback is retried later
                logFailedRollback(transaction.getId(), participant.getServiceName());
            }
        }
    }
}
// Participant implementation
/**
 * Participant side of two-phase commit: freezes resources on Prepare, then applies or undoes
 * them when the coordinator sends Commit or Rollback.
 *
 * <p>Commit and Rollback are safe to repeat, because the coordinator retries phase-2 calls that
 * failed. Rollback also accepts an id that was never prepared: the coordinator may roll back a
 * participant whose Prepare failed or has not arrived yet.
 */
@Component
public class TwoPhaseCommitParticipant {
    private final TransactionRepository transactionRepository;

    /**
     * Phase 1: freezes local resources for {@code transactionId} and records it as PREPARED.
     *
     * <p>A duplicate Prepare repeats the earlier Yes vote without freezing twice. If a rollback
     * was already recorded for this id (the coordinator gave up before this Prepare arrived),
     * the vote is No, so resources are not frozen for a transaction nobody will finish.
     *
     * @param transactionId global transaction id
     * @return {@code true} for a Yes vote, {@code false} for a No vote
     */
    @Transactional
    public boolean prepare(String transactionId) {
        try {
            Transaction existing = transactionRepository.findById(transactionId).orElse(null);
            if (existing != null) {
                return existing.getStatus() == TransactionStatus.PREPARED;
            }
            // Freeze resources
            freezeResources(transactionId);
            // Record the prepared state
            Transaction transaction = new Transaction(transactionId, TransactionStatus.PREPARED);
            transactionRepository.save(transaction);
            return true;
        } catch (Exception e) {
            // NOTE(review): the exception is caught, so the surrounding @Transactional still
            // commits whatever freezeResources did before failing. If freezing can fail part-way,
            // mark the transaction rollback-only here.
            return false;
        }
    }

    /**
     * Phase 2 (commit): applies the prepared work. A repeated Commit is a no-op.
     *
     * @throws TransactionNotFoundException if {@code transactionId} was never prepared here
     */
    @Transactional
    public void commit(String transactionId) {
        Transaction transaction = transactionRepository.findById(transactionId)
                .orElseThrow(() -> new TransactionNotFoundException(transactionId));
        if (transaction.getStatus() == TransactionStatus.COMMITTED) {
            return; // duplicate Commit from a coordinator retry: already applied
        }
        // Commit the local work
        commitTransaction(transaction);
        // Update the state
        transaction.setStatus(TransactionStatus.COMMITTED);
        transactionRepository.save(transaction);
    }

    /**
     * Phase 2 (rollback): undoes the prepared work. A repeated Rollback is a no-op.
     *
     * <p>For an id with no record (empty rollback), a ROLLED_BACK marker is stored instead, so
     * that a Prepare arriving late is refused by {@link #prepare}.
     */
    @Transactional
    public void rollback(String transactionId) {
        Transaction transaction = transactionRepository.findById(transactionId).orElse(null);
        if (transaction == null) {
            transactionRepository.save(new Transaction(transactionId, TransactionStatus.ROLLED_BACK));
            return;
        }
        if (transaction.getStatus() == TransactionStatus.ROLLED_BACK) {
            return; // duplicate Rollback from a coordinator retry
        }
        // Roll back the local work
        rollbackTransaction(transaction);
        // Update the state
        transaction.setStatus(TransactionStatus.ROLLED_BACK);
        transactionRepository.save(transaction);
    }
}
优缺点
优点:
- 强一致性
- 实现简单
缺点:
- 同步阻塞:所有参与者在准备阶段后需要等待协调者的指令
- 单点故障:协调者若在发出最终决定前宕机,已完成准备的参与者无法自行决定提交还是回滚,只能持有锁阻塞等待
- 数据不一致:在提交阶段,若因网络分区或协调者宕机只有部分参与者收到Commit,就会出现部分提交、数据不一致
- 性能差:需要锁定资源,影响并发性能
TCC(Try-Confirm-Cancel)
TCC是一种补偿型分布式事务,将事务分为Try、Confirm和Cancel三个阶段。
协议流程
Try阶段:检查和预留资源
Confirm阶段:确认提交,使用Try阶段预留的资源真正完成业务操作(需保证幂等)
Cancel阶段:取消事务,释放预留资源
实现
// TCC接口定义
public interface TccService {
// Try阶段:检查和预留资源
boolean try(String transactionId, Object params);
// Confirm阶段:确认提交
boolean confirm(String transactionId);
// Cancel阶段:取消事务
boolean cancel(String transactionId);
}
// 订单TCC服务
@Component
public class OrderTccService implements TccService {
private final OrderRepository orderRepository;
private final InventoryClient inventoryClient;
private final PaymentClient paymentClient;
@Override
public boolean try(String transactionId, CreateOrderParams params) {
try {
// 1. 创建订单(待确认状态)
Order order = Order.createPending(transactionId, params);
orderRepository.save(order);
// 2. 冻结库存
boolean inventoryFrozen = inventoryClient.tryFreeze(
params.getProductId(),
params.getQuantity(),
transactionId
);
if (!inventoryFrozen) {
throw new InventoryFreezeException("Failed to freeze inventory");
}
// 3. 冻结余额
boolean balanceFrozen = paymentClient.tryFreeze(
params.getUserId(),
params.getAmount(),
transactionId
);
if (!balanceFrozen) {
throw new BalanceFreezeException("Failed to freeze balance");
}
return true;
} catch (Exception e) {
// Try失败,执行Cancel
cancel(transactionId);
return false;
}
}
@Override
public boolean confirm(String transactionId) {
try {
// 1. 确认订单
Order order = orderRepository.findByTransactionId(transactionId)
.orElseThrow(() -> new OrderNotFoundException(transactionId));
order.confirm();
orderRepository.save(order);
// 2. 确认库存扣减
inventoryClient.confirmFreeze(transactionId);
// 3. 确认余额扣减
paymentClient.confirmFreeze(transactionId);
return true;
} catch (Exception e) {
// Confirm失败,需要重试或人工干预
log.error("Failed to confirm transaction: {}", transactionId, e);
return false;
}
}
@Override
public boolean cancel(String transactionId) {
try {
// 1. 取消订单
Order order = orderRepository.findByTransactionId(transactionId)
.orElse(null);
if (order != null) {
order.cancel();
orderRepository.save(order);
}
// 2. 取消库存冻结
inventoryClient.cancelFreeze(transactionId);
// 3. 取消余额冻结
paymentClient.cancelFreeze(transactionId);
return true;
} catch (Exception e) {
// Cancel失败,需要重试
log.error("Failed to cancel transaction: {}", transactionId, e);
return false;
}
}
}
// TCC事务管理器
@Component
public class TccTransactionManager {
private final Map<String, TccService> tccServices = new ConcurrentHashMap<>();
private final TransactionLogRepository transactionLogRepository;
public void registerTccService(String name, TccService service) {
tccServices.put(name, service);
}
@Transactional
public void execute(List<TccInvocation> invocations) {
String transactionId = UUID.randomUUID().toString();
// 1. 创建事务日志
TransactionLog log = new TransactionLog(transactionId, TransactionStatus.STARTED);
transactionLogRepository.save(log);
// 2. Try阶段
boolean allTrySuccess = true;
for (TccInvocation invocation : invocations) {
TccService service = tccServices.get(invocation.getServiceName());
boolean trySuccess = service.try(transactionId, invocation.getParams());
if (!trySuccess) {
allTrySuccess = false;
break;
}
}
// 3. Confirm或Cancel阶段
if (allTrySuccess) {
for (TccInvocation invocation : invocations) {
TccService service = tccServices.get(invocation.getServiceName());
service.confirm(transactionId);
}
log.setStatus(TransactionStatus.COMMITTED);
} else {
for (TccInvocation invocation : invocations) {
TccService service = tccServices.get(invocation.getServiceName());
service.cancel(transactionId);
}
log.setStatus(TransactionStatus.ROLLED_BACK);
}
transactionLogRepository.save(log);
}
}
Saga模式
Saga模式将长事务拆分为一系列本地事务,每个本地事务都有对应的补偿操作。
协议流程
T1 -> T2 -> T3 -> T4
其中Ti为本地事务,Ci为Ti对应的补偿操作。如果T4失败(T4本身未生效,无需补偿),则按逆序执行C3 -> C2 -> C1
实现
// Saga step definition
/**
 * One step of a saga: a forward action plus the compensation that undoes it.
 *
 * <p>Both functions receive the shared saga context. A {@code false} result from the action
 * marks the step as failed. Compensations may be retried, so they should be idempotent.
 *
 * @param <T> type of the saga context passed to every step
 */
public class SagaStep<T> {
    private final String name;
    private final Function<T, Boolean> action;
    private final Function<T, Boolean> compensation;

    /**
     * @param name         step name used in logs and error messages
     * @param action       forward operation; returns {@code true} on success
     * @param compensation operation that undoes {@code action}
     * @throws NullPointerException if any argument is {@code null}
     */
    public SagaStep(String name,
                    Function<T, Boolean> action,
                    Function<T, Boolean> compensation) {
        this.name = java.util.Objects.requireNonNull(name, "name");
        this.action = java.util.Objects.requireNonNull(action, "action");
        this.compensation = java.util.Objects.requireNonNull(compensation, "compensation");
    }

    /** @return the step name */
    public String getName() {
        return name;
    }

    /** @return the forward action */
    public Function<T, Boolean> getAction() {
        return action;
    }

    /** @return the compensation that undoes the action */
    public Function<T, Boolean> getCompensation() {
        return compensation;
    }
}
// Saga执行器
@Component
public class SagaExecutor {
private final SagaLogRepository sagaLogRepository;
public <T> void execute(List<SagaStep<T>> steps, T context) {
String sagaId = UUID.randomUUID().toString();
List<SagaStep<T>> executedSteps = new ArrayList<>();
// 创建Saga日志
SagaLog log = new SagaLog(sagaId, SagaStatus.STARTED);
sagaLogRepository.save(log);
try {
// 执行所有步骤
for (SagaStep<T> step : steps) {
boolean success = step.getAction().apply(context);
if (!success) {
// 执行失败,执行补偿
compensate(executedSteps, context);
log.setStatus(SagaStatus.FAILED);
sagaLogRepository.save(log);
throw new SagaExecutionException("Saga step failed: " + step.getName());
}
executedSteps.add(step);
}
// 所有步骤成功
log.setStatus(SagaStatus.COMPLETED);
sagaLogRepository.save(log);
} catch (Exception e) {
// 执行补偿
compensate(executedSteps, context);
log.setStatus(SagaStatus.FAILED);
sagaLogRepository.save(log);
throw e;
}
}
private <T> void compensate(List<SagaStep<T>> executedSteps, T context) {
// 反向执行补偿操作
for (int i = executedSteps.size() - 1; i >= 0; i--) {
SagaStep<T> step = executedSteps.get(i);
try {
step.getCompensation().apply(context);
} catch (Exception e) {
// 记录补偿失败,需要人工干预
log.error("Compensation failed for step: {}", step.getName(), e);
}
}
}
}
// Order saga implementation
/**
 * Places an order as a four-step saga: create the order, deduct inventory, take payment,
 * and create the shipment. Each step carries the operation that undoes it.
 */
@Component
public class OrderSaga {
    private final SagaExecutor sagaExecutor;
    private final OrderService orderService;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final ShippingService shippingService;

    /**
     * Builds the order saga and hands it to the executor.
     *
     * @param params order input; the first step writes the new order id into it, and the
     *               later steps read it back
     */
    public void createOrder(CreateOrderParams params) {
        sagaExecutor.execute(
                Arrays.asList(
                        createOrderStep(),
                        deductInventoryStep(),
                        processPaymentStep(),
                        createShipmentStep()),
                params);
    }

    /** Step 1: persist the order and remember its id; undone by cancelling that order. */
    private SagaStep<CreateOrderParams> createOrderStep() {
        return new SagaStep<>(
                "createOrder",
                ctx -> {
                    ctx.setOrderId(orderService.createOrder(ctx).getId());
                    return true;
                },
                ctx -> {
                    orderService.cancelOrder(ctx.getOrderId());
                    return true;
                });
    }

    /** Step 2: take stock for the product; undone by restoring the same quantity. */
    private SagaStep<CreateOrderParams> deductInventoryStep() {
        return new SagaStep<>(
                "deductInventory",
                ctx -> {
                    inventoryService.deduct(ctx.getProductId(), ctx.getQuantity());
                    return true;
                },
                ctx -> {
                    inventoryService.restore(ctx.getProductId(), ctx.getQuantity());
                    return true;
                });
    }

    /** Step 3: charge the user; undone by refunding the same amount. */
    private SagaStep<CreateOrderParams> processPaymentStep() {
        return new SagaStep<>(
                "processPayment",
                ctx -> {
                    paymentService.processPayment(ctx.getUserId(), ctx.getAmount());
                    return true;
                },
                ctx -> {
                    paymentService.refund(ctx.getUserId(), ctx.getAmount());
                    return true;
                });
    }

    /** Step 4: create the shipment for the order; undone by cancelling that shipment. */
    private SagaStep<CreateOrderParams> createShipmentStep() {
        return new SagaStep<>(
                "createShipment",
                ctx -> {
                    shippingService.createShipment(ctx.getOrderId());
                    return true;
                },
                ctx -> {
                    shippingService.cancelShipment(ctx.getOrderId());
                    return true;
                });
    }
}
方案对比
| 特性 | 2PC | TCC | Saga |
|---|---|---|---|
| 一致性 | 强一致性 | 最终一致性 | 最终一致性 |
| 性能 | 低 | 中等 | 高 |
| 实现复杂度 | 低 | 高 | 中等 |
| 锁定资源 | 是 | 是(预留) | 否 |
| 适用场景 | 强一致性要求 | 高并发场景 | 长事务场景 |
| 补偿机制 | 无 | Cancel | 补偿操作 |
最佳实践
- 选择合适的方案:根据业务需求选择合适的分布式事务方案
- 设计补偿操作:确保每个操作都有对应的补偿操作
- 幂等性设计:确保操作可以安全地重试
- 监控和告警:监控事务执行状态,及时发现和处理问题
- 超时处理:设置合理的超时时间,避免长时间等待
- 日志记录:记录详细的事务日志,便于问题排查