← 返回首页
🌐

事件溯源与CQRS实践

📂 architecture ⏱ 8 min 1415 words

事件溯源与CQRS实践

事件溯源概述

事件溯源(Event Sourcing)是一种设计模式,它将对象的状态存储为一系列事件的序列,而不是当前状态。通过重放事件可以重建任何时间点的状态。

核心概念

事件(Event)

事件是已经发生的事实的不可变记录。

// 事件基类
public abstract class DomainEvent {
    private final String eventId;
    private final String aggregateId;
    private final long version;
    private final LocalDateTime occurredAt;
    private final String eventType;
    
    protected DomainEvent(String aggregateId, long version) {
        this.eventId = UUID.randomUUID().toString();
        this.aggregateId = aggregateId;
        this.version = version;
        this.occurredAt = LocalDateTime.now();
        this.eventType = this.getClass().getSimpleName();
    }
    
    // getters
}

// 具体事件
public class OrderCreatedEvent extends DomainEvent {
    private final String customerId;
    private final List<OrderItemEvent> items;
    private final BigDecimal totalAmount;
    
    public OrderCreatedEvent(String aggregateId, long version, 
                            String customerId, List<OrderItemEvent> items, 
                            BigDecimal totalAmount) {
        super(aggregateId, version);
        this.customerId = customerId;
        this.items = items;
        this.totalAmount = totalAmount;
    }
    
    // getters
}

public class OrderItemAddedEvent extends DomainEvent {
    private final String productId;
    private final int quantity;
    private final BigDecimal unitPrice;
    
    public OrderItemAddedEvent(String aggregateId, long version,
                              String productId, int quantity, BigDecimal unitPrice) {
        super(aggregateId, version);
        this.productId = productId;
        this.quantity = quantity;
        this.unitPrice = unitPrice;
    }
    
    // getters
}

public class OrderConfirmedEvent extends DomainEvent {
    private final String confirmedBy;
    
    public OrderConfirmedEvent(String aggregateId, long version, String confirmedBy) {
        super(aggregateId, version);
        this.confirmedBy = confirmedBy;
    }
    
    // getters
}

聚合根(Aggregate Root)

聚合根是事件溯源的核心,它负责管理状态和产生事件。

// 订单聚合根
public class OrderAggregate {
    
    private String id;
    private String customerId;
    private List<OrderItem> items;
    private OrderStatus status;
    private BigDecimal totalAmount;
    private int version;
    
    // 用于重建状态的构造函数
    private OrderAggregate(String id) {
        this.id = id;
        this.items = new ArrayList<>();
        this.status = OrderStatus.CREATED;
        this.totalAmount = BigDecimal.ZERO;
    }
    
    // 创建新订单
    public static OrderAggregate create(String customerId, List<OrderItemCommand> itemCommands) {
        OrderAggregate order = new OrderAggregate(UUID.randomUUID().toString());
        
        // 产生事件
        OrderCreatedEvent event = new OrderCreatedEvent(
            order.id,
            1,
            customerId,
            itemCommands.stream()
                .map(cmd -> new OrderItemEvent(cmd.getProductId(), cmd.getQuantity(), cmd.getUnitPrice()))
                .collect(Collectors.toList()),
            calculateTotal(itemCommands)
        );
        
        // 应用事件
        order.apply(event);
        
        return order;
    }
    
    // 添加商品
    public OrderItemAddedEvent addItem(String productId, int quantity, BigDecimal unitPrice) {
        if (status != OrderStatus.CREATED) {
            throw new BusinessException("Can only add items to created orders");
        }
        
        OrderItemAddedEvent event = new OrderItemAddedEvent(
            id,
            version + 1,
            productId,
            quantity,
            unitPrice
        );
        
        apply(event);
        
        return event;
    }
    
    // 确认订单
    public OrderConfirmedEvent confirm(String confirmedBy) {
        if (status != OrderStatus.CREATED) {
            throw new BusinessException("Only created orders can be confirmed");
        }
        
        OrderConfirmedEvent event = new OrderConfirmedEvent(
            id,
            version + 1,
            confirmedBy
        );
        
        apply(event);
        
        return event;
    }
    
    // 应用事件
    private void apply(DomainEvent event) {
        if (event instanceof OrderCreatedEvent) {
            applyOrderCreated((OrderCreatedEvent) event);
        } else if (event instanceof OrderItemAddedEvent) {
            applyOrderItemAdded((OrderItemAddedEvent) event);
        } else if (event instanceof OrderConfirmedEvent) {
            applyOrderConfirmed((OrderConfirmedEvent) event);
        }
        
        this.version = event.getVersion();
    }
    
    private void applyOrderCreated(OrderCreatedEvent event) {
        this.customerId = event.getCustomerId();
        this.items = event.getItems().stream()
            .map(item -> new OrderItem(item.getProductId(), item.getQuantity(), item.getUnitPrice()))
            .collect(Collectors.toList());
        this.totalAmount = event.getTotalAmount();
    }
    
    private void applyOrderItemAdded(OrderItemAddedEvent event) {
        OrderItem item = new OrderItem(event.getProductId(), event.getQuantity(), event.getUnitPrice());
        this.items.add(item);
        this.totalAmount = this.totalAmount.add(event.getQuantity() * event.getUnitPrice());
    }
    
    private void applyOrderConfirmed(OrderConfirmedEvent event) {
        this.status = OrderStatus.CONFIRMED;
    }
    
    // 从历史事件重建状态
    public static OrderAggregate fromHistory(String id, List<DomainEvent> events) {
        OrderAggregate order = new OrderAggregate(id);
        
        for (DomainEvent event : events) {
            order.apply(event);
        }
        
        return order;
    }
}

事件存储(Event Store)

事件存储负责持久化和检索事件。

PostgreSQL实现

// 事件存储接口
public interface EventStore {
    void saveEvents(String aggregateId, List<DomainEvent> events, int expectedVersion);
    List<DomainEvent> getEvents(String aggregateId);
    List<DomainEvent> getEvents(String aggregateId, long fromVersion);
}

// PostgreSQL事件存储实现
@Repository
public class PostgresEventStore implements EventStore {
    
    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;
    
    @Override
    @Transactional
    public void saveEvents(String aggregateId, List<DomainEvent> events, int expectedVersion) {
        // 检查版本号
        int currentVersion = getCurrentVersion(aggregateId);
        if (currentVersion != expectedVersion) {
            throw new OptimisticConcurrencyException(
                "Expected version " + expectedVersion + " but was " + currentVersion
            );
        }
        
        // 保存事件
        for (DomainEvent event : events) {
            String sql = "INSERT INTO events (event_id, aggregate_id, event_type, event_data, version, occurred_at) " +
                         "VALUES (?, ?, ?, ?, ?, ?)";
            
            jdbcTemplate.update(sql,
                event.getEventId(),
                event.getAggregateId(),
                event.getEventType(),
                objectMapper.writeValueAsString(event),
                event.getVersion(),
                event.getOccurredAt()
            );
        }
    }
    
    @Override
    public List<DomainEvent> getEvents(String aggregateId) {
        String sql = "SELECT * FROM events WHERE aggregate_id = ? ORDER BY version ASC";
        
        return jdbcTemplate.query(sql, new Object[]{aggregateId}, (rs, rowNum) -> {
            String eventType = rs.getString("event_type");
            String eventData = rs.getString("event_data");
            
            Class<?> eventClass = Class.forName("com.example.events." + eventType);
            return (DomainEvent) objectMapper.readValue(eventData, eventClass);
        });
    }
    
    @Override
    public List<DomainEvent> getEvents(String aggregateId, long fromVersion) {
        String sql = "SELECT * FROM events WHERE aggregate_id = ? AND version > ? ORDER BY version ASC";
        
        return jdbcTemplate.query(sql, new Object[]{aggregateId, fromVersion}, (rs, rowNum) -> {
            String eventType = rs.getString("event_type");
            String eventData = rs.getString("event_data");
            
            Class<?> eventClass = Class.forName("com.example.events." + eventType);
            return (DomainEvent) objectMapper.readValue(eventData, eventClass);
        });
    }
    
    private int getCurrentVersion(String aggregateId) {
        String sql = "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = ?";
        return jdbcTemplate.queryForObject(sql, new Object[]{aggregateId}, Integer.class);
    }
}

// 数据库表结构
/*
CREATE TABLE events (
    event_id VARCHAR(36) PRIMARY KEY,
    aggregate_id VARCHAR(36) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    event_data JSONB NOT NULL,
    version INT NOT NULL,
    occurred_at TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_events_aggregate_id ON events(aggregate_id);
CREATE INDEX idx_events_aggregate_version ON events(aggregate_id, version);
*/

MongoDB实现

// MongoDB事件存储实现
@Repository
public class MongoEventStore implements EventStore {
    
    private final MongoTemplate mongoTemplate;
    private final ObjectMapper objectMapper;
    
    @Override
    public void saveEvents(String aggregateId, List<DomainEvent> events, int expectedVersion) {
        // 检查版本号
        long currentVersion = getCurrentVersion(aggregateId);
        if (currentVersion != expectedVersion) {
            throw new OptimisticConcurrencyException(
                "Expected version " + expectedVersion + " but was " + currentVersion
            );
        }
        
        // 保存事件
        for (DomainEvent event : events) {
            Document doc = new Document()
                .append("eventId", event.getEventId())
                .append("aggregateId", event.getAggregateId())
                .append("eventType", event.getEventType())
                .append("eventData", objectMapper.writeValueAsString(event))
                .append("version", event.getVersion())
                .append("occurredAt", event.getOccurredAt());
            
            mongoTemplate.save(doc, "events");
        }
    }
    
    @Override
    public List<DomainEvent> getEvents(String aggregateId) {
        Query query = new Query(Criteria.where("aggregateId").is(aggregateId))
                .with(Sort.by(Sort.Direction.ASC, "version"));
        
        List<Document> docs = mongoTemplate.find(query, Document.class, "events");
        
        return docs.stream()
                .map(doc -> {
                    String eventType = doc.getString("eventType");
                    String eventData = doc.getString("eventData");
                    
                    try {
                        Class<?> eventClass = Class.forName("com.example.events." + eventType);
                        return (DomainEvent) objectMapper.readValue(eventData, eventClass);
                    } catch (Exception e) {
                        throw new EventDeserializationException("Failed to deserialize event", e);
                    }
                })
                .collect(Collectors.toList());
    }
    
    private long getCurrentVersion(String aggregateId) {
        Query query = new Query(Criteria.where("aggregateId").is(aggregateId))
                .with(Sort.by(Sort.Direction.DESC, "version"))
                .limit(1);
        
        Document doc = mongoTemplate.findOne(query, Document.class, "events");
        return doc != null ? doc.getLong("version") : 0;
    }
}

快照(Snapshot)

为了提高性能,可以定期创建聚合状态的快照。

// 快照
public class AggregateSnapshot {
    private String aggregateId;
    private String aggregateType;
    private int version;
    private String stateData;
    private LocalDateTime createdAt;
    
    // getters and setters
}

// 快照存储
@Repository
public class SnapshotStore {
    
    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;
    
    public void saveSnapshot(AggregateSnapshot snapshot) {
        String sql = "INSERT INTO snapshots (aggregate_id, aggregate_type, version, state_data, created_at) " +
                     "VALUES (?, ?, ?, ?, ?) " +
                     "ON CONFLICT (aggregate_id) DO UPDATE SET " +
                     "version = EXCLUDED.version, " +
                     "state_data = EXCLUDED.state_data, " +
                     "created_at = EXCLUDED.created_at";
        
        jdbcTemplate.update(sql,
            snapshot.getAggregateId(),
            snapshot.getAggregateType(),
            snapshot.getVersion(),
            snapshot.getStateData(),
            snapshot.getCreatedAt()
        );
    }
    
    public Optional<AggregateSnapshot> getSnapshot(String aggregateId) {
        String sql = "SELECT * FROM snapshots WHERE aggregate_id = ?";
        
        return jdbcTemplate.queryForObject(sql, new Object[]{aggregateId}, (rs, rowNum) -> {
            AggregateSnapshot snapshot = new AggregateSnapshot();
            snapshot.setAggregateId(rs.getString("aggregate_id"));
            snapshot.setAggregateType(rs.getString("aggregate_type"));
            snapshot.setVersion(rs.getInt("version"));
            snapshot.setStateData(rs.getString("state_data"));
            snapshot.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
            return snapshot;
        });
    }
}

// 带快照的事件存储
@Component
public class EventStoreWithSnapshot {
    
    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;
    private final ObjectMapper objectMapper;
    
    private static final int SNAPSHOT_INTERVAL = 100;
    
    public OrderAggregate loadOrder(String orderId) {
        // 尝试加载快照
        Optional<AggregateSnapshot> snapshot = snapshotStore.getSnapshot(orderId);
        
        if (snapshot.isPresent()) {
            // 从快照重建状态
            OrderAggregate order = restoreFromSnapshot(snapshot.get());
            
            // 加载快照之后的事件
            List<DomainEvent> events = eventStore.getEvents(orderId, snapshot.get().getVersion());
            
            // 应用后续事件
            for (DomainEvent event : events) {
                order.apply(event);
            }
            
            return order;
        } else {
            // 从事件历史重建状态
            List<DomainEvent> events = eventStore.getEvents(orderId);
            return OrderAggregate.fromHistory(orderId, events);
        }
    }
    
    public void saveOrder(OrderAggregate order, List<DomainEvent> events) {
        // 保存事件
        eventStore.saveEvents(order.getId(), events, order.getVersion() - events.size());
        
        // 定期创建快照
        if (order.getVersion() % SNAPSHOT_INTERVAL == 0) {
            createSnapshot(order);
        }
    }
    
    private void createSnapshot(OrderAggregate order) {
        AggregateSnapshot snapshot = new AggregateSnapshot();
        snapshot.setAggregateId(order.getId());
        snapshot.setAggregateType("Order");
        snapshot.setVersion(order.getVersion());
        snapshot.setStateData(objectMapper.writeValueAsString(order));
        snapshot.setCreatedAt(LocalDateTime.now());
        
        snapshotStore.saveSnapshot(snapshot);
    }
    
    private OrderAggregate restoreFromSnapshot(AggregateSnapshot snapshot) {
        try {
            return objectMapper.readValue(snapshot.getStateData(), OrderAggregate.class);
        } catch (Exception e) {
            throw new SnapshotRestoreException("Failed to restore from snapshot", e);
        }
    }
}

事件溯源与CQRS结合

// 命令处理器
@Component
public class OrderCommandHandler {
    
    private final EventStoreWithSnapshot eventStore;
    private final EventBus eventBus;
    
    @Transactional
    public void handle(CreateOrderCommand command) {
        // 创建聚合
        OrderAggregate order = OrderAggregate.create(
            command.getCustomerId(),
            command.getItems()
        );
        
        // 获取产生的事件
        List<DomainEvent> events = order.getUncommittedEvents();
        
        // 保存事件
        eventStore.saveOrder(order, events);
        
        // 发布事件
        for (DomainEvent event : events) {
            eventBus.publish(event);
        }
    }
    
    @Transactional
    public void handle(AddOrderItemCommand command) {
        // 加载聚合
        OrderAggregate order = eventStore.loadOrder(command.getOrderId());
        
        // 执行业务逻辑
        OrderItemAddedEvent event = order.addItem(
            command.getProductId(),
            command.getQuantity(),
            command.getUnitPrice()
        );
        
        // 保存事件
        eventStore.saveOrder(order, Collections.singletonList(event));
        
        // 发布事件
        eventBus.publish(event);
    }
}

// 查询处理器
@Component
public class OrderQueryHandler {
    
    private final OrderReadRepository readRepository;
    
    public OrderDto getOrder(String orderId) {
        return readRepository.findById(orderId)
                .orElseThrow(() -> new OrderNotFoundException(orderId));
    }
    
    public List<OrderSummaryDto> getOrdersByCustomer(String customerId) {
        return readRepository.findByCustomerId(customerId);
    }
}

// 读模型更新器
@Component
public class OrderReadModelUpdater {
    
    private final OrderReadRepository readRepository;
    
    @EventHandler
    public void on(OrderCreatedEvent event) {
        OrderReadModel model = new OrderReadModel();
        model.setId(event.getAggregateId());
        model.setCustomerId(event.getCustomerId());
        model.setStatus("CREATED");
        model.setTotalAmount(event.getTotalAmount());
        model.setItemCount(event.getItems().size());
        model.setCreatedAt(event.getOccurredAt());
        
        readRepository.save(model);
    }
    
    @EventHandler
    public void on(OrderItemAddedEvent event) {
        OrderReadModel model = readRepository.findById(event.getAggregateId())
                .orElseThrow(() -> new OrderNotFoundException(event.getAggregateId()));
        
        model.setItemCount(model.getItemCount() + 1);
        model.setTotalAmount(model.getTotalAmount().add(
            BigDecimal.valueOf(event.getQuantity() * event.getUnitPrice())
        ));
        
        readRepository.save(model);
    }
    
    @EventHandler
    public void on(OrderConfirmedEvent event) {
        OrderReadModel model = readRepository.findById(event.getAggregateId())
                .orElseThrow(() -> new OrderNotFoundException(event.getAggregateId()));
        
        model.setStatus("CONFIRMED");
        model.setConfirmedAt(event.getOccurredAt());
        
        readRepository.save(model);
    }
}

事件溯源的优点

  1. 完整审计日志:记录所有状态变化,便于审计和合规
  2. 时间旅行:可以重建任何历史时间点的状态
  3. 调试能力:可以重放事件来重现问题
  4. 解耦读写:与CQRS结合实现读写分离
  5. 事件驱动:天然支持事件驱动架构

实践建议

  1. 事件设计:事件应该包含足够的信息,但不暴露实现细节
  2. 版本管理:使用事件版本管理来处理事件结构变化
  3. 快照策略:根据聚合大小和访问频率制定快照策略
  4. 事件存储选择:根据需求选择合适的事件存储(PostgreSQL、MongoDB、EventStoreDB)
  5. 监控告警:监控事件存储的性能和状态