← 返回首页

Kafka深入解析

📂 java ⏱ 2 min 203 words

Kafka深入解析

Apache Kafka是一个分布式事件流平台,以高吞吐、低延迟著称,广泛应用于日志收集、消息传递和流处理场景。

Kafka核心架构

Topic与Partition

Kafka通过Topic组织消息,每个Topic可有多个Partition实现并行:

// 创建Topic配置
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient admin = AdminClient.create(props);

NewTopic topic = new NewTopic("user-events", 6, (short) 3);
admin.createTopics(Collections.singletonList(topic));

Producer生产者

@Bean
public Producer<String, String> kafkaProducer() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, 3);
    return new KafkaProducer<>(props);
}

// 发送消息
ProducerRecord<String, String> record =
    new ProducerRecord<>("user-events", userId, jsonPayload);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("发送失败", exception);
    } else {
        log.info("消息已发送到 partition={} offset={}",
            metadata.partition(), metadata.offset());
    }
});

Consumer消费者

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}

@Component
public class UserEventHandler {
    @KafkaListener(topics = "user-events", groupId = "analytics-group")
    public void handleEvent(ConsumerRecord<String, String> record,
                            Acknowledgment ack) {
        UserEvent event = objectMapper.readValue(record.value(), UserEvent.class);
        processEvent(event);
        ack.acknowledge();
    }
}

Spring Boot集成

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      enable-auto-commit: false
    producer:
      acks: all
      retries: 3

Kafka高级特性

Consumer Group与Rebalance

// 自定义Rebalance监听器
@Bean
public ConsumerRebalanceListener<String, String> rebalanceListener() {
    return new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // 提交偏移量,清理状态
            log.info("分区被回收: {}", partitions);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            log.info("新分区分配: {}", partitions);
        }
    };
}

幂等生产者

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-" + UUID.randomUUID());

小结

Kafka凭借高吞吐和水平扩展能力,成为大数据和实时流处理的首选中间件。