Kafka Deep Dive
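Apache Kafka is a distributed event streaming platform known for high throughput and low latency. It is widely used for log collection, messaging, and stream processing.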
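Kafka Core Architecture
Topics and Partitions
Kafka organizes messages into topics, and each topic can be split into multiple partitions so that producers and consumers can work in parallel: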
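// Create a topic with 6 partitions and a replication factor of 3
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// try-with-resources closes the admin client when the block exits
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("user-events", 6, (short) 3);
    // createTopics is asynchronous; get() blocks until it completes (throws checked exceptions)
    admin.createTopics(Collections.singletonList(topic)).all().get();
}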
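To make the link between record keys and partitions concrete, here is a minimal sketch of key-based partition selection. It assumes a simplified hash (`Arrays.hashCode` over the UTF-8 key bytes) in place of the murmur2 hash that Kafka's default partitioner applies to the serialized key, so the partition numbers will not match a real cluster. `PartitionSketch` and `partitionFor` are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration: maps a record key to one of N partitions.
// Kafka's default partitioner hashes the serialized key with murmur2;
// Arrays.hashCode is used here only as a stand-in.
final class PartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6; // same partition count as the "user-events" topic
        for (String userId : new String[] {"user-1", "user-2", "user-1"}) {
            System.out.printf("key=%s -> partition=%d%n",
                    userId, partitionFor(userId, numPartitions));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, all events for one user land in the same partition and keep their relative order. Changing the partition count changes the mapping.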
Producer
@Bean
public Producer<String, String> kafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    return new KafkaProducer<>(props);
}
// Send a message; the callback runs once the broker responds
ProducerRecord<String, String> record =
        new ProducerRecord<>("user-events", userId, jsonPayload);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("Send failed", exception);
    } else {
        log.info("Message sent to partition={} offset={}",
                metadata.partition(), metadata.offset());
    }
});
Consumer
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
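@Component
public class UserEventHandler {
    private final ObjectMapper objectMapper;

    public UserEventHandler(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @KafkaListener(topics = "user-events", groupId = "analytics-group")
    public void handleEvent(ConsumerRecord<String, String> record,
                            Acknowledgment ack) throws JsonProcessingException {
        UserEvent event = objectMapper.readValue(record.value(), UserEvent.class);
        processEvent(event);
        // Acknowledge only after processing succeeds (manual ack mode)
        ack.acknowledge();
    }
}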
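Acknowledging after processing gives at-least-once delivery: if the consumer stops between processing a record and acknowledging it, that record is read again after a restart. The following is a minimal in-memory sketch of that behavior, not Kafka or Spring code. `ManualAckSketch`, `poll`, and the `failAtOffset` parameter are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition and one consumer.
// It is not Kafka code; it only mimics "acknowledge after processing".
final class ManualAckSketch {
    private final List<String> log;   // records in offset order
    private int committedOffset = 0;  // next offset to read after a restart
    final List<String> processed = new ArrayList<>();

    ManualAckSketch(List<String> log) {
        this.log = log;
    }

    // Processes records starting at the committed offset. When failAtOffset
    // is reached, the record is processed but the simulated crash happens
    // before the acknowledgment, so the offset is not committed.
    void poll(int failAtOffset) {
        for (int offset = committedOffset; offset < log.size(); offset++) {
            processed.add(log.get(offset));
            if (offset == failAtOffset) {
                return; // crash before the ack
            }
            committedOffset = offset + 1; // stands in for ack.acknowledge()
        }
    }

    int committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        ManualAckSketch consumer = new ManualAckSketch(List.of("a", "b", "c"));
        consumer.poll(1);  // crashes after processing "b" but before the ack
        consumer.poll(-1); // restart: resumes from the last committed offset
        System.out.println(consumer.processed); // prints [a, b, b, c]
    }
}
```

The duplicate "b" is why handlers that acknowledge after processing are usually written to tolerate redelivery.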
Spring Boot Integration
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      enable-auto-commit: false
    producer:
      acks: all
      retries: 3
Advanced Kafka Features
Consumer Groups and Rebalancing
// Custom rebalance listener (ConsumerRebalanceListener takes no type parameters)
@Bean
public ConsumerRebalanceListener rebalanceListener() {
    return new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Commit offsets and clean up state here
            log.info("Partitions revoked: {}", partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.info("Partitions assigned: {}", partitions);
        }
    };
}
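To illustrate what a rebalance changes, the sketch below redistributes a topic's partitions when a consumer joins the group. It assumes a plain round-robin scheme over sorted member names and does not reproduce any of Kafka's built-in assignors exactly. `RebalanceSketch`, `assign`, and the member names `c1` to `c3` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of how a group's partitions are redistributed when
// membership changes. Partitions are dealt out round-robin; this is a
// simplification, not one of Kafka's real assignment strategies.
final class RebalanceSketch {

    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        // TreeMap keeps members sorted so the result is deterministic
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        List<String> sorted = new ArrayList<>(assignment.keySet());
        for (int partition = 0; partition < numPartitions; partition++) {
            String owner = sorted.get(partition % sorted.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers share six partitions
        System.out.println(assign(List.of("c1", "c2"), 6));
        // prints {c1=[0, 2, 4], c2=[1, 3, 5]}

        // A third consumer joins and ownership is redistributed
        System.out.println(assign(List.of("c1", "c2", "c3"), 6));
        // prints {c1=[0, 3], c2=[1, 4], c3=[2, 5]}
    }
}
```

In this simplified scheme, `c1` gives up partitions 2 and 4 and picks up partition 3 when `c3` joins, which is the kind of ownership change the listener callbacks surface.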
Idempotent Producer
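// Idempotence lets the broker discard duplicates caused by producer retries
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Only needed for transactions; keep the ID stable across restarts (placeholder value, not a random UUID)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "user-events-tx-1");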
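With idempotence enabled, the broker uses a producer ID and per-partition sequence numbers to recognize a retried write and avoid appending it twice. The following is a minimal sketch of that idea for a single partition. It assumes one sequence number per record, whereas real brokers track sequences at batch level, and `IdempotenceSketch` and `append` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical broker-side model for a single partition. Real brokers track
// sequence numbers per producer ID and partition at batch level; this sketch
// keeps one sequence number per record to stay short.
final class IdempotenceSketch {
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    final List<String> log = new ArrayList<>();

    // Returns true if the record was appended, false if it was a duplicate.
    boolean append(long producerId, int sequence, String value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // retry of a record that is already in the log
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("out-of-order sequence " + sequence);
        }
        lastSequence.put(producerId, sequence);
        log.add(value);
        return true;
    }

    public static void main(String[] args) {
        IdempotenceSketch partition = new IdempotenceSketch();
        partition.append(7L, 0, "order-created");
        partition.append(7L, 0, "order-created"); // retry after a lost ack
        partition.append(7L, 1, "order-paid");
        System.out.println(partition.log); // prints [order-created, order-paid]
    }
}
```

The retried record is acknowledged but not written again, so the log holds each record once even though the producer sent one of them twice.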
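Summary
With its high throughput and horizontal scalability, Kafka has become a leading choice of middleware for big data and real-time stream processing.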