← 返回首页

分布式事务详解

📂 java ⏱ 2 min 231 words

分布式事务详解

分布式事务是指在分布式系统中,跨多个服务或数据库的数据一致性问题。理解其理论基础和解决方案是微服务架构的关键。

理论基础

CAP定理

分布式系统最多同时满足以下三个特性中的两个:

/**
 * CAP三选二的实例
 * - CP系统:ZooKeeper(一致性+分区容忍,牺牲可用性)
 * - AP系统:Eureka(可用性+分区容忍,牺牲一致性)
 */
public class CapDemo {
    // ZooKeeper保证强一致性,但网络分区时不可用
    // Eureka保证可用性,但数据可能不一致
}

BASE理论

/**
 * BASE = Basically Available + Soft state + Eventually consistent
 * 核心思想:通过最终一致性换取高可用
 */
public class BaseDemo {
    // 基本可用:系统允许降级
    // 软状态:允许中间状态存在
    // 最终一致:最终数据达到一致状态
}

分布式事务解决方案

2PC(两阶段提交)

@Transactional
public void createOrder(OrderDTO dto) {
    // 阶段1:准备
    inventoryService.prepareDeduct(dto.getItems());   // 预扣库存
    accountService.prepareFreeze(dto.getUserId(), dto.getAmount()); // 预冻结金额

    // 阶段2:提交
    try {
        inventoryService.commitDeduct(dto.getItems());
        accountService.commitFreeze(dto.getUserId(), dto.getAmount());
    } catch (Exception e) {
        inventoryService.rollbackDeduct(dto.getItems());
        accountService.rollbackFreeze(dto.getUserId(), dto.getAmount());
    }
}

TCC(Try-Confirm-Cancel)

// Try:预留资源
@Compensable(confirmMethod = "confirmDeduct", cancelMethod = "cancelDeduct")
public void tryDeduct(String skuId, int quantity) {
    inventoryMapper.freezeStock(skuId, quantity);
}

// Confirm:确认扣减
public void confirmDeduct(String skuId, int quantity) {
    inventoryMapper.deductStock(skuId, quantity);
    inventoryMapper.unfreezeStock(skuId, quantity);
}

// Cancel:释放资源
public void cancelDeduct(String skuId, int quantity) {
    inventoryMapper.unfreezeStock(skuId, quantity);
}

本地消息表

@Service
public class OrderService {
    @Transactional
    public void createOrder(OrderDTO dto) {
        orderMapper.insert(dto);
        // 本地消息表记录待发送消息
        outboxMapper.insert(new OutboxMessage("order.created",
            objectMapper.writeValueAsString(dto), "PENDING"));
    }

    @Scheduled(fixedRate = 5000)
    public void retrySendMessage() {
        List<OutboxMessage> messages = outboxMapper.findByStatus("PENDING");
        for (OutboxMessage msg : messages) {
            try {
                rabbitTemplate.convertAndSend(msg.getTopic(), msg.getPayload());
                outboxMapper.updateStatus(msg.getId(), "SENT");
            } catch (Exception e) {
                outboxMapper.incrementRetryCount(msg.getId());
            }
        }
    }
}

可靠消息最终一致性

// 消息状态机:PENDING -> SENT -> CONFIRMED -> CONSUMED
@Component
public class TransactionalMessageService {
    @Transactional
    public void sendInTransaction(String topic, Object payload) {
        TransactionMessage msg = TransactionMessage.builder()
            .id(UUID.randomUUID().toString())
            .topic(topic)
            .payload(objectMapper.writeValueAsString(payload))
            .status(MessageStatus.PENDING)
            .build();
        messageMapper.insert(msg);

        // 事务提交后通过事务同步器发送
        TransactionSynchronizationManager.registerSynchronization(
            new TransactionSynchronization() {
                @Override
                public void afterCommit() {
                    kafkaTemplate.send(topic, msg.getId(), msg.getPayload());
                }
            });
    }
}

小结

没有银弹的分布式事务方案,需要根据业务场景选择合适的一致性保障策略。