事件溯源与CQRS实践
事件溯源与CQRS实践
事件溯源概述
事件溯源(Event Sourcing)是一种设计模式:它不直接保存对象的当前状态,而是把每一次状态变更保存为按顺序追加的事件。按顺序重放这些事件,就可以重建对象在任意时间点的状态。
核心概念
事件(Event)
事件是已经发生的事实的不可变记录。
/**
 * Base class for domain events: an immutable record of something that has
 * already happened to an aggregate.
 *
 * <p>Each event carries a generated id, the id of the aggregate it belongs to,
 * the aggregate version it produces, its creation time and a type name taken
 * from the concrete class.
 */
public abstract class DomainEvent {
    private final String eventId;
    private final String aggregateId;
    private final long version;
    private final LocalDateTime occurredAt;
    private final String eventType;

    /**
     * Creates a new event with a freshly generated id and the current time.
     *
     * <p>NOTE(review): this constructor always generates a new id and timestamp.
     * Confirm how stored events are rehydrated so that the persisted values,
     * not fresh ones, end up on loaded events.
     *
     * @param aggregateId id of the aggregate the event belongs to
     * @param version     aggregate version after this event has been applied
     */
    protected DomainEvent(String aggregateId, long version) {
        this.eventId = UUID.randomUUID().toString();
        this.aggregateId = aggregateId;
        this.version = version;
        // LocalDateTime.now() reads the system clock in the JVM's default time zone.
        this.occurredAt = LocalDateTime.now();
        // The simple class name doubles as the persisted type discriminator.
        this.eventType = this.getClass().getSimpleName();
    }

    /** @return the unique id generated for this event */
    public String getEventId() {
        return eventId;
    }

    /** @return the id of the aggregate this event belongs to */
    public String getAggregateId() {
        return aggregateId;
    }

    /** @return the aggregate version this event produces */
    public long getVersion() {
        return version;
    }

    /** @return the time at which this event object was created */
    public LocalDateTime getOccurredAt() {
        return occurredAt;
    }

    /** @return the simple class name of the concrete event */
    public String getEventType() {
        return eventType;
    }
}
/**
 * Concrete event: a new order was created for a customer with an initial set
 * of items and a total amount.
 */
public class OrderCreatedEvent extends DomainEvent {
    private final String customerId;
    private final List<OrderItemEvent> items;
    private final BigDecimal totalAmount;

    /**
     * @param aggregateId id of the order
     * @param version     aggregate version produced by this event
     * @param customerId  id of the customer who placed the order
     * @param items       initial order items; copied, {@code null} is treated as empty
     * @param totalAmount total amount of the order at creation time
     */
    public OrderCreatedEvent(String aggregateId, long version,
                             String customerId, List<OrderItemEvent> items,
                             BigDecimal totalAmount) {
        super(aggregateId, version);
        this.customerId = customerId;
        // An event is an immutable fact: take a private, unmodifiable copy so that
        // later changes to the caller's list cannot alter what was recorded.
        if (items == null) {
            this.items = java.util.Collections.emptyList();
        } else {
            this.items = java.util.Collections.unmodifiableList(new java.util.ArrayList<>(items));
        }
        this.totalAmount = totalAmount;
    }

    /** @return id of the customer who placed the order */
    public String getCustomerId() {
        return customerId;
    }

    /** @return unmodifiable view of the items the order was created with */
    public List<OrderItemEvent> getItems() {
        return items;
    }

    /** @return total amount of the order at creation time */
    public BigDecimal getTotalAmount() {
        return totalAmount;
    }
}
/**
 * Event: one item (product, quantity, unit price) was added to an existing order.
 */
public class OrderItemAddedEvent extends DomainEvent {
    private final String productId;
    private final int quantity;
    private final BigDecimal unitPrice;

    /**
     * @param aggregateId id of the order
     * @param version     aggregate version produced by this event
     * @param productId   id of the product that was added
     * @param quantity    number of units added
     * @param unitPrice   price of a single unit
     */
    public OrderItemAddedEvent(String aggregateId, long version,
                               String productId, int quantity, BigDecimal unitPrice) {
        super(aggregateId, version);
        this.productId = productId;
        this.quantity = quantity;
        this.unitPrice = unitPrice;
    }

    /** @return id of the product that was added */
    public String getProductId() {
        return productId;
    }

    /** @return number of units added */
    public int getQuantity() {
        return quantity;
    }

    /** @return price of a single unit */
    public BigDecimal getUnitPrice() {
        return unitPrice;
    }
}
/**
 * Event: an order was confirmed.
 */
public class OrderConfirmedEvent extends DomainEvent {
    private final String confirmedBy;

    /**
     * @param aggregateId id of the order
     * @param version     aggregate version produced by this event
     * @param confirmedBy identifier of whoever confirmed the order
     */
    public OrderConfirmedEvent(String aggregateId, long version, String confirmedBy) {
        super(aggregateId, version);
        this.confirmedBy = confirmedBy;
    }

    /** @return identifier of whoever confirmed the order */
    public String getConfirmedBy() {
        return confirmedBy;
    }
}
聚合根(Aggregate Root)
聚合根是事件溯源的核心:它在处理命令时校验业务规则并产生事件,再通过应用这些事件来更新自身状态;重放历史事件时也走同一套应用逻辑。
/**
 * Order aggregate root.
 *
 * <p>Command methods ({@link #create}, {@link #addItem}, {@link #confirm}) check
 * business rules, create an event, apply it to this instance and remember it as
 * uncommitted. {@link #apply(DomainEvent)} and {@link #fromHistory} replay events
 * that are already stored and therefore do not record them as uncommitted.
 *
 * <p>NOTE(review): EventStoreWithSnapshot serializes this class with an
 * ObjectMapper. Confirm the mapper can construct this class (the only
 * constructor is private) and that {@code uncommittedEvents} is excluded before
 * relying on snapshots.
 */
public class OrderAggregate {
    private String id;
    private String customerId;
    private List<OrderItem> items;
    private OrderStatus status;
    private BigDecimal totalAmount;
    private int version;
    // Events raised by command methods on this instance that the caller has not persisted yet.
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    // Used both for new orders and as the blank starting point when rebuilding from events.
    private OrderAggregate(String id) {
        this.id = id;
        this.items = new ArrayList<>();
        this.status = OrderStatus.CREATED;
        this.totalAmount = BigDecimal.ZERO;
    }

    /**
     * Creates a new order with a random id and raises an {@link OrderCreatedEvent}
     * at version 1.
     *
     * @param customerId   id of the customer placing the order
     * @param itemCommands initial items of the order
     * @return the new aggregate, with the creation event applied and uncommitted
     */
    public static OrderAggregate create(String customerId, List<OrderItemCommand> itemCommands) {
        OrderAggregate order = new OrderAggregate(UUID.randomUUID().toString());
        OrderCreatedEvent event = new OrderCreatedEvent(
                order.id,
                1,
                customerId,
                itemCommands.stream()
                        .map(cmd -> new OrderItemEvent(cmd.getProductId(), cmd.getQuantity(), cmd.getUnitPrice()))
                        .collect(Collectors.toList()),
                calculateTotal(itemCommands));
        order.raise(event);
        return order;
    }

    /**
     * Adds an item to the order.
     *
     * @return the raised event (also recorded as uncommitted)
     * @throws BusinessException if the order is no longer in status CREATED
     */
    public OrderItemAddedEvent addItem(String productId, int quantity, BigDecimal unitPrice) {
        if (status != OrderStatus.CREATED) {
            throw new BusinessException("Can only add items to created orders");
        }
        OrderItemAddedEvent event = new OrderItemAddedEvent(id, version + 1, productId, quantity, unitPrice);
        raise(event);
        return event;
    }

    /**
     * Confirms the order.
     *
     * @return the raised event (also recorded as uncommitted)
     * @throws BusinessException if the order is not in status CREATED
     */
    public OrderConfirmedEvent confirm(String confirmedBy) {
        if (status != OrderStatus.CREATED) {
            throw new BusinessException("Only created orders can be confirmed");
        }
        OrderConfirmedEvent event = new OrderConfirmedEvent(id, version + 1, confirmedBy);
        raise(event);
        return event;
    }

    /**
     * Applies an already-stored event to this aggregate, used when replaying
     * history or the events recorded after a snapshot. The event is not added
     * to the uncommitted list.
     *
     * <p>Event types this aggregate does not know are skipped, but the version
     * still advances to the event's version.
     *
     * @throws ArithmeticException if the event version does not fit in an {@code int}
     */
    public void apply(DomainEvent event) {
        if (event instanceof OrderCreatedEvent) {
            applyOrderCreated((OrderCreatedEvent) event);
        } else if (event instanceof OrderItemAddedEvent) {
            applyOrderItemAdded((OrderItemAddedEvent) event);
        } else if (event instanceof OrderConfirmedEvent) {
            applyOrderConfirmed((OrderConfirmedEvent) event);
        }
        // Event versions are long while the aggregate tracks an int; fail loudly instead of truncating.
        this.version = Math.toIntExact(event.getVersion());
    }

    /**
     * Rebuilds an aggregate by replaying its stored events in order.
     *
     * @param id     id of the aggregate
     * @param events the aggregate's events, oldest first
     */
    public static OrderAggregate fromHistory(String id, List<DomainEvent> events) {
        OrderAggregate order = new OrderAggregate(id);
        for (DomainEvent event : events) {
            order.apply(event);
        }
        return order;
    }

    /** @return a copy of the events raised on this instance that have not been marked as committed */
    public List<DomainEvent> getUncommittedEvents() {
        return new ArrayList<>(uncommittedEvents);
    }

    /** Clears the uncommitted list; call after the events have been persisted. */
    public void markEventsAsCommitted() {
        uncommittedEvents.clear();
    }

    public String getId() {
        return id;
    }

    public String getCustomerId() {
        return customerId;
    }

    /** @return a copy of the current order items */
    public List<OrderItem> getItems() {
        return new ArrayList<>(items);
    }

    public OrderStatus getStatus() {
        return status;
    }

    public BigDecimal getTotalAmount() {
        return totalAmount;
    }

    /** @return version of the last event applied to this aggregate (0 if none) */
    public int getVersion() {
        return version;
    }

    // Applies a newly created event and remembers it so the caller can persist it.
    private void raise(DomainEvent event) {
        apply(event);
        uncommittedEvents.add(event);
    }

    private void applyOrderCreated(OrderCreatedEvent event) {
        this.customerId = event.getCustomerId();
        // Collect into an ArrayList explicitly: items are appended later and
        // Collectors.toList() makes no promise that its result is mutable.
        this.items = event.getItems().stream()
                .map(item -> new OrderItem(item.getProductId(), item.getQuantity(), item.getUnitPrice()))
                .collect(Collectors.toCollection(ArrayList::new));
        this.totalAmount = event.getTotalAmount();
    }

    private void applyOrderItemAdded(OrderItemAddedEvent event) {
        this.items.add(new OrderItem(event.getProductId(), event.getQuantity(), event.getUnitPrice()));
        this.totalAmount = this.totalAmount.add(lineTotal(event.getQuantity(), event.getUnitPrice()));
    }

    private void applyOrderConfirmed(OrderConfirmedEvent event) {
        this.status = OrderStatus.CONFIRMED;
    }

    // Sum of quantity x unit price over all items, matching how addItem grows the total.
    // Assumes OrderItemCommand exposes an int quantity and a BigDecimal unit price - TODO confirm.
    private static BigDecimal calculateTotal(List<OrderItemCommand> itemCommands) {
        BigDecimal total = BigDecimal.ZERO;
        for (OrderItemCommand cmd : itemCommands) {
            total = total.add(lineTotal(cmd.getQuantity(), cmd.getUnitPrice()));
        }
        return total;
    }

    // BigDecimal has no '*' operator; multiply explicitly to keep exact decimal arithmetic.
    private static BigDecimal lineTotal(int quantity, BigDecimal unitPrice) {
        return unitPrice.multiply(BigDecimal.valueOf(quantity));
    }
}
事件存储(Event Store)
事件存储负责持久化和检索事件。
PostgreSQL实现
/**
 * Event store interface: persists and retrieves the events of an aggregate.
 */
public interface EventStore {
    /**
     * Appends events to an aggregate's event stream.
     *
     * @param aggregateId     id of the aggregate the events belong to
     * @param events          events to append
     * @param expectedVersion version the caller expects the stream to be at before
     *                        appending; the implementations in this file compare it
     *                        with the highest stored version and throw
     *                        OptimisticConcurrencyException on a mismatch
     */
    void saveEvents(String aggregateId, List<DomainEvent> events, int expectedVersion);

    /**
     * Returns the events of an aggregate; the implementations in this file
     * order them by ascending version.
     */
    List<DomainEvent> getEvents(String aggregateId);

    /**
     * Returns the events of an aggregate after a given version; the PostgreSQL
     * implementation in this file treats {@code fromVersion} as exclusive
     * ({@code version > fromVersion}) and orders by ascending version.
     */
    List<DomainEvent> getEvents(String aggregateId, long fromVersion);
}
/**
 * PostgreSQL implementation of {@link EventStore}: one row per event in the
 * {@code events} table, with the event body stored as JSON.
 *
 * <p>NOTE(review): the version check in {@link #saveEvents} reads the current
 * version and then inserts; that sequence is not atomic on its own. A UNIQUE
 * constraint on (aggregate_id, version) is needed so that the slower of two
 * concurrent writers fails - confirm the schema has one.
 */
@Repository
public class PostgresEventStore implements EventStore {
    // event_type holds the simple class name; it is resolved against this package on load.
    private static final String EVENT_PACKAGE = "com.example.events.";

    // event_data is a JSONB column but the payload is bound as a string, so cast explicitly;
    // PostgreSQL does not implicitly convert a varchar parameter to jsonb.
    private static final String INSERT_SQL =
            "INSERT INTO events (event_id, aggregate_id, event_type, event_data, version, occurred_at) " +
            "VALUES (?, ?, ?, CAST(? AS jsonb), ?, ?)";

    private final JdbcTemplate jdbcTemplate;
    private final ObjectMapper objectMapper;

    public PostgresEventStore(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
        this.jdbcTemplate = jdbcTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Appends events after checking that the stored version equals {@code expectedVersion}.
     *
     * @throws OptimisticConcurrencyException if the highest stored version differs
     * @throws IllegalStateException          if an event cannot be serialized to JSON
     */
    @Override
    @Transactional
    public void saveEvents(String aggregateId, List<DomainEvent> events, int expectedVersion) {
        int currentVersion = getCurrentVersion(aggregateId);
        if (currentVersion != expectedVersion) {
            throw new OptimisticConcurrencyException(
                    "Expected version " + expectedVersion + " but was " + currentVersion);
        }
        for (DomainEvent event : events) {
            jdbcTemplate.update(INSERT_SQL,
                    event.getEventId(),
                    event.getAggregateId(),
                    event.getEventType(),
                    toJson(event),
                    event.getVersion(),
                    event.getOccurredAt());
        }
    }

    /** Returns all events of the aggregate, ordered by ascending version. */
    @Override
    public List<DomainEvent> getEvents(String aggregateId) {
        return queryEvents(
                "SELECT * FROM events WHERE aggregate_id = ? ORDER BY version ASC",
                aggregateId);
    }

    /** Returns the events with a version strictly greater than {@code fromVersion}, ascending. */
    @Override
    public List<DomainEvent> getEvents(String aggregateId, long fromVersion) {
        return queryEvents(
                "SELECT * FROM events WHERE aggregate_id = ? AND version > ? ORDER BY version ASC",
                aggregateId, fromVersion);
    }

    // Runs an event query and maps every row back to its concrete event class.
    private List<DomainEvent> queryEvents(String sql, Object... args) {
        return jdbcTemplate.query(
                sql,
                (rs, rowNum) -> fromJson(rs.getString("event_type"), rs.getString("event_data")),
                args);
    }

    // Highest stored version for the aggregate, or 0 when it has no events yet.
    private int getCurrentVersion(String aggregateId) {
        Integer max = jdbcTemplate.queryForObject(
                "SELECT COALESCE(MAX(version), 0) FROM events WHERE aggregate_id = ?",
                Integer.class,
                aggregateId);
        return max != null ? max : 0;
    }

    // Serializes an event, turning Jackson's failure into an unchecked exception.
    private String toJson(DomainEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to serialize event " + event.getEventId(), e);
        }
    }

    // Resolves the event class from its stored type name and deserializes the JSON body.
    // The row-mapper lambda may only throw SQLException, so failures are wrapped unchecked.
    private DomainEvent fromJson(String eventType, String eventData) {
        try {
            Class<?> eventClass = Class.forName(EVENT_PACKAGE + eventType);
            return (DomainEvent) objectMapper.readValue(eventData, eventClass);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to deserialize event of type " + eventType, e);
        }
    }
}
// Database schema for the events table
/*
CREATE TABLE events (
    event_id VARCHAR(36) PRIMARY KEY,
    aggregate_id VARCHAR(36) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    event_data JSONB NOT NULL,
    version INT NOT NULL,
    occurred_at TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_events_aggregate_id ON events(aggregate_id);
-- UNIQUE so that two writers can never store the same version of one aggregate:
-- the store's read-then-insert version check is not atomic by itself.
CREATE UNIQUE INDEX idx_events_aggregate_version ON events(aggregate_id, version);
*/
MongoDB实现
/**
 * MongoDB implementation of {@link EventStore}: one document per event in the
 * {@code events} collection, with the event body stored as a JSON string.
 *
 * <p>NOTE(review): the version check in {@link #saveEvents} reads the current
 * version and then writes; that sequence is not atomic on its own. A unique
 * index on (aggregateId, version) is needed to reject concurrent writers -
 * confirm the collection has one.
 */
@Repository
public class MongoEventStore implements EventStore {
    private static final String COLLECTION = "events";
    // eventType holds the simple class name; it is resolved against this package on load.
    private static final String EVENT_PACKAGE = "com.example.events.";

    private final MongoTemplate mongoTemplate;
    private final ObjectMapper objectMapper;

    public MongoEventStore(MongoTemplate mongoTemplate, ObjectMapper objectMapper) {
        this.mongoTemplate = mongoTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Appends events after checking that the stored version equals {@code expectedVersion}.
     *
     * @throws OptimisticConcurrencyException if the highest stored version differs
     * @throws IllegalStateException          if an event cannot be serialized to JSON
     */
    @Override
    public void saveEvents(String aggregateId, List<DomainEvent> events, int expectedVersion) {
        long currentVersion = getCurrentVersion(aggregateId);
        if (currentVersion != expectedVersion) {
            throw new OptimisticConcurrencyException(
                    "Expected version " + expectedVersion + " but was " + currentVersion);
        }
        // Build every document first so a serialization failure cannot leave a partial write.
        List<Document> docs = events.stream()
                .map(this::toDocument)
                .collect(Collectors.toList());
        for (Document doc : docs) {
            mongoTemplate.save(doc, COLLECTION);
        }
    }

    /** Returns all events of the aggregate, ordered by ascending version. */
    @Override
    public List<DomainEvent> getEvents(String aggregateId) {
        return findEvents(Criteria.where("aggregateId").is(aggregateId));
    }

    /** Returns the events with a version strictly greater than {@code fromVersion}, ascending. */
    @Override
    public List<DomainEvent> getEvents(String aggregateId, long fromVersion) {
        return findEvents(Criteria.where("aggregateId").is(aggregateId)
                .and("version").gt(fromVersion));
    }

    // Runs an event query sorted by ascending version and maps the documents back to events.
    private List<DomainEvent> findEvents(Criteria criteria) {
        Query query = new Query(criteria).with(Sort.by(Sort.Direction.ASC, "version"));
        return mongoTemplate.find(query, Document.class, COLLECTION).stream()
                .map(this::toDomainEvent)
                .collect(Collectors.toList());
    }

    // Highest stored version for the aggregate, or 0 when it has no events yet.
    private long getCurrentVersion(String aggregateId) {
        Query query = new Query(Criteria.where("aggregateId").is(aggregateId))
                .with(Sort.by(Sort.Direction.DESC, "version"))
                .limit(1);
        Document doc = mongoTemplate.findOne(query, Document.class, COLLECTION);
        return doc != null ? doc.getLong("version") : 0;
    }

    private Document toDocument(DomainEvent event) {
        return new Document()
                .append("eventId", event.getEventId())
                .append("aggregateId", event.getAggregateId())
                .append("eventType", event.getEventType())
                .append("eventData", toJson(event))
                .append("version", event.getVersion())
                .append("occurredAt", event.getOccurredAt());
    }

    // Serializes an event, turning Jackson's failure into an unchecked exception.
    private String toJson(DomainEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to serialize event " + event.getEventId(), e);
        }
    }

    // Resolves the event class from its stored type name and deserializes the JSON body.
    private DomainEvent toDomainEvent(Document doc) {
        String eventType = doc.getString("eventType");
        String eventData = doc.getString("eventData");
        try {
            Class<?> eventClass = Class.forName(EVENT_PACKAGE + eventType);
            return (DomainEvent) objectMapper.readValue(eventData, eventClass);
        } catch (Exception e) {
            throw new EventDeserializationException("Failed to deserialize event", e);
        }
    }
}
快照(Snapshot)
为了提高性能,可以定期创建聚合状态的快照。加载聚合时先从最近的快照恢复状态,再重放快照之后的事件,从而避免每次都重放完整的事件历史。
/**
 * Snapshot of an aggregate's state at a given version.
 */
public class AggregateSnapshot {
    private String aggregateId;
    private String aggregateType;
    // Aggregate version the snapshot was taken at.
    private int version;
    // Serialized aggregate state.
    private String stateData;
    private LocalDateTime createdAt;

    public String getAggregateId() {
        return aggregateId;
    }

    public void setAggregateId(String aggregateId) {
        this.aggregateId = aggregateId;
    }

    public String getAggregateType() {
        return aggregateType;
    }

    public void setAggregateType(String aggregateType) {
        this.aggregateType = aggregateType;
    }

    public int getVersion() {
        return version;
    }

    public void setVersion(int version) {
        this.version = version;
    }

    public String getStateData() {
        return stateData;
    }

    public void setStateData(String stateData) {
        this.stateData = stateData;
    }

    public LocalDateTime getCreatedAt() {
        return createdAt;
    }

    public void setCreatedAt(LocalDateTime createdAt) {
        this.createdAt = createdAt;
    }
}
/**
 * Snapshot store: keeps at most one snapshot per aggregate in the
 * {@code snapshots} table.
 */
@Repository
public class SnapshotStore {
    private final JdbcTemplate jdbcTemplate;

    public SnapshotStore(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Inserts the snapshot, or replaces version, state and creation time of the
     * existing snapshot for the same aggregate id.
     */
    public void saveSnapshot(AggregateSnapshot snapshot) {
        String sql = "INSERT INTO snapshots (aggregate_id, aggregate_type, version, state_data, created_at) " +
                "VALUES (?, ?, ?, ?, ?) " +
                "ON CONFLICT (aggregate_id) DO UPDATE SET " +
                "version = EXCLUDED.version, " +
                "state_data = EXCLUDED.state_data, " +
                "created_at = EXCLUDED.created_at";
        jdbcTemplate.update(sql,
                snapshot.getAggregateId(),
                snapshot.getAggregateType(),
                snapshot.getVersion(),
                snapshot.getStateData(),
                snapshot.getCreatedAt());
    }

    /**
     * Looks up the snapshot of an aggregate.
     *
     * @return the snapshot, or an empty Optional when none has been stored
     */
    public Optional<AggregateSnapshot> getSnapshot(String aggregateId) {
        String sql = "SELECT * FROM snapshots WHERE aggregate_id = ?";
        // query(...) returns an empty list when no row matches, whereas
        // queryForObject(...) would throw; that makes "no snapshot yet" a normal result.
        return jdbcTemplate.query(sql, (rs, rowNum) -> {
            AggregateSnapshot snapshot = new AggregateSnapshot();
            snapshot.setAggregateId(rs.getString("aggregate_id"));
            snapshot.setAggregateType(rs.getString("aggregate_type"));
            snapshot.setVersion(rs.getInt("version"));
            snapshot.setStateData(rs.getString("state_data"));
            snapshot.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
            return snapshot;
        }, aggregateId).stream().findFirst();
    }
}
/**
 * Event store with snapshots: loads and saves order aggregates through an
 * {@link EventStore}, using snapshots to shorten the replay on load.
 */
@Component
public class EventStoreWithSnapshot {
    // A snapshot is taken whenever the aggregate version reaches or passes a multiple of this.
    private static final int SNAPSHOT_INTERVAL = 100;

    private final EventStore eventStore;
    private final SnapshotStore snapshotStore;
    private final ObjectMapper objectMapper;

    public EventStoreWithSnapshot(EventStore eventStore, SnapshotStore snapshotStore,
                                  ObjectMapper objectMapper) {
        this.eventStore = eventStore;
        this.snapshotStore = snapshotStore;
        this.objectMapper = objectMapper;
    }

    /**
     * Loads an order: from its snapshot plus the events stored after it when a
     * snapshot exists, otherwise by replaying the full event history.
     *
     * @throws java.util.NoSuchElementException if neither a snapshot nor any event exists for the id
     * @throws SnapshotRestoreException         if the snapshot cannot be deserialized
     */
    public OrderAggregate loadOrder(String orderId) {
        Optional<AggregateSnapshot> snapshot = snapshotStore.getSnapshot(orderId);
        if (snapshot.isPresent()) {
            OrderAggregate order = restoreFromSnapshot(snapshot.get());
            // Only the events newer than the snapshot still need to be applied.
            for (DomainEvent event : eventStore.getEvents(orderId, snapshot.get().getVersion())) {
                order.apply(event);
            }
            return order;
        }

        List<DomainEvent> events = eventStore.getEvents(orderId);
        if (events.isEmpty()) {
            // Without this check an unknown id would yield an empty aggregate that
            // still accepts commands, creating events for an order that was never created.
            throw new java.util.NoSuchElementException("Order not found: " + orderId);
        }
        return OrderAggregate.fromHistory(orderId, events);
    }

    /**
     * Persists the new events of an order and takes a snapshot when the version
     * has reached or crossed a multiple of {@code SNAPSHOT_INTERVAL}.
     *
     * @param order  the aggregate with {@code events} already applied
     * @param events the new events to append
     */
    public void saveOrder(OrderAggregate order, List<DomainEvent> events) {
        int versionAfter = order.getVersion();
        // The events are already applied, so the stored stream is events.size() versions behind.
        int versionBefore = versionAfter - events.size();
        eventStore.saveEvents(order.getId(), events, versionBefore);

        // Compare interval buckets rather than testing "version % interval == 0":
        // a batch of several events may jump over the exact multiple.
        if (versionAfter / SNAPSHOT_INTERVAL > versionBefore / SNAPSHOT_INTERVAL) {
            createSnapshot(order);
        }
    }

    // Serializes the aggregate and stores it as the latest snapshot.
    private void createSnapshot(OrderAggregate order) {
        String stateData;
        try {
            stateData = objectMapper.writeValueAsString(order);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to serialize snapshot for order " + order.getId(), e);
        }
        AggregateSnapshot snapshot = new AggregateSnapshot();
        snapshot.setAggregateId(order.getId());
        snapshot.setAggregateType("Order");
        snapshot.setVersion(order.getVersion());
        snapshot.setStateData(stateData);
        snapshot.setCreatedAt(LocalDateTime.now());
        snapshotStore.saveSnapshot(snapshot);
    }

    // Deserializes the aggregate state stored in a snapshot.
    private OrderAggregate restoreFromSnapshot(AggregateSnapshot snapshot) {
        try {
            return objectMapper.readValue(snapshot.getStateData(), OrderAggregate.class);
        } catch (Exception e) {
            throw new SnapshotRestoreException("Failed to restore from snapshot", e);
        }
    }
}
事件溯源与CQRS结合
/**
 * Command handler (write side): turns commands into events, persists them and
 * publishes them on the event bus.
 *
 * <p>NOTE(review): events are published inside the {@code @Transactional}
 * method, i.e. before the transaction commits. Whether subscribers can see an
 * event whose transaction later rolls back depends on the EventBus
 * implementation - confirm.
 */
@Component
public class OrderCommandHandler {
    private final EventStoreWithSnapshot eventStore;
    private final EventBus eventBus;

    public OrderCommandHandler(EventStoreWithSnapshot eventStore, EventBus eventBus) {
        this.eventStore = eventStore;
        this.eventBus = eventBus;
    }

    /** Creates a new order and stores/publishes the events it raised. */
    @Transactional
    public void handle(CreateOrderCommand command) {
        OrderAggregate order = OrderAggregate.create(
                command.getCustomerId(),
                command.getItems());
        persistAndPublish(order, order.getUncommittedEvents());
    }

    /** Loads an existing order, adds an item and stores/publishes the resulting event. */
    @Transactional
    public void handle(AddOrderItemCommand command) {
        OrderAggregate order = eventStore.loadOrder(command.getOrderId());
        OrderItemAddedEvent event = order.addItem(
                command.getProductId(),
                command.getQuantity(),
                command.getUnitPrice());
        persistAndPublish(order, Collections.singletonList(event));
    }

    // Saves the events first, then publishes each one in order.
    private void persistAndPublish(OrderAggregate order, List<DomainEvent> events) {
        eventStore.saveOrder(order, events);
        for (DomainEvent event : events) {
            eventBus.publish(event);
        }
    }
}
/**
 * Query handler (read side): answers queries from the read repository only.
 */
@Component
public class OrderQueryHandler {
    private final OrderReadRepository readRepository;

    public OrderQueryHandler(OrderReadRepository readRepository) {
        this.readRepository = readRepository;
    }

    /**
     * @return the order with the given id
     * @throws OrderNotFoundException if the read repository has no such order
     */
    public OrderDto getOrder(String orderId) {
        return readRepository.findById(orderId)
                .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    /** @return whatever the read repository holds for the given customer id */
    public List<OrderSummaryDto> getOrdersByCustomer(String customerId) {
        return readRepository.findByCustomerId(customerId);
    }
}
/**
 * Read model updater: keeps the order read model in sync with the events
 * raised on the write side.
 *
 * <p>NOTE(review): the item-added handler increments counters, so handling the
 * same event twice would double-count. Confirm the delivery guarantees of the
 * event bus.
 */
@Component
public class OrderReadModelUpdater {
    private final OrderReadRepository readRepository;

    public OrderReadModelUpdater(OrderReadRepository readRepository) {
        this.readRepository = readRepository;
    }

    /** Creates the read model entry for a newly created order. */
    @EventHandler
    public void on(OrderCreatedEvent event) {
        OrderReadModel model = new OrderReadModel();
        model.setId(event.getAggregateId());
        model.setCustomerId(event.getCustomerId());
        model.setStatus("CREATED");
        model.setTotalAmount(event.getTotalAmount());
        model.setItemCount(event.getItems().size());
        model.setCreatedAt(event.getOccurredAt());
        readRepository.save(model);
    }

    /**
     * Adds one item to the count and quantity x unit price to the total.
     *
     * @throws OrderNotFoundException if the order has no read model entry
     */
    @EventHandler
    public void on(OrderItemAddedEvent event) {
        OrderReadModel model = requireModel(event.getAggregateId());
        model.setItemCount(model.getItemCount() + 1);
        // BigDecimal has no '*' operator; multiply explicitly to keep exact decimal arithmetic.
        BigDecimal lineTotal = event.getUnitPrice().multiply(BigDecimal.valueOf(event.getQuantity()));
        model.setTotalAmount(model.getTotalAmount().add(lineTotal));
        readRepository.save(model);
    }

    /**
     * Marks the order as confirmed and records the confirmation time.
     *
     * @throws OrderNotFoundException if the order has no read model entry
     */
    @EventHandler
    public void on(OrderConfirmedEvent event) {
        OrderReadModel model = requireModel(event.getAggregateId());
        model.setStatus("CONFIRMED");
        model.setConfirmedAt(event.getOccurredAt());
        readRepository.save(model);
    }

    // Loads the read model entry for an order or fails when it does not exist.
    private OrderReadModel requireModel(String orderId) {
        return readRepository.findById(orderId)
                .orElseThrow(() -> new OrderNotFoundException(orderId));
    }
}
事件溯源的优点
- 完整审计日志:记录所有状态变化,便于审计和合规
- 时间旅行:可以重建任何历史时间点的状态
- 调试能力:可以重放事件来重现问题
- 解耦读写:与CQRS结合实现读写分离
- 事件驱动:天然支持事件驱动架构
实践建议
- 事件设计:事件应该包含足够的信息,但不暴露实现细节
- 版本管理:为事件结构(schema)引入版本号,并在读取时通过向上转换(upcasting)等方式兼容旧版本的历史事件
- 快照策略:根据聚合大小和访问频率制定快照策略
- 事件存储选择:根据需求选择合适的事件存储(PostgreSQL、MongoDB、EventStoreDB)
- 监控告警:监控事件存储的性能和状态