← 返回首页
🎯

DDD集成模式

📂 architecture ⏱ 2 min 376 words

DDD集成模式

事件驱动集成

事件驱动是微服务间集成的首选方式：服务之间通过异步事件通信，从而实现松耦合。

// Event publishing
/**
 * Publishes order domain events to Kafka as JSON strings.
 */
@Component
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /**
     * Builds an {@code OrderCreatedEvent} from the order, serializes it and
     * sends it to the {@code order-events} topic, keyed by the order id value.
     *
     * <p>NOTE(review): the value returned by {@code kafkaTemplate.send} is not
     * inspected here, so a failure reported asynchronously by the send is not
     * observed by this method.
     *
     * @param order the order that was created
     * @throws EventPublishException if the event cannot be serialized
     */
    public void publishOrderCreated(Order order) {
        String payload = serialize(toOrderCreatedEvent(order));
        kafkaTemplate.send("order-events", order.getId().getValue(), payload);
    }

    /** Maps the order aggregate onto the integration event, one item event per order item. */
    private OrderCreatedEvent toOrderCreatedEvent(Order order) {
        return OrderCreatedEvent.builder()
            .orderId(order.getId().getValue())
            .orderNo(order.getOrderNo())
            .customerId(order.getCustomerId().getValue())
            .items(order.getItems().stream()
                .map(line -> new OrderItemEvent(
                    line.getProductId().getValue(),
                    line.getQuantity(),
                    line.getPrice()))
                .collect(Collectors.toList()))
            .occurredAt(LocalDateTime.now())
            .build();
    }

    /** Serializes the event, translating the checked Jackson failure into EventPublishException. */
    private String serialize(OrderCreatedEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new EventPublishException("发布订单创建事件失败", e);
        }
    }
}

// Event consumption
/**
 * Consumes messages from the {@code order-events} topic and reserves
 * inventory for newly created orders.
 *
 * <p>NOTE(review): {@code objectMapper}, {@code log},
 * {@code failedEventRepository}, {@code inventoryRepository} and
 * {@code eventPublisher} are used below but are not declared in the visible
 * part of this class — confirm they are provided (fields, Lombok, etc.).
 */
@Component
public class InventoryEventConsumer {

    private final InventoryService inventoryService;

    /**
     * Deserializes the raw message as an {@code OrderCreatedEvent} and
     * processes it. Any exception is logged and the raw message is stored as
     * a failed event so it can be retried later; nothing is rethrown.
     *
     * <p>NOTE(review): every message on the topic is parsed as
     * {@code OrderCreatedEvent}; confirm the topic carries no other event types.
     *
     * @param message raw message payload received from Kafka
     */
    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderEvents(String message) {
        try {
            handleOrderCreated(objectMapper.readValue(message, OrderCreatedEvent.class));
        } catch (Exception e) {
            log.error("处理订单事件失败", e);
            // Record the failed event for a later retry.
            failedEventRepository.save(new FailedEvent(message, e.getMessage()));
        }
    }

    /**
     * Reserves stock for every item of the order and publishes the outcome:
     * an insufficient-inventory event for the first item that cannot be
     * reserved, otherwise an inventory-reserved event.
     */
    private void handleOrderCreated(OrderCreatedEvent event) {
        // Idempotency guard: skip orders whose inventory was already handled.
        if (inventoryRepository.existsByOrderId(event.getOrderId())) {
            log.warn("订单 {} 的库存已处理,跳过", event.getOrderId());
            return;
        }

        OrderItemEvent shortItem = reserveUntilShortage(event);
        if (shortItem == null) {
            // Every item was reserved.
            eventPublisher.publish(new OrderInventoryReservedEvent(event.getOrderId()));
        } else {
            // Stock ran out on this item.
            eventPublisher.publish(new OrderInventoryInsufficientEvent(
                event.getOrderId(), shortItem.getProductId()));
        }
    }

    /**
     * Calls {@code checkAndReserve} for the items in order and stops at the
     * first one that is not available.
     *
     * <p>NOTE(review): items handled before the failing one are not released
     * here — confirm whether compensation happens elsewhere.
     *
     * @return the first item that could not be reserved, or {@code null} if
     *         all items were reserved
     */
    private OrderItemEvent reserveUntilShortage(OrderCreatedEvent event) {
        for (OrderItemEvent line : event.getItems()) {
            boolean reserved = inventoryService.checkAndReserve(
                line.getProductId(), line.getQuantity());
            if (!reserved) {
                return line;
            }
        }
        return null;
    }
}

API组合模式

API组合用于跨服务查询：由组合器调用多个服务的接口，并将结果聚合为一个读模型。

// API composer
/**
 * Composes an order-detail read model from the order, customer, product and
 * payment services.
 */
@Component
public class OrderQueryComposer {

    private final OrderClient orderClient;
    private final CustomerClient customerClient;
    private final ProductClient productClient;
    private final PaymentClient paymentClient;

    /**
     * Fetches the order and its related data and assembles an {@code OrderDetail}.
     *
     * <p>Calls that do not depend on each other run concurrently: the payment
     * lookup (which only needs {@code orderId}) starts together with the order
     * lookup, and the customer and product lookups start together as soon as
     * the order is available.
     *
     * <p>NOTE(review): the lookups run via {@code CompletableFuture.supplyAsync}
     * without an explicit executor, i.e. on the default async pool. Confirm the
     * clients do not rely on caller-thread context, and consider passing a
     * dedicated executor for blocking I/O.
     *
     * @param orderId id of the order to load; also used as the cache key
     * @return the composed read model
     * @throws java.util.concurrent.CompletionException if the order lookup fails
     *         (wrapping the client's exception)
     */
    @Cacheable(value = "orderDetail", key = "#orderId")
    public OrderDetail composeOrderDetail(String orderId) {
        // Stage 1: both lookups only need the order id, so start them together.
        CompletableFuture<OrderDto> orderFuture = CompletableFuture.supplyAsync(
            () -> orderClient.getOrder(orderId));
        CompletableFuture<PaymentDto> paymentFuture = CompletableFuture.supplyAsync(
            () -> paymentClient.getPaymentByOrder(orderId));

        // Plain join(): a failed order lookup surfaces as CompletionException.
        OrderDto order = orderFuture.join();

        // Stage 2: customer and product both depend on the order, not on each other.
        CompletableFuture<CustomerDto> customerFuture = CompletableFuture.supplyAsync(
            () -> customerClient.getCustomer(order.getCustomerId()));
        CompletableFuture<ProductDto> productFuture = CompletableFuture.supplyAsync(
            () -> productClient.getProduct(order.getProductId()));

        // Join in customer, product, payment order so the first failure reported
        // is the same one a sequential call chain would have hit first.
        CustomerDto customer = joinUnwrapped(customerFuture);
        ProductDto product = joinUnwrapped(productFuture);
        PaymentDto payment = joinUnwrapped(paymentFuture);

        // Assemble the read model.
        return OrderDetail.builder()
            .orderId(order.getId())
            .orderNo(order.getOrderNo())
            .customerName(customer.getName())
            .productName(product.getName())
            .amount(order.getAmount())
            .paymentStatus(payment.getStatus())
            .createdAt(order.getCreatedAt())
            .build();
    }

    /**
     * Waits for the future and rethrows the underlying unchecked failure
     * instead of the CompletionException wrapper, so callers see the client's
     * own exception.
     */
    private static <T> T joinUnwrapped(CompletableFuture<T> future) {
        try {
            return future.join();
        } catch (java.util.concurrent.CompletionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e;
        }
    }
}

// Fault tolerance for API composition
/**
 * Runs remote calls behind a circuit breaker and retry, falling back to a
 * caller-supplied value when the protected call fails.
 */
@Component
public class ResilientApiClient {

    private final CircuitBreaker circuitBreaker;
    private final Retry retry;

    /**
     * Executes {@code call} decorated with the circuit breaker and retry and
     * returns its result; if the decorated call fails, returns
     * {@code fallback.get()} instead.
     *
     * <p>There is a single recovery point, so {@code fallback} is invoked at
     * most once per call. Per-exception fallbacks on the decorator would add
     * nothing next to this catch-all recovery, and a fallback that throws
     * would be run a second time by it.
     *
     * @param service name of the target service; currently unused, kept for
     *                interface compatibility
     * @param call the remote call to protect
     * @param fallback supplies the substitute result when the call fails
     * @param <T> result type
     * @return the call's result, or the fallback's result on failure
     */
    public <T> T executeWithFallback(String service, Supplier<T> call, Supplier<T> fallback) {
        Supplier<T> decorated = Decorators.ofSupplier(call)
            .withCircuitBreaker(circuitBreaker)
            .withRetry(retry)
            .decorate();

        // Any failure of the decorated call (open circuit, exhausted retries, ...)
        // is answered by the fallback.
        return Try.ofSupplier(decorated).recover(e -> fallback.get()).get();
    }
}

防腐层集成

防腐层（Anti-Corruption Layer，ACL）位于领域模型与外部或遗留系统之间，负责双向的模型转换，避免外部系统的概念和命名渗入领域模型。

// Anti-corruption layer for the external (legacy) system
/**
 * Translates between the domain model and the legacy system's DTOs so that
 * legacy field names stay confined to this class.
 *
 * <p>NOTE(review): {@code objectMapper} is not used in the visible code.
 */
@Component
public class LegacySystemACL {

    private final LegacyApiClient legacyClient;
    private final ObjectMapper objectMapper;

    /**
     * Converts a legacy user record into a domain {@code Customer}.
     *
     * @param legacyUser user DTO as returned by the legacy system
     * @return the customer built from the legacy id, name, e-mail and phone fields
     */
    public Customer toCustomer(LegacyUserDto legacyUser) {
        CustomerId customerId = new CustomerId(legacyUser.getUSER_ID());
        UserName fullName = new UserName(
            legacyUser.getFIRST_NAME(),
            legacyUser.getLAST_NAME());
        Email email = new Email(legacyUser.getEMAIL_ADDR());
        Phone phone = new Phone(legacyUser.getPHONE_NO());

        return Customer.builder()
            .id(customerId)
            .name(fullName)
            .email(email)
            .phone(phone)
            .build();
    }

    /**
     * Converts a create-order command into the legacy order DTO.
     *
     * @param command the domain command
     * @return a legacy DTO carrying the customer id and one legacy item per
     *         command item (the quantity is written as a string)
     */
    public LegacyOrderDto toLegacyOrder(CreateOrderCommand command) {
        LegacyOrderDto legacyOrder = new LegacyOrderDto();
        legacyOrder.setCUST_ID(command.getCustomerId().getValue());
        legacyOrder.setORDER_ITEMS(command.getItems().stream()
            .map(line -> {
                LegacyOrderItemDto legacyLine = new LegacyOrderItemDto();
                legacyLine.setPROD_ID(line.getProductId().getValue());
                legacyLine.setQTY(String.valueOf(line.getQuantity()));
                return legacyLine;
            })
            .collect(Collectors.toList()));
        return legacyOrder;
    }

    /**
     * Places an order in the legacy system: translates the command, calls the
     * legacy API and maps the response back to a domain {@code Order}.
     *
     * <p>NOTE(review): {@code toOrder} is not defined in the visible part of
     * this class — confirm it exists.
     *
     * @param command the domain command
     * @return the order mapped from the legacy response
     */
    public Order placeOrder(CreateOrderCommand command) {
        return toOrder(legacyClient.createOrder(toLegacyOrder(command)));
    }
}

集成模式选择

| 模式 | 适用场景 | 优点 | 缺点 |
| --- | --- | --- | --- |
| 事件驱动 | 异步通信、最终一致性 | 松耦合、可扩展 | 调试复杂 |
| API组合 | 跨服务查询 | 数据实时（直接读取各服务的最新数据） | 响应延迟较高（需等待多个下游调用） |
| 防腐层 | 集成遗留系统 | 隔离外部依赖 | 增加复杂度 |

选择合适的集成模式需要根据业务需求和技术约束综合考虑。