Kafka Integration: Event-Driven Architecture
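Overview
Apache Kafka is a distributed event streaming platform. This tutorial shows how to integrate Kafka with Java using Spring Kafka in order to build an event-driven architecture.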
1. Spring Kafka
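import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Producer: wraps KafkaTemplate to send string messages to a topic.
@Component
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;

    // Constructor injection of the template configured by Spring Boot.
    public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

// Consumer: invoked for each record that arrives on "my-topic".
@Component
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}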
2. Configuration
# application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
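The auto-offset-reset setting only takes effect when the consumer group has no valid committed offset for a partition, for example the first time a new group starts. The plain-Java sketch below models that decision in a simplified way. It is not Kafka client code, and the names OffsetResetSketch, ResetPolicy, and startingOffset are hypothetical, chosen only for illustration.

```java
import java.util.OptionalLong;

// Simplified model of how a consumer picks its starting offset for one partition.
// All names are hypothetical; this is an illustration, not Kafka client code.
class OffsetResetSketch {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * @param committed offset previously committed by the consumer group, if any
     * @param logStart  first offset still retained in the partition
     * @param logEnd    offset that the next produced record will receive
     * @param policy    value of auto-offset-reset
     */
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            // A committed offset inside the retained range always wins.
            if (offset >= logStart && offset <= logEnd) {
                return offset;
            }
        }
        // No usable committed offset: fall back to the reset policy.
        return policy == ResetPolicy.EARLIEST ? logStart : logEnd;
    }

    public static void main(String[] args) {
        // New group, earliest: read the partition from the beginning.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 120, ResetPolicy.EARLIEST)); // 0
        // New group, latest: only records produced from now on.
        System.out.println(startingOffset(OptionalLong.empty(), 0, 120, ResetPolicy.LATEST));   // 120
        // Existing group: resume from the committed offset, whatever the policy.
        System.out.println(startingOffset(OptionalLong.of(42), 0, 120, ResetPolicy.LATEST));    // 42
    }
}
```

With earliest, a brand-new consumer group replays everything still retained in the topic, which suits consumers that must not miss events that were published before they were first deployed.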
3. Practical Examples
Order Events
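import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

// Event definition
public class OrderCreatedEvent {
    private String orderId;
    private String userId;
    private List<OrderItem> items;
    private BigDecimal totalAmount; // BigDecimal avoids floating-point rounding for money
    private LocalDateTime timestamp;
    // Constructors and getters/setters
}

// Producer. Sending an object as the value requires a JSON value serializer
// (for example Spring Kafka's JsonSerializer) instead of StringSerializer.
@Service
public class OrderEventPublisher {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public OrderEventPublisher(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishOrderCreated(OrderCreatedEvent event) {
        // The order ID is the record key, so all events for one order go to the same partition.
        kafkaTemplate.send("order-events", event.getOrderId(), event);
    }
}

// Consumer. InventoryService, AccountService, and OrderService are application services assumed to exist.
@Component
public class OrderEventHandler {
    private final InventoryService inventoryService;
    private final AccountService accountService;
    private final OrderService orderService;

    public OrderEventHandler(InventoryService inventoryService,
                             AccountService accountService, OrderService orderService) {
        this.inventoryService = inventoryService;
        this.accountService = accountService;
        this.orderService = orderService;
    }

    @KafkaListener(topics = "order-events", groupId = "order-handler")
    public void handleOrderCreated(OrderCreatedEvent event) {
        System.out.println("Handling order-created event: " + event.getOrderId());
        // 1. Deduct inventory
        inventoryService.deduct(event.getItems());
        // 2. Deduct account balance
        accountService.deduct(event.getUserId(), event.getTotalAmount());
        // 3. Update order status
        orderService.updateStatus(event.getOrderId(), OrderStatus.PAID);
    }
}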
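Kafka consumers typically get at-least-once delivery, so a handler such as handleOrderCreated can receive the same event again after a retry or a consumer group rebalance. Deducting inventory or balance twice would be a real problem, so one common safeguard is to make the handler idempotent. The sketch below, in plain Java, skips order IDs that have already been processed. The class IdempotentOrderHandler is hypothetical, and the in-memory set stands in for what would normally be a persistent store such as a database table.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical wrapper that runs the business logic at most once per order ID.
// The in-memory set is an assumption for illustration; it is lost on restart.
class IdempotentOrderHandler {
    private final Set<String> processedOrderIds = ConcurrentHashMap.newKeySet();

    /** Returns true if the event was processed, false if it was a duplicate. */
    boolean handle(String orderId, Runnable businessLogic) {
        // add() returns false when the ID is already present, so duplicates are skipped.
        if (!processedOrderIds.add(orderId)) {
            return false;
        }
        try {
            businessLogic.run();
        } catch (RuntimeException e) {
            // Forget the ID on failure so that a redelivery can retry the work.
            processedOrderIds.remove(orderId);
            throw e;
        }
        return true;
    }

    public static void main(String[] args) {
        IdempotentOrderHandler handler = new IdempotentOrderHandler();
        System.out.println(handler.handle("order-1", () -> System.out.println("deducting stock"))); // true
        System.out.println(handler.handle("order-1", () -> System.out.println("deducting stock"))); // false
    }
}
```

In the listener, the three service calls would be passed in as the businessLogic argument, keyed by event.getOrderId().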
User Behavior Tracking
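import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Publishes page-view and click events, keyed by user ID.
@Component
public class UserBehaviorTracker {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public UserBehaviorTracker(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void trackPageView(String userId, String page) {
        Map<String, Object> event = new HashMap<>();
        event.put("userId", userId);
        event.put("page", page);
        event.put("timestamp", LocalDateTime.now());
        kafkaTemplate.send("user-behavior", userId, event);
    }

    public void trackClick(String userId, String element) {
        Map<String, Object> event = new HashMap<>();
        event.put("userId", userId);
        event.put("element", element);
        event.put("timestamp", LocalDateTime.now());
        kafkaTemplate.send("user-behavior", userId, event);
    }
}

// AnalyticsService is an application service assumed to exist.
@Component
public class UserBehaviorAnalyzer {
    private final AnalyticsService analyticsService;

    public UserBehaviorAnalyzer(AnalyticsService analyticsService) {
        this.analyticsService = analyticsService;
    }

    @KafkaListener(topics = "user-behavior", groupId = "behavior-analyzer")
    public void analyzeBehavior(Map<String, Object> event) {
        String userId = (String) event.get("userId");
        String page = (String) event.get("page");
        // Click events carry "element" instead of "page"; skip them here.
        if (page == null) {
            return;
        }
        // Analyze user behavior
        analyticsService.trackPageView(userId, page);
    }
}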
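Because page views and clicks share the user-behavior topic, a consumer has to tell the two event shapes apart before analyzing them. The following plain-Java sketch shows one way the analysis side might do that, by checking which field is present and keeping simple counters. BehaviorStats and its methods are hypothetical and are not part of the AnalyticsService used above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory aggregator for events shaped like those sent by UserBehaviorTracker.
class BehaviorStats {
    private final Map<String, Integer> pageViews = new HashMap<>();
    private final Map<String, Integer> clicks = new HashMap<>();

    // Dispatches on the field that is present: "page" for page views, "element" for clicks.
    void record(Map<String, Object> event) {
        Object page = event.get("page");
        Object element = event.get("element");
        if (page instanceof String) {
            pageViews.merge((String) page, 1, Integer::sum);
        } else if (element instanceof String) {
            clicks.merge((String) element, 1, Integer::sum);
        }
        // Events with neither field are ignored.
    }

    int pageViews(String page) {
        return pageViews.getOrDefault(page, 0);
    }

    int clicks(String element) {
        return clicks.getOrDefault(element, 0);
    }

    public static void main(String[] args) {
        BehaviorStats stats = new BehaviorStats();
        stats.record(Map.of("userId", "u1", "page", "/home"));
        stats.record(Map.of("userId", "u2", "page", "/home"));
        stats.record(Map.of("userId", "u1", "element", "buy-button"));
        System.out.println(stats.pageViews("/home"));   // 2
        System.out.println(stats.clicks("buy-button")); // 1
    }
}
```

An explicit event-type field, or separate topics for each event type, would make this dispatch less fragile than inferring the type from which keys happen to be present.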
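4. Best Practices
- Topic design: give each business domain or event type its own topic (for example, order-events and user-behavior) instead of mixing unrelated events.
- Partitioning: the partition count caps the parallelism of a consumer group, so size it for the expected throughput and choose keys so that related events land in the same partition.
- Consumer groups: consumers that share a group ID split the topic's partitions among themselves, which balances the load.
- Durability: use a replication factor greater than 1 and acks=all on producers so that acknowledged messages survive a broker failure.
- Monitoring and alerting: track cluster health and consumer lag, and alert when lag keeps growing.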
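To make the partitioning and consumer-group points more concrete, here is a minimal plain-Java sketch of the two ideas: records with the same key map to the same partition, and the partitions of a topic are divided among the consumers in a group. It is a simplification under stated assumptions. Kafka's default partitioner hashes the serialized key with murmur2, whereas this sketch uses String.hashCode() as a stand-in, and the round-robin assignment is only one possible strategy. PartitionSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative model of key-based partitioning and consumer group assignment.
class PartitionSketch {

    // Stand-in for the real partitioner: the same key always yields the same partition.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Round-robin assignment of partitions 0..numPartitions-1 to the consumers of one group.
    static Map<String, List<Integer>> assign(int numPartitions, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // The same key maps to the same partition on every call.
        System.out.println(partitionFor("user-42", 6) == partitionFor("user-42", 6)); // true
        // Six partitions shared by three consumers.
        System.out.println(assign(6, List.of("c1", "c2", "c3"))); // {c1=[0, 3], c2=[1, 4], c3=[2, 5]}
        // More consumers than partitions: the extra consumer stays idle.
        System.out.println(assign(2, List.of("c1", "c2", "c3"))); // {c1=[0], c2=[1], c3=[]}
    }
}
```

The last call shows why the partition count limits parallelism: adding consumers beyond the number of partitions does not increase throughput.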
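Summary
Kafka is a distributed event streaming platform. With Spring Kafka's KafkaTemplate and @KafkaListener, Java applications can publish and consume events and so build an efficient event-driven architecture.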