← 返回首页
🌐

分布式事务2PC/TCC/Saga

📂 architecture ⏱ 5 min 983 words

分布式事务2PC/TCC/Saga

什么是分布式事务

分布式事务是指在分布式系统中,跨多个服务或数据库的事务操作。由于网络延迟、节点故障等原因,确保分布式事务的一致性是一个复杂的挑战。

两阶段提交(2PC)

两阶段提交是一种强一致性的分布式事务协议,通过协调者和参与者的协作来保证事务的原子性。

协议流程

协调者                    参与者1                 参与者2
   |                       |                       |
   |---- 1. Prepare ------->|                       |
   |---- 1. Prepare ------------------------------>|
   |                       |                       |
   |<--- 2. Vote(Yes) -----|                       |
   |<--- 2. Vote(Yes) ----------------------------|
   |                       |                       |
   |---- 3. Commit -------->|                       |
   |---- 3. Commit ------------------------------>|
   |                       |                       |
   |<--- 4. ACK -----------|                       |
   |<--- 4. ACK -------------------------------|

实现

// Coordinator implementation
/**
 * Drives a two-phase commit across the participants of a distributed transaction.
 *
 * <p>Phase 1 asks each participant to prepare. Phase 2 commits when every participant
 * voted yes; otherwise it rolls back the participants that were actually contacted.
 */
@Component
public class TwoPhaseCommitCoordinator {

    private final TransactionLogRepository transactionLogRepository;
    private final ParticipantClient participantClient;

    /**
     * Creates a coordinator with its collaborators supplied by the container.
     *
     * @param transactionLogRepository store for the coordinator's transaction log
     * @param participantClient client used to call prepare/commit/rollback on participants
     */
    public TwoPhaseCommitCoordinator(TransactionLogRepository transactionLogRepository,
                                     ParticipantClient participantClient) {
        this.transactionLogRepository = transactionLogRepository;
        this.participantClient = participantClient;
    }

    /**
     * Runs the full protocol for one distributed transaction and records the outcome.
     *
     * <p>NOTE(review): the method is {@code @Transactional}, so the log writes are presumably
     * only durable once it returns; a crash between the two phases may leave no record of the
     * decision. Confirm the transaction propagation used by the log repository.
     *
     * @param transaction the distributed transaction to execute
     */
    @Transactional
    public void executeDistributedTransaction(DistributedTransaction transaction) {
        // 1. Record that the transaction has entered the prepare phase
        TransactionLog log = new TransactionLog(
            transaction.getId(),
            TransactionStatus.PREPARE,
            transaction.getParticipants()
        );
        transactionLogRepository.save(log);

        // 2. Prepare phase: null means every participant voted yes
        Participant failedParticipant = prepareParticipants(transaction);

        if (failedParticipant == null) {
            // 3. Commit phase
            commitAllParticipants(transaction);
            log.setStatus(TransactionStatus.COMMITTED);
        } else {
            // 3. Rollback phase: only participants that were asked to prepare hold state
            rollbackContactedParticipants(transaction, failedParticipant);
            log.setStatus(TransactionStatus.ROLLED_BACK);
        }

        transactionLogRepository.save(log);
    }

    /**
     * Asks participants to prepare, in order, stopping at the first refusal or error.
     *
     * @return the participant whose prepare failed, or {@code null} when all prepared
     */
    private Participant prepareParticipants(DistributedTransaction transaction) {
        for (Participant participant : transaction.getParticipants()) {
            try {
                boolean prepared = participantClient.prepare(
                    participant.getServiceName(),
                    transaction.getId()
                );
                if (!prepared) {
                    return participant;
                }
            } catch (Exception e) {
                // A participant that cannot be reached or fails counts as a "no" vote.
                return participant;
            }
        }
        return null;
    }

    /** Sends commit to every participant; failures are recorded for a later retry. */
    private void commitAllParticipants(DistributedTransaction transaction) {
        for (Participant participant : transaction.getParticipants()) {
            try {
                participantClient.commit(
                    participant.getServiceName(),
                    transaction.getId()
                );
            } catch (Exception e) {
                // Record the failure so the commit can be retried later
                // (logFailedCommit is defined outside this snippet)
                logFailedCommit(transaction.getId(), participant.getServiceName());
            }
        }
    }

    /**
     * Sends rollback to the participants that were asked to prepare, i.e. everything up to and
     * including {@code lastContacted}. The failed participant is included because its prepare
     * may have taken effect even though the call reported failure. Participants after it were
     * never contacted and have nothing to roll back.
     */
    private void rollbackContactedParticipants(DistributedTransaction transaction,
                                               Participant lastContacted) {
        for (Participant participant : transaction.getParticipants()) {
            try {
                participantClient.rollback(
                    participant.getServiceName(),
                    transaction.getId()
                );
            } catch (Exception e) {
                // Record the failure so the rollback can be retried later
                // (logFailedRollback is defined outside this snippet)
                logFailedRollback(transaction.getId(), participant.getServiceName());
            }
            // Identity comparison: if getParticipants() hands out different instances on each
            // call this never matches and every participant is rolled back, as before.
            if (participant == lastContacted) {
                break;
            }
        }
    }
}

// Participant implementation
/**
 * Participant side of the two-phase commit protocol: reserves resources on prepare and
 * finalizes or releases them on commit/rollback.
 *
 * <p>{@code freezeResources}, {@code commitTransaction} and {@code rollbackTransaction} are
 * defined outside this snippet.
 */
@Component
public class TwoPhaseCommitParticipant {

    private final TransactionRepository transactionRepository;

    /**
     * Creates a participant backed by the given local transaction store.
     *
     * @param transactionRepository store for this participant's per-transaction state
     */
    public TwoPhaseCommitParticipant(TransactionRepository transactionRepository) {
        this.transactionRepository = transactionRepository;
    }

    /**
     * Phase 1: freezes the resources for the transaction and records it as prepared.
     *
     * <p>NOTE(review): the exception is caught inside a {@code @Transactional} method, so the
     * surrounding local transaction is presumably not marked for rollback when this returns
     * {@code false}; confirm that a partial freeze cannot be committed.
     *
     * @param transactionId id of the distributed transaction
     * @return {@code true} when the participant votes yes, {@code false} on any failure
     */
    @Transactional
    public boolean prepare(String transactionId) {
        try {
            // Freeze the resources this transaction needs
            freezeResources(transactionId);

            // Record the prepared state
            Transaction transaction = new Transaction(transactionId, TransactionStatus.PREPARED);
            transactionRepository.save(transaction);

            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Phase 2: commits a prepared transaction. Calling it again for a transaction that is
     * already recorded as committed does nothing, so the coordinator can retry safely.
     *
     * @param transactionId id of the distributed transaction
     * @throws TransactionNotFoundException when no record exists for the id
     */
    @Transactional
    public void commit(String transactionId) {
        Transaction transaction = transactionRepository.findById(transactionId)
                .orElseThrow(() -> new TransactionNotFoundException(transactionId));

        // Retried commit: the work was already done.
        // NOTE(review): assumes Transaction exposes getStatus() alongside setStatus() — confirm.
        if (transaction.getStatus() == TransactionStatus.COMMITTED) {
            return;
        }

        // Commit the local work
        commitTransaction(transaction);

        // Record the final state
        transaction.setStatus(TransactionStatus.COMMITTED);
        transactionRepository.save(transaction);
    }

    /**
     * Phase 2: rolls back a transaction. Calling it again for a transaction that is already
     * recorded as rolled back does nothing, so the coordinator can retry safely.
     *
     * @param transactionId id of the distributed transaction
     * @throws TransactionNotFoundException when no record exists for the id
     */
    @Transactional
    public void rollback(String transactionId) {
        Transaction transaction = transactionRepository.findById(transactionId)
                .orElseThrow(() -> new TransactionNotFoundException(transactionId));

        // Retried rollback: the work was already done.
        // NOTE(review): assumes Transaction exposes getStatus() alongside setStatus() — confirm.
        if (transaction.getStatus() == TransactionStatus.ROLLED_BACK) {
            return;
        }

        // Undo the local work
        rollbackTransaction(transaction);

        // Record the final state
        transaction.setStatus(TransactionStatus.ROLLED_BACK);
        transactionRepository.save(transaction);
    }
}

优缺点

优点

  - 强一致性:所有参与者要么全部提交,要么全部回滚
  - 原理简单:流程清晰,易于理解和实现

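缺点

  - 同步阻塞:从Prepare到Commit期间,参与者的资源一直处于冻结状态,吞吐量低
  - 单点故障:协调者宕机会导致已准备的参与者长时间阻塞
  - 数据不一致风险:提交阶段若部分参与者未收到Commit,会出现部分提交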

TCC(Try-Confirm-Cancel)

TCC是一种补偿型分布式事务,将事务分为Try、Confirm和Cancel三个阶段。

协议流程

Try阶段:检查和预留资源
Confirm阶段:确认提交,真正使用Try阶段预留的资源
Cancel阶段:取消事务,释放预留资源

实现

// TCC接口定义
public interface TccService {
    
    // Try阶段:检查和预留资源
    boolean try(String transactionId, Object params);
    
    // Confirm阶段:确认提交
    boolean confirm(String transactionId);
    
    // Cancel阶段:取消事务
    boolean cancel(String transactionId);
}

// 订单TCC服务
@Component
public class OrderTccService implements TccService {
    
    private final OrderRepository orderRepository;
    private final InventoryClient inventoryClient;
    private final PaymentClient paymentClient;
    
    @Override
    public boolean try(String transactionId, CreateOrderParams params) {
        try {
            // 1. 创建订单(待确认状态)
            Order order = Order.createPending(transactionId, params);
            orderRepository.save(order);
            
            // 2. 冻结库存
            boolean inventoryFrozen = inventoryClient.tryFreeze(
                params.getProductId(),
                params.getQuantity(),
                transactionId
            );
            if (!inventoryFrozen) {
                throw new InventoryFreezeException("Failed to freeze inventory");
            }
            
            // 3. 冻结余额
            boolean balanceFrozen = paymentClient.tryFreeze(
                params.getUserId(),
                params.getAmount(),
                transactionId
            );
            if (!balanceFrozen) {
                throw new BalanceFreezeException("Failed to freeze balance");
            }
            
            return true;
        } catch (Exception e) {
            // Try失败,执行Cancel
            cancel(transactionId);
            return false;
        }
    }
    
    @Override
    public boolean confirm(String transactionId) {
        try {
            // 1. 确认订单
            Order order = orderRepository.findByTransactionId(transactionId)
                    .orElseThrow(() -> new OrderNotFoundException(transactionId));
            order.confirm();
            orderRepository.save(order);
            
            // 2. 确认库存扣减
            inventoryClient.confirmFreeze(transactionId);
            
            // 3. 确认余额扣减
            paymentClient.confirmFreeze(transactionId);
            
            return true;
        } catch (Exception e) {
            // Confirm失败,需要重试或人工干预
            log.error("Failed to confirm transaction: {}", transactionId, e);
            return false;
        }
    }
    
    @Override
    public boolean cancel(String transactionId) {
        try {
            // 1. 取消订单
            Order order = orderRepository.findByTransactionId(transactionId)
                    .orElse(null);
            if (order != null) {
                order.cancel();
                orderRepository.save(order);
            }
            
            // 2. 取消库存冻结
            inventoryClient.cancelFreeze(transactionId);
            
            // 3. 取消余额冻结
            paymentClient.cancelFreeze(transactionId);
            
            return true;
        } catch (Exception e) {
            // Cancel失败,需要重试
            log.error("Failed to cancel transaction: {}", transactionId, e);
            return false;
        }
    }
}

// TCC事务管理器
@Component
public class TccTransactionManager {
    
    private final Map<String, TccService> tccServices = new ConcurrentHashMap<>();
    private final TransactionLogRepository transactionLogRepository;
    
    public void registerTccService(String name, TccService service) {
        tccServices.put(name, service);
    }
    
    @Transactional
    public void execute(List<TccInvocation> invocations) {
        String transactionId = UUID.randomUUID().toString();
        
        // 1. 创建事务日志
        TransactionLog log = new TransactionLog(transactionId, TransactionStatus.STARTED);
        transactionLogRepository.save(log);
        
        // 2. Try阶段
        boolean allTrySuccess = true;
        for (TccInvocation invocation : invocations) {
            TccService service = tccServices.get(invocation.getServiceName());
            boolean trySuccess = service.try(transactionId, invocation.getParams());
            
            if (!trySuccess) {
                allTrySuccess = false;
                break;
            }
        }
        
        // 3. Confirm或Cancel阶段
        if (allTrySuccess) {
            for (TccInvocation invocation : invocations) {
                TccService service = tccServices.get(invocation.getServiceName());
                service.confirm(transactionId);
            }
            log.setStatus(TransactionStatus.COMMITTED);
        } else {
            for (TccInvocation invocation : invocations) {
                TccService service = tccServices.get(invocation.getServiceName());
                service.cancel(transactionId);
            }
            log.setStatus(TransactionStatus.ROLLED_BACK);
        }
        
        transactionLogRepository.save(log);
    }
}

Saga模式

Saga模式将长事务拆分为一系列本地事务,每个本地事务都有对应的补偿操作。

协议流程

T1 -> T2 -> T3 -> T4
如果T4失败,则执行C3 -> C2 -> C1

实现

// Saga step definition
/**
 * One step of a saga: a forward action paired with the compensation that undoes it.
 *
 * @param <T> type of the context object passed to the action and the compensation
 */
public class SagaStep<T> {

    private final String name;
    private final Function<T, Boolean> action;
    private final Function<T, Boolean> compensation;

    /**
     * Creates a step.
     *
     * @param name name of the step
     * @param action forward operation, applied to the saga context
     * @param compensation operation that undoes {@code action}, applied to the saga context
     */
    public SagaStep(String name,
                    Function<T, Boolean> action,
                    Function<T, Boolean> compensation) {
        this.name = name;
        this.action = action;
        this.compensation = compensation;
    }

    /** Returns the name of this step. */
    public String getName() {
        return name;
    }

    /** Returns the forward operation of this step. */
    public Function<T, Boolean> getAction() {
        return action;
    }

    /** Returns the operation that undoes this step. */
    public Function<T, Boolean> getCompensation() {
        return compensation;
    }
}

// Saga executor
/**
 * Runs the steps of a saga in order and, when one fails, compensates the steps that had
 * already completed, in reverse order.
 *
 * <p>{@code log} in {@code compensate} is the class logger, declared outside this snippet.
 */
@Component
public class SagaExecutor {

    private final SagaLogRepository sagaLogRepository;

    /**
     * Creates an executor that records saga outcomes in the given repository.
     *
     * @param sagaLogRepository store for saga logs
     */
    public SagaExecutor(SagaLogRepository sagaLogRepository) {
        this.sagaLogRepository = sagaLogRepository;
    }

    /**
     * Executes the saga.
     *
     * <p>A step fails when its action returns anything other than {@code true} or throws. On
     * failure the completed steps are compensated exactly once, the saga is recorded as FAILED
     * and the failure is rethrown.
     *
     * @param steps the steps to run, in order
     * @param context object handed to every action and compensation
     * @param <T> type of the context
     */
    public <T> void execute(List<SagaStep<T>> steps, T context) {
        String sagaId = UUID.randomUUID().toString();
        List<SagaStep<T>> executedSteps = new ArrayList<>();

        // Create the saga log
        SagaLog sagaLog = new SagaLog(sagaId, SagaStatus.STARTED);
        sagaLogRepository.save(sagaLog);

        try {
            // Run every step
            for (SagaStep<T> step : steps) {
                // A null result is treated as a failure rather than unboxed into an NPE
                boolean success = Boolean.TRUE.equals(step.getAction().apply(context));

                if (!success) {
                    // Compensation happens in the catch block below; doing it here as well
                    // would run every compensation twice.
                    throw new SagaExecutionException("Saga step failed: " + step.getName());
                }

                executedSteps.add(step);
            }

            // Every step succeeded
            sagaLog.setStatus(SagaStatus.COMPLETED);
            sagaLogRepository.save(sagaLog);

        } catch (Exception e) {
            // Single place where a failed saga is compensated and recorded
            compensate(executedSteps, context);
            sagaLog.setStatus(SagaStatus.FAILED);
            sagaLogRepository.save(sagaLog);
            throw e;
        }
    }

    /**
     * Runs the compensations of the completed steps in reverse order. A failing compensation
     * is logged and does not stop the remaining ones.
     */
    private <T> void compensate(List<SagaStep<T>> executedSteps, T context) {
        for (int i = executedSteps.size() - 1; i >= 0; i--) {
            SagaStep<T> step = executedSteps.get(i);
            try {
                step.getCompensation().apply(context);
            } catch (Exception e) {
                // Compensation failed: needs manual intervention
                log.error("Compensation failed for step: {}", step.getName(), e);
            }
        }
    }
}

// Order saga implementation
/**
 * Order-creation saga: create the order, deduct inventory, process the payment and create the
 * shipment, each paired with the operation that undoes it.
 */
@Component
public class OrderSaga {

    private final SagaExecutor sagaExecutor;
    private final OrderService orderService;
    private final InventoryService inventoryService;
    private final PaymentService paymentService;
    private final ShippingService shippingService;

    /**
     * Creates the saga with its collaborators supplied by the container.
     *
     * @param sagaExecutor executor that runs the steps and their compensations
     * @param orderService service used to create and cancel orders
     * @param inventoryService service used to deduct and restore inventory
     * @param paymentService service used to charge and refund
     * @param shippingService service used to create and cancel shipments
     */
    public OrderSaga(SagaExecutor sagaExecutor,
                     OrderService orderService,
                     InventoryService inventoryService,
                     PaymentService paymentService,
                     ShippingService shippingService) {
        this.sagaExecutor = sagaExecutor;
        this.orderService = orderService;
        this.inventoryService = inventoryService;
        this.paymentService = paymentService;
        this.shippingService = shippingService;
    }

    /**
     * Runs the four-step order saga with {@code params} as the shared context.
     *
     * @param params order parameters; the created order's id is written back into it
     */
    public void createOrder(CreateOrderParams params) {
        List<SagaStep<CreateOrderParams>> steps = Arrays.asList(
            createOrderStep(),
            deductInventoryStep(),
            processPaymentStep(),
            createShipmentStep()
        );

        sagaExecutor.execute(steps, params);
    }

    /** Step 1: create the order and remember its id; compensation cancels the order. */
    private SagaStep<CreateOrderParams> createOrderStep() {
        return new SagaStep<>(
            "createOrder",
            context -> {
                Order order = orderService.createOrder(context);
                // Later steps and compensations look the order up by this id
                context.setOrderId(order.getId());
                return true;
            },
            context -> {
                orderService.cancelOrder(context.getOrderId());
                return true;
            }
        );
    }

    /** Step 2: deduct inventory; compensation restores the same quantity. */
    private SagaStep<CreateOrderParams> deductInventoryStep() {
        return new SagaStep<>(
            "deductInventory",
            context -> {
                inventoryService.deduct(context.getProductId(), context.getQuantity());
                return true;
            },
            context -> {
                inventoryService.restore(context.getProductId(), context.getQuantity());
                return true;
            }
        );
    }

    /** Step 3: process the payment; compensation refunds the same amount. */
    private SagaStep<CreateOrderParams> processPaymentStep() {
        return new SagaStep<>(
            "processPayment",
            context -> {
                paymentService.processPayment(context.getUserId(), context.getAmount());
                return true;
            },
            context -> {
                paymentService.refund(context.getUserId(), context.getAmount());
                return true;
            }
        );
    }

    /** Step 4: create the shipment; compensation cancels it. */
    private SagaStep<CreateOrderParams> createShipmentStep() {
        return new SagaStep<>(
            "createShipment",
            context -> {
                shippingService.createShipment(context.getOrderId());
                return true;
            },
            context -> {
                shippingService.cancelShipment(context.getOrderId());
                return true;
            }
        );
    }
}

方案对比

特性 2PC TCC Saga
一致性 强一致性 最终一致性 最终一致性
性能 低 中等 高
实现复杂度 低 高 中等
锁定资源 是 是(预留) 否
适用场景 强一致性要求 高并发场景 长事务场景
补偿机制 无 Cancel 补偿操作

最佳实践

  1. 选择合适的方案:根据业务需求选择合适的分布式事务方案
  2. 设计补偿操作:确保每个操作都有对应的补偿操作
  3. 幂等性设计:确保操作可以安全地重试
  4. 监控和告警:监控事务执行状态,及时发现和处理问题
  5. 超时处理:设置合理的超时时间,避免长时间等待
  6. 日志记录:记录详细的事务日志,便于问题排查