Kafka Integration: Event-Driven Architecture

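Overview

Apache Kafka is a distributed event streaming platform. This tutorial shows how to integrate Kafka into a Java application with Spring Kafka and use it to build an event-driven architecture.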

1. Spring Kafka

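import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    // Constructor injection: Spring supplies the configured KafkaTemplate
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // Sends the message asynchronously to the given topic (no record key)
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}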

@Component
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

2. Configuration

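# application.yml
# The String (de)serializers below match the String messages in section 1.
# The object payloads in section 3 need a JSON-capable pair instead, for example
# org.springframework.kafka.support.serializer.JsonSerializer / JsonDeserializer.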
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
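
For readers using the plain Kafka client rather than Spring Boot, the YAML keys above correspond to standard client property names. The following is a minimal sketch of that mapping, assuming the same broker address and group ID as the YAML. The class `KafkaClientProps` and its method names are hypothetical, and the serializer classes are kept as strings so the snippet compiles without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: builds the raw Kafka client properties that
// correspond to the Spring Boot YAML keys shown above.
class KafkaClientProps {
    static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Producer settings (spring.kafka.producer.*)
    static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", STRING_SERIALIZER);
        props.setProperty("value.serializer", STRING_SERIALIZER);
        return props;
    }

    // Consumer settings (spring.kafka.consumer.*)
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "my-group");
        // "earliest": start from the oldest available record when the
        // group has no committed offset yet
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", STRING_DESERIALIZER);
        props.setProperty("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
        System.out.println(consumerProps());
    }
}
```

These objects could then be passed to the client's producer and consumer constructors; with Spring Boot the YAML alone is enough.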

3. Practical Examples

Order events

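// Event definition
import java.time.LocalDateTime;
import java.util.List;

public class OrderCreatedEvent {
    private String orderId;
    private String userId;
    private List<OrderItem> items;
    private double totalAmount; // consider BigDecimal for monetary amounts
    private LocalDateTime timestamp;

    // Constructors and getters/setters omitted
}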

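// Producer
@Service
public class OrderEventPublisher {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderCreated(OrderCreatedEvent event) {
        // The order ID is the record key, so events for one order share a partition
        kafkaTemplate.send("order-events", event.getOrderId(), event);
    }
}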

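// Consumer
@Component
public class OrderEventHandler {
    // Service types are assumed to be defined elsewhere in the application
    private final InventoryService inventoryService;
    private final AccountService accountService;
    private final OrderService orderService;

    public OrderEventHandler(InventoryService inventoryService,
                             AccountService accountService,
                             OrderService orderService) {
        this.inventoryService = inventoryService;
        this.accountService = accountService;
        this.orderService = orderService;
    }

    @KafkaListener(topics = "order-events", groupId = "order-handler")
    public void handleOrderCreated(OrderCreatedEvent event) {
        System.out.println("Handling order-created event: " + event.getOrderId());

        // 1. Deduct inventory
        inventoryService.deduct(event.getItems());

        // 2. Deduct the account balance
        accountService.deduct(event.getUserId(), event.getTotalAmount());

        // 3. Update the order status (the event carries the order ID, not the order)
        orderService.updateStatus(event.getOrderId(), OrderStatus.PAID);
    }
}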
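
One point the handler above leaves open: a listener can receive the same record more than once (for example after a rebalance or a retry), and running the three steps twice would deduct stock and balance twice. Below is a minimal sketch of one common mitigation, deduplicating by order ID. The class `IdempotentOrderProcessor` and its in-memory set are assumptions for illustration only; a real service would more likely rely on a durable store such as a database unique constraint.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper that runs an action at most once per order ID.
class IdempotentOrderProcessor {
    // IDs of orders already handled; in-memory only, so lost on restart
    private final Set<String> processedOrderIds = ConcurrentHashMap.newKeySet();

    /**
     * Runs the action only if this order ID has not been seen before.
     * Returns true if the action ran, false if the event was a duplicate.
     */
    boolean processOnce(String orderId, Consumer<String> action) {
        // add() returns false when the ID is already present
        if (!processedOrderIds.add(orderId)) {
            return false;
        }
        try {
            action.accept(orderId);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so a redelivered event can retry the failed action
            processedOrderIds.remove(orderId);
            throw e;
        }
    }

    public static void main(String[] args) {
        IdempotentOrderProcessor processor = new IdempotentOrderProcessor();
        Consumer<String> handler = id -> System.out.println("Processing order " + id);
        System.out.println(processor.processOnce("order-1", handler)); // true
        System.out.println(processor.processOnce("order-1", handler)); // false, handler not called
    }
}
```

In the handler, the three service calls would go inside the action passed to `processOnce`, keyed by `event.getOrderId()`.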

User behavior tracking

@Component
public class UserBehaviorTracker {
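    private final KafkaTemplate<String, Object> kafkaTemplate;

    public UserBehaviorTracker(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
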
    public void trackPageView(String userId, String page) {
        Map<String, Object> event = new HashMap<>();
        event.put("userId", userId);
        event.put("page", page);
        event.put("timestamp", LocalDateTime.now());
        
        kafkaTemplate.send("user-behavior", userId, event);
    }
    
    public void trackClick(String userId, String element) {
        Map<String, Object> event = new HashMap<>();
        event.put("userId", userId);
        event.put("element", element);
        event.put("timestamp", LocalDateTime.now());
        
        kafkaTemplate.send("user-behavior", userId, event);
    }
}

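@Component
public class UserBehaviorAnalyzer {
    private final AnalyticsService analyticsService; // assumed to exist elsewhere

    public UserBehaviorAnalyzer(AnalyticsService analyticsService) {
        this.analyticsService = analyticsService;
    }

    @KafkaListener(topics = "user-behavior", groupId = "behavior-analyzer")
    public void analyzeBehavior(Map<String, Object> event) {
        String userId = (String) event.get("userId");
        String page = (String) event.get("page");

        // Click events share this topic but carry "element" instead of "page"
        if (page == null) return;

        // Record the page view for analysis
        analyticsService.trackPageView(userId, page);
    }
}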
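
Both tracker methods pass `userId` as the record key. Kafka's default partitioner sends records with the same key to the same partition (as long as the partition count does not change), so each user's events stay in order. The sketch below illustrates the idea only: it uses `String.hashCode()` as a stand-in, whereas the real client hashes the serialized key bytes with murmur2. `KeyPartitionSketch` and `partitionFor` are hypothetical names.

```java
// Illustrative only: mimics "hash of the key modulo the partition count".
// Kafka's actual default partitioner uses murmur2 over the serialized key bytes.
class KeyPartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6;
        // "user-1" appears twice and maps to the same partition both times
        for (String userId : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.println(userId + " -> partition " + partitionFor(userId, partitions));
        }
    }
}
```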

4. Best Practices

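  1. Design topics deliberately: model topics around business needs, for example one topic per event stream such as order-events.
  2. Partitioning: choose the partition count with target throughput and consumer parallelism in mind.
  3. Consumer groups: run several consumers in one group to balance the load across them.
  4. Message durability: make sure messages are stored durably, for example through topic replication and producer acknowledgements.
  5. Monitoring and alerting: monitor the state of the Kafka cluster and alert on problems; consumer lag is a common signal.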
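
Items 2 and 3 interact: within one consumer group, each partition is read by at most one consumer, so the partition count caps the number of consumers that can do useful work. The following is a minimal sketch of that relationship, assuming a simple round-robin assignment; Kafka's real assignors are configurable and more involved. `GroupAssignmentSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: distributes partitions across the members of one group.
class GroupAssignmentSketch {
    // Assigns partitions 0..numPartitions-1 to consumers in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 2 consumers: the load is shared
        System.out.println(assign(List.of("c1", "c2"), 3));       // {c1=[0, 2], c2=[1]}
        // 2 partitions, 3 consumers: c3 receives nothing and sits idle
        System.out.println(assign(List.of("c1", "c2", "c3"), 2)); // {c1=[0], c2=[1], c3=[]}
    }
}
```

The second call shows why adding consumers beyond the partition count does not increase throughput.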

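Summary

Kafka is a distributed event streaming platform. With Spring Kafka, a Java application can publish and consume events through KafkaTemplate and @KafkaListener, which provides a solid basis for an efficient event-driven architecture.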