事件驱动架构:CQRS与事件溯源
事件驱动架构:CQRS与事件溯源
事件驱动架构概述
事件驱动架构(Event-Driven Architecture,EDA)是一种通过事件的产生、检测和消费来驱动系统行为的架构模式。在这种架构中,组件通过异步事件进行通信,而不是直接调用。
核心概念
事件(Event)
事件是系统中发生的重要事情的不可变记录。事件包含发生了什么,但不包含应该如何处理。
// 事件基类
public abstract class DomainEvent {
private final String eventId;
private final LocalDateTime occurredAt;
private final String eventType;
protected DomainEvent() {
this.eventId = UUID.randomUUID().toString();
this.occurredAt = LocalDateTime.now();
this.eventType = this.getClass().getSimpleName();
}
// getters
}
// 具体事件
public class OrderCreatedEvent extends DomainEvent {
private final String orderId;
private final String customerId;
private final List<OrderItemEvent> items;
private final BigDecimal totalAmount;
public OrderCreatedEvent(String orderId, String customerId,
List<OrderItemEvent> items, BigDecimal totalAmount) {
super();
this.orderId = orderId;
this.customerId = customerId;
this.items = items;
this.totalAmount = totalAmount;
}
// getters
}
public class OrderConfirmedEvent extends DomainEvent {
private final String orderId;
private final String confirmedBy;
public OrderConfirmedEvent(String orderId, String confirmedBy) {
super();
this.orderId = orderId;
this.confirmedBy = confirmedBy;
}
}
事件处理器(Event Handler)
事件处理器负责响应事件并执行相应的业务逻辑。
// 事件处理器接口
public interface EventHandler<T extends DomainEvent> {
void handle(T event);
Class<T> getEventType();
}
// 订单创建事件处理器
@Component
public class OrderCreatedEventHandler implements EventHandler<OrderCreatedEvent> {
private final InventoryService inventoryService;
private final NotificationService notificationService;
private final AnalyticsService analyticsService;
@Override
public void handle(OrderCreatedEvent event) {
// 1. 更新库存
inventoryService.reserveItems(event.getItems());
// 2. 发送通知
notificationService.sendOrderCreatedNotification(event);
// 3. 记录分析数据
analyticsService.trackOrderCreated(event);
}
@Override
public Class<OrderCreatedEvent> getEventType() {
return OrderCreatedEvent.class;
}
}
// 订单确认事件处理器
@Component
public class OrderConfirmedEventHandler implements EventHandler<OrderConfirmedEvent> {
private final PaymentService paymentService;
private final ShippingService shippingService;
@Override
public void handle(OrderConfirmedEvent event) {
// 1. 处理支付
paymentService.processPayment(event.getOrderId());
// 2. 创建发货单
shippingService.createShipment(event.getOrderId());
}
@Override
public Class<OrderConfirmedEvent> getEventType() {
return OrderConfirmedEvent.class;
}
}
事件总线(Event Bus)
事件总线负责事件的发布和订阅管理。
// 事件总线接口
public interface EventBus {
void publish(DomainEvent event);
<T extends DomainEvent> void subscribe(Class<T> eventType, EventHandler<T> handler);
}
// 内存事件总线实现
@Component
public class InMemoryEventBus implements EventBus {
private final Map<Class<? extends DomainEvent>, List<EventHandler<?>>> handlers = new ConcurrentHashMap<>();
@Override
public void publish(DomainEvent event) {
List<EventHandler<?>> eventHandlers = handlers.get(event.getClass());
if (eventHandlers != null) {
for (EventHandler<?> handler : eventHandlers) {
try {
((EventHandler) handler).handle(event);
} catch (Exception e) {
// 记录日志,但不中断其他处理器
log.error("Error handling event: {}", event.getEventId(), e);
}
}
}
}
@Override
public <T extends DomainEvent> void subscribe(Class<T> eventType, EventHandler<T> handler) {
handlers.computeIfAbsent(eventType, k -> new ArrayList<>()).add(handler);
}
}
// 基于消息队列的事件总线
@Component
public class KafkaEventBus implements EventBus {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Override
public void publish(DomainEvent event) {
try {
String json = objectMapper.writeValueAsString(event);
kafkaTemplate.send("domain-events", event.getEventId(), json);
} catch (JsonProcessingException e) {
throw new EventPublishException("Failed to publish event", e);
}
}
}
CQRS(命令查询职责分离)
CQRS将读操作和写操作分离到不同的模型中,允许针对读写进行独立优化。
命令模型(Write Model)
// 命令
public abstract class Command {
private final String commandId;
private final LocalDateTime issuedAt;
protected Command() {
this.commandId = UUID.randomUUID().toString();
this.issuedAt = LocalDateTime.now();
}
}
public class CreateOrderCommand extends Command {
private final String customerId;
private final List<OrderItemCommand> items;
public CreateOrderCommand(String customerId, List<OrderItemCommand> items) {
super();
this.customerId = customerId;
this.items = items;
}
}
// 命令处理器
@Component
public class CreateOrderCommandHandler {
private final OrderRepository orderRepository;
private final EventBus eventBus;
@Transactional
public OrderId handle(CreateOrderCommand command) {
// 1. 创建订单聚合
Order order = Order.create(command.getCustomerId(), command.getItems());
// 2. 持久化
orderRepository.save(order);
// 3. 发布事件
eventBus.publish(new OrderCreatedEvent(
order.getId(),
order.getCustomerId(),
order.getItems(),
order.getTotalAmount()
));
return order.getId();
}
}
// 聚合根
public class Order {
private OrderId id;
private CustomerId customerId;
private List<OrderItem> items;
private OrderStatus status;
private Money totalAmount;
private List<DomainEvent> uncommittedEvents = new ArrayList<>();
public static Order create(String customerId, List<OrderItemCommand> items) {
Order order = new Order();
order.id = OrderId.generate();
order.customerId = new CustomerId(customerId);
order.items = items.stream()
.map(item -> OrderItem.create(item.getProductId(), item.getQuantity()))
.collect(Collectors.toList());
order.status = OrderStatus.CREATED;
order.recalculateTotal();
// 记录事件
order.uncommittedEvents.add(new OrderCreatedEvent(
order.id.getValue(),
order.customerId.getValue(),
order.getItems(),
order.totalAmount
));
return order;
}
public void confirm() {
if (status != OrderStatus.CREATED) {
throw new BusinessException("Only created orders can be confirmed");
}
this.status = OrderStatus.CONFIRMED;
uncommittedEvents.add(new OrderConfirmedEvent(
id.getValue(),
"system"
));
}
public List<DomainEvent> getUncommittedEvents() {
return uncommittedEvents;
}
public void markEventsAsCommitted() {
uncommittedEvents.clear();
}
}
查询模型(Read Model)
// 查询DTO
public class OrderSummaryDto {
private String orderId;
private String customerName;
private String status;
private BigDecimal totalAmount;
private LocalDateTime createdAt;
private int itemCount;
}
// 查询服务
@Service
public class OrderQueryService {
private final OrderSummaryRepository orderSummaryRepository;
public OrderSummaryDto getOrderSummary(String orderId) {
return orderSummaryRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
}
public List<OrderSummaryDto> getOrdersByCustomer(String customerId) {
return orderSummaryRepository.findByCustomerId(customerId);
}
}
// 读模型Repository
@Repository
public class OrderSummaryRepository {
private final JdbcTemplate jdbcTemplate;
public Optional<OrderSummaryDto> findById(String orderId) {
String sql = "SELECT * FROM order_summary WHERE order_id = ?";
return jdbcTemplate.queryForObject(sql, new Object[]{orderId},
(rs, rowNum) -> mapToDto(rs));
}
private OrderSummaryDto mapToDto(ResultSet rs) throws SQLException {
OrderSummaryDto dto = new OrderSummaryDto();
dto.setOrderId(rs.getString("order_id"));
dto.setCustomerName(rs.getString("customer_name"));
dto.setStatus(rs.getString("status"));
dto.setTotalAmount(rs.getBigDecimal("total_amount"));
dto.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
dto.setItemCount(rs.getInt("item_count"));
return dto;
}
}
读模型更新
// 事件处理器更新读模型
@Component
public class OrderReadModelUpdater {
private final OrderSummaryRepository orderSummaryRepository;
@EventHandler
public void on(OrderCreatedEvent event) {
OrderSummaryDto summary = new OrderSummaryDto();
summary.setOrderId(event.getOrderId());
summary.setCustomerId(event.getCustomerId());
summary.setStatus("CREATED");
summary.setTotalAmount(event.getTotalAmount());
summary.setItemCount(event.getItems().size());
summary.setCreatedAt(LocalDateTime.now());
orderSummaryRepository.save(summary);
}
@EventHandler
public void on(OrderConfirmedEvent event) {
OrderSummaryDto summary = orderSummaryRepository.findById(event.getOrderId())
.orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));
summary.setStatus("CONFIRMED");
orderSummaryRepository.save(summary);
}
}
事件溯源(Event Sourcing)
事件溯源将对象的状态存储为事件序列,而不是当前状态。通过重放事件可以重建任何时间点的状态。
// 事件存储接口
public interface EventStore {
void appendEvents(String aggregateId, List<DomainEvent> events);
List<DomainEvent> getEvents(String aggregateId);
List<DomainEvent> getEvents(String aggregateId, long fromVersion);
}
// JPA事件存储实现
@Repository
public class JpaEventStore implements EventStore {
private final EventJpaRepository eventRepository;
private final ObjectMapper objectMapper;
@Override
public void appendEvents(String aggregateId, List<DomainEvent> events) {
for (DomainEvent event : events) {
EventJpaEntity entity = new EventJpaEntity();
entity.setAggregateId(aggregateId);
entity.setEventType(event.getClass().getSimpleName());
entity.setEventData(objectMapper.writeValueAsString(event));
entity.setVersion(getNextVersion(aggregateId));
entity.setOccurredAt(event.getOccurredAt());
eventRepository.save(entity);
}
}
@Override
public List<DomainEvent> getEvents(String aggregateId) {
return eventRepository.findByAggregateIdOrderByVersionAsc(aggregateId)
.stream()
.map(this::toDomainEvent)
.collect(Collectors.toList());
}
private DomainEvent toDomainEvent(EventJpaEntity entity) {
try {
Class<?> eventType = Class.forName("com.example.events." + entity.getEventType());
return (DomainEvent) objectMapper.readValue(entity.getEventData(), eventType);
} catch (Exception e) {
throw new EventDeserializationException("Failed to deserialize event", e);
}
}
}
// 事件溯源聚合根
public class EventSourcedOrder {
private String id;
private List<DomainEvent> appliedEvents = new ArrayList<>();
private int version = 0;
// 从事件重建状态
public static EventSourcedOrder fromEvents(String id, List<DomainEvent> events) {
EventSourcedOrder order = new EventSourcedOrder();
order.id = id;
for (DomainEvent event : events) {
order.apply(event, false);
}
return order;
}
// 应用事件
public void apply(DomainEvent event, boolean isNew) {
if (isNew) {
appliedEvents.add(event);
}
// 根据事件类型更新状态
if (event instanceof OrderCreatedEvent) {
handleOrderCreated((OrderCreatedEvent) event);
} else if (event instanceof OrderConfirmedEvent) {
handleOrderConfirmed((OrderConfirmedEvent) event);
}
version++;
}
private void handleOrderCreated(OrderCreatedEvent event) {
this.customerId = event.getCustomerId();
this.items = event.getItems();
this.status = "CREATED";
this.totalAmount = event.getTotalAmount();
}
private void handleOrderConfirmed(OrderConfirmedEvent event) {
this.status = "CONFIRMED";
}
// 获取未提交的事件
public List<DomainEvent> getUncommittedEvents() {
return appliedEvents;
}
public void markEventsAsCommitted() {
appliedEvents.clear();
}
}
事件驱动架构的优点
- 松耦合:组件通过事件通信,不直接依赖
- 可扩展性:可以轻松添加新的事件处理器
- 可审计性:事件提供了完整的业务活动记录
- 异步处理:支持异步处理,提高系统响应性
- 最终一致性:适合需要最终一致性的场景
实施建议
- 定义清晰的事件契约:事件应该包含足够的信息,但不暴露实现细节
- 使用事件版本管理:随着系统演化,事件结构可能需要变化
- 实现幂等性:事件处理器应该能够处理重复事件
- 监控事件流:建立监控和告警机制,确保事件处理正常
- 考虑事件风暴:使用事件风暴工作坊来识别领域事件