DDD集成模式
DDD集成模式
事件驱动集成
事件驱动是微服务间集成的首选方式，通过异步事件实现松耦合。
// 事件发布
/**
 * Publishes order events to Kafka as JSON strings.
 *
 * <p>NOTE(review): no constructor is visible in this snippet; the final fields are presumably
 * injected through a constructor declared or generated elsewhere — confirm.
 */
@Component
public class OrderEventPublisher {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final ObjectMapper objectMapper;

    /**
     * Builds an {@code OrderCreatedEvent} from the given order, serializes it to JSON and sends it
     * to the {@code order-events} topic using the order id value as the record key.
     *
     * @param order the order to announce
     * @throws EventPublishException if the event cannot be serialized to JSON
     */
    public void publishOrderCreated(Order order) {
        String payload = toJson(buildOrderCreatedEvent(order));
        // NOTE(review): the value returned by send(...) is not inspected here, so this method
        // does not itself report a send that fails after the call returns — confirm that is intended.
        kafkaTemplate.send("order-events", order.getId().getValue(), payload);
    }

    /** Copies the order's identifiers and line items into a new event stamped with the current local time. */
    private OrderCreatedEvent buildOrderCreatedEvent(Order order) {
        return OrderCreatedEvent.builder()
                .orderId(order.getId().getValue())
                .orderNo(order.getOrderNo())
                .customerId(order.getCustomerId().getValue())
                .items(order.getItems().stream()
                        .map(line -> new OrderItemEvent(
                                line.getProductId().getValue(),
                                line.getQuantity(),
                                line.getPrice()))
                        .collect(Collectors.toList()))
                .occurredAt(LocalDateTime.now())
                .build();
    }

    /** Serializes the event with the injected mapper, translating JSON failures into {@code EventPublishException}. */
    private String toJson(OrderCreatedEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new EventPublishException("发布订单创建事件失败", e);
        }
    }
}
// 事件消费
/**
 * Kafka listener that reacts to messages on the {@code order-events} topic by reserving inventory
 * for each item of the order.
 *
 * <p>NOTE(review): {@code objectMapper}, {@code log}, {@code failedEventRepository},
 * {@code inventoryRepository} and {@code eventPublisher} are used below but not declared in this
 * snippet; they are presumably fields declared elsewhere — confirm.
 */
@Component
public class InventoryEventConsumer {

    private final InventoryService inventoryService;

    /**
     * Parses the raw message as an {@code OrderCreatedEvent} and processes it. Any exception is
     * logged and the raw message is stored as a {@code FailedEvent} instead of being rethrown.
     *
     * @param message raw record value received from the topic
     */
    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderEvents(String message) {
        try {
            handleOrderCreated(objectMapper.readValue(message, OrderCreatedEvent.class));
        } catch (Exception e) {
            log.error("处理订单事件失败", e);
            // Keep the failed message so it can be retried later (the retry itself is not shown here).
            failedEventRepository.save(new FailedEvent(message, e.getMessage()));
        }
    }

    /**
     * Reserves stock for every item of the event, publishing either a "reserved" event or an
     * "insufficient" event for the first item that cannot be reserved.
     */
    private void handleOrderCreated(OrderCreatedEvent event) {
        // Idempotency guard: skip orders the repository already knows about.
        boolean alreadyProcessed = inventoryRepository.existsByOrderId(event.getOrderId());
        if (alreadyProcessed) {
            log.warn("订单 {} 的库存已处理,跳过", event.getOrderId());
            return;
        }

        for (OrderItemEvent line : event.getItems()) {
            if (inventoryService.checkAndReserve(line.getProductId(), line.getQuantity())) {
                continue;
            }
            // NOTE(review): nothing here releases items reserved in earlier iterations; confirm
            // whether a downstream handler of the insufficient event compensates for them.
            eventPublisher.publish(new OrderInventoryInsufficientEvent(
                    event.getOrderId(), line.getProductId()));
            return;
        }

        // Every item was reserved.
        eventPublisher.publish(new OrderInventoryReservedEvent(event.getOrderId()));
    }
}
API组合模式
API组合用于跨服务查询聚合数据。
// API 组合器
/**
 * API composer that assembles the {@code OrderDetail} read model from the order, customer,
 * product and payment clients.
 *
 * <p>NOTE(review): no constructor is visible in this snippet; the final fields are presumably
 * injected through a constructor declared or generated elsewhere — confirm.
 */
@Component
public class OrderQueryComposer {

    private final OrderClient orderClient;
    private final CustomerClient customerClient;
    private final ProductClient productClient;
    private final PaymentClient paymentClient;

    /**
     * Fetches the order and its related customer, product and payment data and merges them into
     * a single {@code OrderDetail}.
     *
     * <p>The payment lookup needs only {@code orderId}, so it is started together with the order
     * lookup. The customer and product lookups need ids from the order, so they are started
     * concurrently as soon as the order is available.
     *
     * <p>The lookups run on the default {@code CompletableFuture} executor
     * ({@code ForkJoinPool.commonPool()}), as the order lookup already did. NOTE(review): if the
     * customer, product or payment clients rely on thread-bound context, confirm it is
     * propagated to that pool.
     *
     * @param orderId id of the order to look up
     * @return the composed order detail
     * @throws java.util.concurrent.CompletionException if the order lookup fails
     */
    @Cacheable(value = "orderDetail", key = "#orderId")
    public OrderDetail composeOrderDetail(String orderId) {
        // Independent of each other: both depend only on orderId.
        CompletableFuture<OrderDto> orderFuture = CompletableFuture.supplyAsync(
                () -> orderClient.getOrder(orderId));
        CompletableFuture<PaymentDto> paymentFuture = CompletableFuture.supplyAsync(
                () -> paymentClient.getPaymentByOrder(orderId));

        // The remaining lookups need ids carried by the order itself.
        OrderDto order = orderFuture.join();

        CompletableFuture<CustomerDto> customerFuture = CompletableFuture.supplyAsync(
                () -> customerClient.getCustomer(order.getCustomerId()));
        CompletableFuture<ProductDto> productFuture = CompletableFuture.supplyAsync(
                () -> productClient.getProduct(order.getProductId()));

        // Await in the same order the sequential version called the clients, so the first
        // failure reported to the caller is unchanged.
        CustomerDto customer = await(customerFuture);
        ProductDto product = await(productFuture);
        PaymentDto payment = await(paymentFuture);

        // Assemble the read model.
        return OrderDetail.builder()
                .orderId(order.getId())
                .orderNo(order.getOrderNo())
                .customerName(customer.getName())
                .productName(product.getName())
                .amount(order.getAmount())
                .paymentStatus(payment.getStatus())
                .createdAt(order.getCreatedAt())
                .build();
    }

    /**
     * Waits for the future and rethrows an unchecked failure as-is rather than wrapped in a
     * {@code CompletionException}, so callers see the same exception types as when the client
     * was invoked directly on the calling thread.
     */
    private static <T> T await(CompletableFuture<T> future) {
        try {
            return future.join();
        } catch (java.util.concurrent.CompletionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw e;
        }
    }
}
// API 组合的容错处理
/**
 * Wraps remote calls with the injected circuit breaker and retry and substitutes a fallback
 * value when the call does not succeed.
 *
 * <p>NOTE(review): no constructor is visible in this snippet; the final fields are presumably
 * injected through a constructor declared or generated elsewhere — confirm.
 */
@Component
public class ResilientApiClient {

    private final CircuitBreaker circuitBreaker;
    private final Retry retry;

    /**
     * Executes {@code call} decorated with the circuit breaker and retry. The fallback is used
     * for {@code CallNotPermittedException} and {@code TimeoutException}, and also for any other
     * failure that surfaces from the decorated supplier.
     *
     * @param service name of the target service; not referenced by this method's body
     * @param call the primary supplier to execute
     * @param fallback supplier of the substitute value
     * @param <T> result type
     * @return the result of {@code call}, or of {@code fallback} when {@code call} fails
     */
    public <T> T executeWithFallback(String service, Supplier<T> call, Supplier<T> fallback) {
        Supplier<T> guarded = Decorators.ofSupplier(call)
                .withCircuitBreaker(circuitBreaker)
                .withRetry(retry)
                .withFallback(CallNotPermittedException.class, rejected -> fallback.get())
                .withFallback(TimeoutException.class, timedOut -> fallback.get())
                .decorate();

        // Last line of defence: any remaining failure is also replaced by the fallback value.
        Try<T> outcome = Try.ofSupplier(guarded);
        return outcome.recover(failure -> fallback.get()).get();
    }
}
防腐层集成
防腐层（ACL）在本领域模型与外部/遗留系统模型之间做双向转换，避免外部系统的命名和结构渗入领域层。
// 外部系统防腐层
/**
 * Anti-corruption layer for the legacy system: converts between the legacy DTOs and the domain
 * model and forwards order creation to the legacy API client.
 *
 * <p>NOTE(review): no constructor is visible in this snippet, {@code objectMapper} is not used by
 * the methods shown, and {@code toOrder(...)} is called but not defined here — presumably these
 * live elsewhere in the class; confirm.
 */
@Component
public class LegacySystemACL {

    private final LegacyApiClient legacyClient;
    private final ObjectMapper objectMapper;

    /**
     * Maps a legacy user record onto the domain {@code Customer}.
     *
     * @param legacyUser user DTO as returned by the legacy system
     * @return the customer built from the legacy id, first/last name, e-mail address and phone number
     */
    public Customer toCustomer(LegacyUserDto legacyUser) {
        CustomerId id = new CustomerId(legacyUser.getUSER_ID());
        UserName name = new UserName(legacyUser.getFIRST_NAME(), legacyUser.getLAST_NAME());
        Email email = new Email(legacyUser.getEMAIL_ADDR());
        Phone phone = new Phone(legacyUser.getPHONE_NO());

        return Customer.builder()
                .id(id)
                .name(name)
                .email(email)
                .phone(phone)
                .build();
    }

    /**
     * Maps a create-order command onto the legacy order DTO.
     *
     * @param command the domain command describing the order to create
     * @return a legacy DTO carrying the customer id and one legacy item per command item
     */
    public LegacyOrderDto toLegacyOrder(CreateOrderCommand command) {
        LegacyOrderDto legacyOrder = new LegacyOrderDto();
        legacyOrder.setCUST_ID(command.getCustomerId().getValue());
        legacyOrder.setORDER_ITEMS(
                command.getItems().stream()
                        .map(line -> {
                            LegacyOrderItemDto legacyLine = new LegacyOrderItemDto();
                            legacyLine.setPROD_ID(line.getProductId().getValue());
                            // The legacy item DTO takes the quantity as a string.
                            legacyLine.setQTY(String.valueOf(line.getQuantity()));
                            return legacyLine;
                        })
                        .collect(Collectors.toList()));
        return legacyOrder;
    }

    /**
     * Creates the order in the legacy system and converts its response back to a domain
     * {@code Order}.
     *
     * @param command the domain command describing the order to create
     * @return the order converted from the legacy response
     */
    public Order placeOrder(CreateOrderCommand command) {
        return toOrder(legacyClient.createOrder(toLegacyOrder(command)));
    }
}
集成模式选择
| 模式 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| 事件驱动 | 异步通信、最终一致性 | 松耦合、可扩展 | 调试复杂 |
| API组合 | 跨服务查询 | 数据实时（直接查询各服务） | 延迟较高（多次远程调用叠加） |
| 防腐层 | 集成遗留系统 | 隔离外部依赖 | 增加复杂度 |
选择合适的集成模式需要根据业务需求和技术约束综合考虑。