← 返回首页
🏗️

事件驱动架构:CQRS与事件溯源

📂 architecture ⏱ 6 min 1023 words

事件驱动架构:CQRS与事件溯源

事件驱动架构概述

事件驱动架构(Event-Driven Architecture,EDA)是一种通过事件的产生、检测和消费来驱动系统行为的架构模式。在这种架构中,组件通过异步事件进行通信,而不是直接调用。

核心概念

事件(Event)

事件是对系统中已发生的重要事实的不可变记录。事件只描述发生了什么,而不规定应该如何处理。

/**
 * Base class for domain events.
 *
 * <p>Every instance is stamped at construction time with a random UUID string,
 * the current {@link LocalDateTime}, and the simple class name of the concrete
 * event. All fields are final and there are no setters.
 */
public abstract class DomainEvent {
    private final String eventId;
    private final LocalDateTime occurredAt;
    private final String eventType;

    /** Assigns the event's identifier, timestamp and type name. */
    protected DomainEvent() {
        this.eventId = UUID.randomUUID().toString();
        // NOTE(review): LocalDateTime has no zone or offset; confirm that all
        // producers and consumers of these events share one time zone.
        this.occurredAt = LocalDateTime.now();
        // Uses the runtime class, so each subclass reports its own name.
        this.eventType = this.getClass().getSimpleName();
    }

    /** @return the UUID string generated when this event was constructed */
    public String getEventId() {
        return eventId;
    }

    /** @return the local date-time captured when this event was constructed */
    public LocalDateTime getOccurredAt() {
        return occurredAt;
    }

    /** @return the simple class name of the concrete event */
    public String getEventType() {
        return eventType;
    }
}

// Concrete events
/**
 * Event recording that an order was created, carrying the order ID, the
 * customer ID, the order items and the total amount.
 */
public class OrderCreatedEvent extends DomainEvent {
    private final String orderId;
    private final String customerId;
    private final List<OrderItemEvent> items;
    private final BigDecimal totalAmount;

    /**
     * @param orderId     identifier of the created order
     * @param customerId  identifier of the customer who placed the order
     * @param items       items of the order; copied, {@code null} is stored as an empty list
     * @param totalAmount total amount of the order
     */
    public OrderCreatedEvent(String orderId, String customerId,
                            List<OrderItemEvent> items, BigDecimal totalAmount) {
        super();
        this.orderId = orderId;
        this.customerId = customerId;
        // Snapshot the caller's list so that later changes to it cannot alter
        // this event, and wrap it so readers cannot modify it either.
        this.items = java.util.Collections.unmodifiableList(
                items == null ? new ArrayList<OrderItemEvent>() : new ArrayList<>(items));
        this.totalAmount = totalAmount;
    }

    /** @return identifier of the created order */
    public String getOrderId() {
        return orderId;
    }

    /** @return identifier of the customer who placed the order */
    public String getCustomerId() {
        return customerId;
    }

    /** @return unmodifiable snapshot of the order items, never {@code null} */
    public List<OrderItemEvent> getItems() {
        return items;
    }

    /** @return total amount of the order */
    public BigDecimal getTotalAmount() {
        return totalAmount;
    }
}

/**
 * Event recording that an order was confirmed, carrying the order ID and the
 * identity of whoever confirmed it.
 */
public class OrderConfirmedEvent extends DomainEvent {
    private final String orderId;
    private final String confirmedBy;

    /**
     * @param orderId     identifier of the confirmed order
     * @param confirmedBy identifier of the actor that confirmed the order
     */
    public OrderConfirmedEvent(String orderId, String confirmedBy) {
        super();
        this.orderId = orderId;
        this.confirmedBy = confirmedBy;
    }

    /** @return identifier of the confirmed order */
    public String getOrderId() {
        return orderId;
    }

    /** @return identifier of the actor that confirmed the order */
    public String getConfirmedBy() {
        return confirmedBy;
    }
}

事件处理器(Event Handler)

事件处理器负责响应事件并执行相应的业务逻辑。

/**
 * Contract for a component that processes one kind of domain event.
 *
 * @param <T> the concrete event type accepted by this handler
 */
public interface EventHandler<T extends DomainEvent> {

    /**
     * @return the class object of the event type this handler is written for
     */
    Class<T> getEventType();

    /**
     * Processes a single event.
     *
     * @param event the event to process
     */
    void handle(T event);
}

/**
 * Handler for {@link OrderCreatedEvent}. It calls the inventory, notification
 * and analytics services, in that order.
 */
@Component
public class OrderCreatedEventHandler implements EventHandler<OrderCreatedEvent> {

    private final InventoryService inventoryService;
    private final NotificationService notificationService;
    private final AnalyticsService analyticsService;

    /**
     * Creates the handler with its collaborators. Without this constructor the
     * final fields would never be assigned.
     */
    public OrderCreatedEventHandler(InventoryService inventoryService,
                                    NotificationService notificationService,
                                    AnalyticsService analyticsService) {
        this.inventoryService = inventoryService;
        this.notificationService = notificationService;
        this.analyticsService = analyticsService;
    }

    /**
     * Runs the three follow-up steps for a created order. Nothing is caught
     * here, so an exception from one step propagates to the caller and the
     * remaining steps are skipped.
     *
     * @param event the order-created event to react to
     */
    @Override
    public void handle(OrderCreatedEvent event) {
        // 1. Reserve stock for the ordered items
        inventoryService.reserveItems(event.getItems());

        // 2. Send the notification
        notificationService.sendOrderCreatedNotification(event);

        // 3. Record analytics data
        analyticsService.trackOrderCreated(event);
    }

    /** @return {@code OrderCreatedEvent.class} */
    @Override
    public Class<OrderCreatedEvent> getEventType() {
        return OrderCreatedEvent.class;
    }
}

/**
 * Handler for {@link OrderConfirmedEvent}. It calls the payment service and
 * then the shipping service for the confirmed order.
 */
@Component
public class OrderConfirmedEventHandler implements EventHandler<OrderConfirmedEvent> {

    private final PaymentService paymentService;
    private final ShippingService shippingService;

    /**
     * Creates the handler with its collaborators. Without this constructor the
     * final fields would never be assigned.
     */
    public OrderConfirmedEventHandler(PaymentService paymentService,
                                      ShippingService shippingService) {
        this.paymentService = paymentService;
        this.shippingService = shippingService;
    }

    /**
     * Processes payment and then creates the shipment. Nothing is caught here,
     * so if payment processing throws, no shipment is created.
     *
     * @param event the order-confirmed event to react to
     */
    @Override
    public void handle(OrderConfirmedEvent event) {
        // 1. Process the payment
        paymentService.processPayment(event.getOrderId());

        // 2. Create the shipment
        shippingService.createShipment(event.getOrderId());
    }

    /** @return {@code OrderConfirmedEvent.class} */
    @Override
    public Class<OrderConfirmedEvent> getEventType() {
        return OrderConfirmedEvent.class;
    }
}

事件总线(Event Bus)

事件总线负责事件的发布和订阅管理。

/**
 * Event bus contract: registration of handlers per event type, and publication
 * of events.
 */
public interface EventBus {

    /**
     * Registers a handler for one event type.
     *
     * @param eventType class of the events the handler should receive
     * @param handler   handler to register
     * @param <E>       the event type
     */
    <E extends DomainEvent> void subscribe(Class<E> eventType, EventHandler<E> handler);

    /**
     * Publishes an event on the bus.
     *
     * @param event the event to publish
     */
    void publish(DomainEvent event);
}

/**
 * In-memory event bus. Handlers are kept in a map keyed by the exact event
 * class, and {@link #publish} invokes them one after another on the calling
 * thread.
 */
@Component
public class InMemoryEventBus implements EventBus {

    // NOTE(review): `log` is used below but is not declared in this snippet;
    // presumably a logger field or logging annotation exists elsewhere - confirm.
    private final Map<Class<? extends DomainEvent>, List<EventHandler<?>>> handlers = new ConcurrentHashMap<>();

    /**
     * Delivers the event to every handler registered for {@code event.getClass()}.
     * The lookup uses the exact runtime class, so handlers registered for a
     * superclass are not invoked. An exception thrown by one handler is logged
     * and does not stop delivery to the others.
     *
     * @param event the event to deliver
     */
    @Override
    public void publish(DomainEvent event) {
        List<EventHandler<?>> eventHandlers = handlers.get(event.getClass());
        if (eventHandlers == null) {
            return;
        }
        for (EventHandler<?> handler : eventHandlers) {
            try {
                dispatch(handler, event);
            } catch (Exception e) {
                // Log and continue so that one failing handler does not block the rest
                log.error("Error handling event: {}", event.getEventId(), e);
            }
        }
    }

    /**
     * Registers a handler for the given event type.
     *
     * @param eventType class of the events the handler should receive
     * @param handler   handler to add
     */
    @Override
    public <T extends DomainEvent> void subscribe(Class<T> eventType, EventHandler<T> handler) {
        // Copy-on-write list: publish() may be iterating this list on another
        // thread while a new handler is added; a plain ArrayList could throw
        // ConcurrentModificationException or lose updates in that case.
        handlers.computeIfAbsent(eventType, k -> new java.util.concurrent.CopyOnWriteArrayList<>()).add(handler);
    }

    /**
     * Invokes a handler with an event. The cast is safe because handlers are
     * only stored under the event class they were subscribed with, and
     * publish() looks them up by the event's own class.
     */
    @SuppressWarnings("unchecked")
    private static void dispatch(EventHandler<?> handler, DomainEvent event) {
        ((EventHandler<DomainEvent>) handler).handle(event);
    }
}

/**
 * Event bus backed by a message queue: events are serialized to JSON and sent
 * to the {@code domain-events} Kafka topic.
 */
@Component
public class KafkaEventBus implements EventBus {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /**
     * Creates the bus with its collaborators. Without this constructor the
     * final fields would never be assigned.
     */
    public KafkaEventBus(KafkaTemplate<String, Object> kafkaTemplate, ObjectMapper objectMapper) {
        this.kafkaTemplate = kafkaTemplate;
        this.objectMapper = objectMapper;
    }

    /**
     * Serializes the event to JSON and sends it to the {@code domain-events}
     * topic, using the event ID as the record key.
     *
     * @param event the event to publish
     * @throws EventPublishException if the event cannot be serialized to JSON
     */
    @Override
    public void publish(DomainEvent event) {
        try {
            String json = objectMapper.writeValueAsString(event);
            kafkaTemplate.send("domain-events", event.getEventId(), json);
        } catch (JsonProcessingException e) {
            throw new EventPublishException("Failed to publish event", e);
        }
    }

    /**
     * Not supported by this implementation; declared only because
     * {@link EventBus} requires it.
     *
     * <p>NOTE(review): presumably consumers read the topic through Kafka
     * listeners rather than through this bus - confirm the intended design.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public <T extends DomainEvent> void subscribe(Class<T> eventType, EventHandler<T> handler) {
        throw new UnsupportedOperationException(
                "KafkaEventBus does not support in-process subscriptions");
    }
}

CQRS(命令查询职责分离)

CQRS将读操作和写操作分离到不同的模型中,允许针对读写进行独立优化。

命令模型(Write Model)

/**
 * Base class for commands. Each command receives a random UUID string as its
 * identifier and the current local date-time as its issue time when it is
 * constructed.
 */
public abstract class Command {
    /** Random UUID string assigned at construction. */
    private final String commandId = UUID.randomUUID().toString();

    /** Local date-time captured at construction. */
    private final LocalDateTime issuedAt = LocalDateTime.now();

    /** For subclasses only; the fields are set by their initializers. */
    protected Command() {
    }
}

/**
 * Command requesting the creation of an order for a customer with a list of
 * items.
 */
public class CreateOrderCommand extends Command {
    private final String customerId;
    private final List<OrderItemCommand> items;

    /**
     * @param customerId identifier of the ordering customer
     * @param items      requested items; copied, {@code null} is stored as an empty list
     */
    public CreateOrderCommand(String customerId, List<OrderItemCommand> items) {
        super();
        this.customerId = customerId;
        // Snapshot the caller's list so the command cannot change after it is issued.
        this.items = java.util.Collections.unmodifiableList(
                items == null ? new ArrayList<OrderItemCommand>() : new ArrayList<>(items));
    }

    /** @return identifier of the ordering customer */
    public String getCustomerId() {
        return customerId;
    }

    /** @return unmodifiable snapshot of the requested items, never {@code null} */
    public List<OrderItemCommand> getItems() {
        return items;
    }
}

/**
 * Command handler that creates an order, saves it, and publishes the domain
 * events recorded by the aggregate.
 */
@Component
public class CreateOrderCommandHandler {

    private final OrderRepository orderRepository;
    private final EventBus eventBus;

    /**
     * Creates the handler with its collaborators. Without this constructor the
     * final fields would never be assigned.
     */
    public CreateOrderCommandHandler(OrderRepository orderRepository, EventBus eventBus) {
        this.orderRepository = orderRepository;
        this.eventBus = eventBus;
    }

    /**
     * Handles a create-order command.
     *
     * <p>NOTE(review): events are published inside this {@code @Transactional}
     * method, i.e. before the method returns; whether subscribers can observe
     * an order that is later rolled back depends on the bus implementation -
     * confirm.
     *
     * @param command the command describing the order to create
     * @return the ID of the newly created order
     */
    @Transactional
    public OrderId handle(CreateOrderCommand command) {
        // 1. Create the order aggregate
        Order order = Order.create(command.getCustomerId(), command.getItems());

        // 2. Persist it
        orderRepository.save(order);

        // 3. Publish the events the aggregate itself recorded. Order.create
        //    already records an OrderCreatedEvent; constructing a second one
        //    here would give the same fact two different event IDs.
        //    Snapshot first so that clearing the aggregate's list afterwards
        //    cannot interfere with the iteration.
        List<DomainEvent> pending = new ArrayList<>(order.getUncommittedEvents());
        for (DomainEvent event : pending) {
            eventBus.publish(event);
        }
        order.markEventsAsCommitted();

        return order.getId();
    }
}

/**
 * Order aggregate root. {@link #create} and {@link #confirm} change the
 * aggregate's state and record a matching domain event in an in-memory list
 * of uncommitted events.
 */
public class Order {
    private OrderId id;
    private CustomerId customerId;
    private List<OrderItem> items;
    private OrderStatus status;
    private Money totalAmount;
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    /**
     * Creates a new order in status {@code CREATED} and records an
     * {@link OrderCreatedEvent}.
     *
     * @param customerId identifier of the ordering customer
     * @param items      requested items, each mapped to an {@code OrderItem}
     * @return the new order
     */
    public static Order create(String customerId, List<OrderItemCommand> items) {
        Order order = new Order();
        order.id = OrderId.generate();
        order.customerId = new CustomerId(customerId);
        order.items = items.stream()
                .map(item -> OrderItem.create(item.getProductId(), item.getQuantity()))
                .collect(Collectors.toList());
        order.status = OrderStatus.CREATED;
        // NOTE(review): recalculateTotal() is not shown in this snippet;
        // presumably it sets totalAmount from the items - confirm.
        order.recalculateTotal();

        // Record the event.
        // NOTE(review): OrderCreatedEvent as shown in this article takes
        // List<OrderItemEvent> and BigDecimal, while List<OrderItem> and Money
        // are passed here; the conversion is not shown - confirm.
        order.uncommittedEvents.add(new OrderCreatedEvent(
            order.id.getValue(),
            order.customerId.getValue(),
            order.getItems(),
            order.totalAmount
        ));

        return order;
    }

    /**
     * Moves the order from {@code CREATED} to {@code CONFIRMED} and records an
     * {@link OrderConfirmedEvent} with {@code "system"} as the confirming actor.
     *
     * @throws BusinessException if the order is not in status {@code CREATED}
     */
    public void confirm() {
        if (status != OrderStatus.CREATED) {
            throw new BusinessException("Only created orders can be confirmed");
        }
        this.status = OrderStatus.CONFIRMED;

        uncommittedEvents.add(new OrderConfirmedEvent(
            id.getValue(),
            "system"
        ));
    }

    /** @return the order's identifier */
    public OrderId getId() {
        return id;
    }

    /** @return the identifier of the ordering customer */
    public CustomerId getCustomerId() {
        return customerId;
    }

    /** @return a copy of the order's items; changing it does not affect the order */
    public List<OrderItem> getItems() {
        return new ArrayList<>(items);
    }

    /** @return the order's total amount */
    public Money getTotalAmount() {
        return totalAmount;
    }

    /**
     * @return a copy of the events recorded since the last call to
     *         {@link #markEventsAsCommitted()}; changing it does not affect
     *         the aggregate
     */
    public List<DomainEvent> getUncommittedEvents() {
        // Copy so callers cannot add to or clear the aggregate's own list.
        return new ArrayList<>(uncommittedEvents);
    }

    /** Discards all recorded events. */
    public void markEventsAsCommitted() {
        uncommittedEvents.clear();
    }
}

查询模型(Read Model)

/**
 * Query-side DTO holding the summary fields of one order. It is a plain
 * mutable bean with a getter and setter per field.
 */
public class OrderSummaryDto {
    private String orderId;
    // Added because the read-model updater in this article calls setCustomerId.
    private String customerId;
    private String customerName;
    private String status;
    private BigDecimal totalAmount;
    private LocalDateTime createdAt;
    private int itemCount;

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getCustomerId() {
        return customerId;
    }

    public void setCustomerId(String customerId) {
        this.customerId = customerId;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public BigDecimal getTotalAmount() {
        return totalAmount;
    }

    public void setTotalAmount(BigDecimal totalAmount) {
        this.totalAmount = totalAmount;
    }

    public LocalDateTime getCreatedAt() {
        return createdAt;
    }

    public void setCreatedAt(LocalDateTime createdAt) {
        this.createdAt = createdAt;
    }

    public int getItemCount() {
        return itemCount;
    }

    public void setItemCount(int itemCount) {
        this.itemCount = itemCount;
    }
}

/**
 * Query service that reads order summaries from the read-model repository.
 */
@Service
public class OrderQueryService {

    private final OrderSummaryRepository orderSummaryRepository;

    /**
     * Creates the service with its repository. Without this constructor the
     * final field would never be assigned.
     */
    public OrderQueryService(OrderSummaryRepository orderSummaryRepository) {
        this.orderSummaryRepository = orderSummaryRepository;
    }

    /**
     * Looks up one order summary.
     *
     * @param orderId identifier of the order
     * @return the summary for that order
     * @throws OrderNotFoundException if the repository returns no summary
     */
    public OrderSummaryDto getOrderSummary(String orderId) {
        return orderSummaryRepository.findById(orderId)
                .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    /**
     * Returns the summaries the repository finds for a customer.
     *
     * <p>NOTE(review): {@code findByCustomerId} is not part of the repository
     * snippet shown in this article - confirm it exists.
     *
     * @param customerId identifier of the customer
     * @return the repository's result for that customer
     */
    public List<OrderSummaryDto> getOrdersByCustomer(String customerId) {
        return orderSummaryRepository.findByCustomerId(customerId);
    }
}

/**
 * Read-model repository that queries the {@code order_summary} table through
 * {@code JdbcTemplate}.
 *
 * <p>NOTE(review): other snippets in this article call {@code save} and
 * {@code findByCustomerId} on this repository; neither is shown here.
 */
@Repository
public class OrderSummaryRepository {

    private final JdbcTemplate jdbcTemplate;

    /**
     * Creates the repository with its template. Without this constructor the
     * final field would never be assigned.
     */
    public OrderSummaryRepository(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    /**
     * Finds the summary row for an order.
     *
     * @param orderId identifier of the order
     * @return the mapped row, or an empty {@code Optional} when no row matches
     */
    public Optional<OrderSummaryDto> findById(String orderId) {
        String sql = "SELECT * FROM order_summary WHERE order_id = ?";
        // query(...) returns a list, so a missing row gives an empty Optional.
        // queryForObject(...) would instead throw when no row matches, and its
        // result is not an Optional.
        return jdbcTemplate.query(sql, (rs, rowNum) -> mapToDto(rs), orderId)
                .stream()
                .findFirst();
    }

    /**
     * Maps the current row of the result set to a DTO.
     *
     * @param rs result set positioned on the row to map
     * @return a DTO filled from the row's columns
     * @throws SQLException if a column cannot be read
     */
    private OrderSummaryDto mapToDto(ResultSet rs) throws SQLException {
        OrderSummaryDto dto = new OrderSummaryDto();
        dto.setOrderId(rs.getString("order_id"));
        dto.setCustomerName(rs.getString("customer_name"));
        dto.setStatus(rs.getString("status"));
        dto.setTotalAmount(rs.getBigDecimal("total_amount"));
        // NOTE(review): throws NullPointerException if created_at is NULL;
        // presumably the column is NOT NULL - confirm against the schema.
        dto.setCreatedAt(rs.getTimestamp("created_at").toLocalDateTime());
        dto.setItemCount(rs.getInt("item_count"));
        return dto;
    }
}

读模型更新

/**
 * Event handler that keeps the order-summary read model up to date.
 *
 * <p>NOTE(review): {@code @EventHandler} is used as an annotation here, while
 * this article declares {@code EventHandler} as an interface; presumably a
 * framework annotation of the same name is intended - confirm.
 */
@Component
public class OrderReadModelUpdater {

    private final OrderSummaryRepository orderSummaryRepository;

    /**
     * Creates the updater with its repository. Without this constructor the
     * final field would never be assigned.
     */
    public OrderReadModelUpdater(OrderSummaryRepository orderSummaryRepository) {
        this.orderSummaryRepository = orderSummaryRepository;
    }

    /**
     * Creates the summary row for a newly created order.
     *
     * @param event the order-created event
     */
    @EventHandler
    public void on(OrderCreatedEvent event) {
        OrderSummaryDto summary = new OrderSummaryDto();
        summary.setOrderId(event.getOrderId());
        summary.setCustomerId(event.getCustomerId());
        summary.setStatus("CREATED");
        summary.setTotalAmount(event.getTotalAmount());
        summary.setItemCount(event.getItems().size());
        // Use the event's own timestamp rather than the time this projection
        // runs, so a delayed or replayed event still yields the same createdAt.
        summary.setCreatedAt(event.getOccurredAt());

        orderSummaryRepository.save(summary);
    }

    /**
     * Marks the existing summary row as confirmed.
     *
     * @param event the order-confirmed event
     * @throws OrderNotFoundException if no summary exists for the order
     */
    @EventHandler
    public void on(OrderConfirmedEvent event) {
        OrderSummaryDto summary = orderSummaryRepository.findById(event.getOrderId())
                .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));
        summary.setStatus("CONFIRMED");
        orderSummaryRepository.save(summary);
    }
}

事件溯源(Event Sourcing)

事件溯源不直接保存对象的当前状态,而是把状态的每一次变更保存为一个事件序列。通过按顺序重放这些事件,可以重建对象在任意时间点的状态。

/**
 * Event store contract: append events for an aggregate and read them back.
 */
public interface EventStore {

    /**
     * Reads the stored events of one aggregate.
     *
     * @param aggregateId identifier of the aggregate
     * @return the aggregate's events
     */
    List<DomainEvent> getEvents(String aggregateId);

    /**
     * Reads the stored events of one aggregate, starting from a version.
     *
     * @param aggregateId identifier of the aggregate
     * @param fromVersion version to start reading from
     * @return the matching events
     */
    List<DomainEvent> getEvents(String aggregateId, long fromVersion);

    /**
     * Appends events to one aggregate's stream.
     *
     * @param aggregateId identifier of the aggregate
     * @param events      events to append
     */
    void appendEvents(String aggregateId, List<DomainEvent> events);
}

/**
 * JPA-backed event store. Each event is stored as one {@code EventJpaEntity}
 * holding the aggregate ID, the event's simple class name, its JSON form, a
 * version and the time it occurred.
 */
@Repository
public class JpaEventStore implements EventStore {

    private final EventJpaRepository eventRepository;
    private final ObjectMapper objectMapper;

    /**
     * Creates the store with its collaborators. Without this constructor the
     * final fields would never be assigned.
     */
    public JpaEventStore(EventJpaRepository eventRepository, ObjectMapper objectMapper) {
        this.eventRepository = eventRepository;
        this.objectMapper = objectMapper;
    }

    /**
     * Saves each event as a separate entity, in list order.
     *
     * <p>NOTE(review): {@code getNextVersion} is not shown in this snippet and
     * is called once per event; confirm how it is defined and whether it is
     * safe under concurrent appends to the same aggregate.
     *
     * @param aggregateId identifier of the aggregate the events belong to
     * @param events      events to append
     * @throws IllegalStateException if an event cannot be serialized to JSON
     */
    @Override
    public void appendEvents(String aggregateId, List<DomainEvent> events) {
        for (DomainEvent event : events) {
            EventJpaEntity entity = new EventJpaEntity();
            entity.setAggregateId(aggregateId);
            entity.setEventType(event.getClass().getSimpleName());
            entity.setEventData(serialize(event));
            entity.setVersion(getNextVersion(aggregateId));
            entity.setOccurredAt(event.getOccurredAt());

            eventRepository.save(entity);
        }
    }

    /**
     * Loads all events of an aggregate in ascending version order.
     *
     * @param aggregateId identifier of the aggregate
     * @return the deserialized events
     * @throws EventDeserializationException if a stored event cannot be deserialized
     */
    @Override
    public List<DomainEvent> getEvents(String aggregateId) {
        return eventRepository.findByAggregateIdOrderByVersionAsc(aggregateId)
                .stream()
                .map(this::toDomainEvent)
                .collect(Collectors.toList());
    }

    /**
     * Loads the events of an aggregate whose version is at least
     * {@code fromVersion}, in ascending version order.
     *
     * <p>NOTE(review): treats {@code fromVersion} as inclusive and assumes
     * {@code EventJpaEntity} exposes a numeric {@code getVersion()} - confirm
     * both. Filtering happens in memory after loading the whole stream.
     *
     * @param aggregateId identifier of the aggregate
     * @param fromVersion lowest version to include
     * @return the deserialized events
     * @throws EventDeserializationException if a stored event cannot be deserialized
     */
    @Override
    public List<DomainEvent> getEvents(String aggregateId, long fromVersion) {
        return eventRepository.findByAggregateIdOrderByVersionAsc(aggregateId)
                .stream()
                .filter(entity -> entity.getVersion() >= fromVersion)
                .map(this::toDomainEvent)
                .collect(Collectors.toList());
    }

    /** Converts an event to JSON, rethrowing the checked Jackson exception unchecked. */
    private String serialize(DomainEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize event " + event.getEventId(), e);
        }
    }

    /**
     * Rebuilds a domain event from its stored form. The event class is
     * resolved by prefixing the stored simple name with
     * {@code com.example.events.}, so only classes in that package can be
     * loaded.
     */
    private DomainEvent toDomainEvent(EventJpaEntity entity) {
        try {
            Class<?> eventType = Class.forName("com.example.events." + entity.getEventType());
            return (DomainEvent) objectMapper.readValue(entity.getEventData(), eventType);
        } catch (Exception e) {
            throw new EventDeserializationException("Failed to deserialize event", e);
        }
    }
}

/**
 * Event-sourced order aggregate. Its state is derived only from the events
 * applied to it, and newly applied events are kept until they are marked as
 * committed.
 */
public class EventSourcedOrder {
    private String id;
    // State written by the event handlers below. The field types follow the
    // corresponding fields of OrderCreatedEvent.
    private String customerId;
    private List<OrderItemEvent> items;
    private String status;
    private BigDecimal totalAmount;
    private final List<DomainEvent> appliedEvents = new ArrayList<>();
    private int version = 0;

    /**
     * Rebuilds an order by applying stored events in the given order.
     *
     * @param id     identifier of the order
     * @param events previously stored events
     * @return the rebuilt order, with no uncommitted events
     */
    public static EventSourcedOrder fromEvents(String id, List<DomainEvent> events) {
        EventSourcedOrder order = new EventSourcedOrder();
        order.id = id;

        for (DomainEvent event : events) {
            // false: replayed history must not be queued for storing again
            order.apply(event, false);
        }

        return order;
    }

    /**
     * Applies an event to this order's state and increments the version.
     * Event types other than the two handled here change no state, but the
     * version is still incremented.
     *
     * @param event the event to apply
     * @param isNew {@code true} to also record the event as uncommitted
     */
    public void apply(DomainEvent event, boolean isNew) {
        if (isNew) {
            appliedEvents.add(event);
        }

        // Update state according to the event type
        if (event instanceof OrderCreatedEvent) {
            handleOrderCreated((OrderCreatedEvent) event);
        } else if (event instanceof OrderConfirmedEvent) {
            handleOrderConfirmed((OrderConfirmedEvent) event);
        }

        version++;
    }

    /** Copies the created order's data into this aggregate and sets status CREATED. */
    private void handleOrderCreated(OrderCreatedEvent event) {
        this.customerId = event.getCustomerId();
        this.items = event.getItems();
        this.status = "CREATED";
        this.totalAmount = event.getTotalAmount();
    }

    /** Sets status CONFIRMED. */
    private void handleOrderConfirmed(OrderConfirmedEvent event) {
        this.status = "CONFIRMED";
    }

    /**
     * @return a copy of the events applied with {@code isNew == true} since the
     *         last call to {@link #markEventsAsCommitted()}; changing it does
     *         not affect the aggregate
     */
    public List<DomainEvent> getUncommittedEvents() {
        // Copy so callers cannot add to or clear the aggregate's own list.
        return new ArrayList<>(appliedEvents);
    }

    /** Discards the recorded uncommitted events. */
    public void markEventsAsCommitted() {
        appliedEvents.clear();
    }
}

事件驱动架构的优点

  1. 松耦合:组件通过事件通信,不直接依赖
  2. 可扩展性:可以轻松添加新的事件处理器
  3. 可审计性:事件提供了完整的业务活动记录
  4. 异步处理:支持异步处理,提高系统响应性
  5. 最终一致性:各组件通过事件异步同步数据,适合可以接受最终一致性的场景

实施建议

  1. 定义清晰的事件契约:事件应该包含足够的信息,但不暴露实现细节
  2. 使用事件版本管理:随着系统演化,事件结构可能需要变化
  3. 实现幂等性:事件处理器应该能够处理重复事件
  4. 监控事件流:建立监控和告警机制,确保事件处理正常
  5. 开展事件风暴:通过事件风暴(Event Storming)工作坊来识别领域事件