Message Queues in Depth: RocketMQ and Kafka

Overview

Message queues are a core component of distributed systems. This tutorial shows how to use the Java clients for RocketMQ and Kafka.

1. RocketMQ

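import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

// Producer
@Component
public class RocketMQProducer implements DisposableBean {
    private final DefaultMQProducer producer;

    public RocketMQProducer() throws MQClientException {
        this.producer = new DefaultMQProducer("producer-group");
        this.producer.setNamesrvAddr("localhost:9876");
        // The producer must be started before it can send messages
        this.producer.start();
    }

    public SendResult send(String topic, String message) {
        try {
            // Encode explicitly as UTF-8 instead of relying on the platform default charset
            Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
            return producer.send(msg);
        } catch (Exception e) {
            throw new RuntimeException("Failed to send message to topic " + topic, e);
        }
    }

    // Called by Spring when the bean is destroyed; releases the client's resources
    @Override
    public void destroy() {
        producer.shutdown();
    }
}

// Consumer
@Component
@RocketMQMessageListener(topic = "topic-name", consumerGroup = "consumer-group")
public class RocketMQConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        // Decode with the same charset the producer used
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received message: " + body);
    }
}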

2. Kafka

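import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

// Producer (named so that it does not clash with Kafka's own KafkaProducer class)
public class KafkaMessageProducer implements AutoCloseable {
    private final KafkaProducer<String, String> producer;

    public KafkaMessageProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    public void send(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        // send() is asynchronous; the callback runs once the broker responds or the send fails
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("Message sent: " + metadata.topic() + "-" + metadata.partition());
            }
        });
    }

    // Flushes any buffered records and releases the client's resources
    @Override
    public void close() {
        producer.close();
    }
}

// Consumer (named so that it does not clash with Kafka's own KafkaConsumer class)
public class KafkaMessageConsumer {
    private final KafkaConsumer<String, String> consumer;

    public KafkaMessageConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "consumer-group");
        // Offsets are committed automatically in the background
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        try {
            // Polls until poll() throws, for example after another thread calls consumer.wakeup()
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.key() + " -> " + record.value());
                }
            }
        } finally {
            // Always close the consumer so that it leaves the group cleanly
            consumer.close();
        }
    }
}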
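
The `key` argument passed to `send` above is what decides which partition a record lands in, and records in the same partition are consumed in the order they were written. The sketch below illustrates that idea only. `KeyRouter` and `partitionFor` are hypothetical names, and `String.hashCode()` stands in for the hash that the real Kafka client computes over the serialized key bytes, so the partition numbers it prints will not match what a broker would assign.

```java
// Hypothetical illustration of key-based routing; not Kafka's actual partitioner.
class KeyRouter {
    private final int partitionCount;

    KeyRouter(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    /** Maps a key to a partition index in the range [0, partitionCount). */
    int partitionFor(String key) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}

class KeyRouterDemo {
    public static void main(String[] args) {
        KeyRouter router = new KeyRouter(4);
        // The two "order-1001" records map to the same partition, so their relative order is kept
        for (String key : new String[] {"order-1001", "order-1002", "order-1001"}) {
            System.out.println(key + " -> partition " + router.partitionFor(key));
        }
    }
}
```

The practical consequence is that events that must stay in order, such as all updates to one order, should share a key, while unrelated events can use different keys and be consumed in parallel.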

3. Practical Examples

Asynchronous Task Processing

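@Component
public class AsyncTaskService {
    private final RocketMQProducer producer;

    // Constructor injection: Spring supplies the RocketMQProducer bean from section 1
    public AsyncTaskService(RocketMQProducer producer) {
        this.producer = producer;
    }

    public void submitTask(String taskData) {
        // Returns as soon as the broker accepts the message; the task itself runs in the consumer
        producer.send("async-task-topic", taskData);
    }
}

@Component
@RocketMQMessageListener(topic = "async-task-topic", consumerGroup = "task-consumer")
public class AsyncTaskConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String taskData = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("Processing task: " + taskData);
        // Run the task-processing logic here
    }
}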

Event-Driven Architecture

// Assumes spring-kafka is on the classpath; it provides KafkaTemplate and @KafkaListener
@Component
public class EventPublisher {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public EventPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishEvent(String eventType, String eventData) {
        // The event type is used as the record key, the event data as the value
        kafkaTemplate.send("events-topic", eventType, eventData);
    }
}

// Consumes from the same Kafka topic that the publisher writes to
@Component
public class EventConsumer {
    @KafkaListener(topics = "events-topic", groupId = "event-consumer")
    public void onEvent(ConsumerRecord<String, String> record) {
        String eventType = record.key();
        String eventData = record.value();
        System.out.println("Handling event: " + eventType + " -> " + eventData);
    }
}

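4. Best Practices

  1. Message idempotency: make handlers idempotent so that a redelivered message is not processed twice.
  2. Message ordering: choose ordered or concurrent consumption according to business requirements.
  3. Message persistence: make sure messages are persisted and not lost.
  4. Consumer load balancing: size the number of consumers sensibly; in Kafka, consumers in a group beyond the partition count sit idle.
  5. Queue monitoring: use management tools to monitor queue status.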
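
The idempotency item is the one most often handled in application code, so here is a minimal sketch of one way to approach it. It assumes every message carries a unique identifier chosen by the producer, such as a business key. `IdempotentHandler` and `handle` are hypothetical names that belong to neither client library, and the in-memory set is for illustration only: a real deployment would need a durable, shared store such as a database unique constraint.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper: runs the business logic at most once per message id.
// The processed-id set lives in memory, so it is lost on restart.
class IdempotentHandler {
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> businessLogic;

    IdempotentHandler(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String messageId, String body) {
        // add() returns false when the id is already present, which means a redelivery
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            businessLogic.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Forget the id so that a later redelivery can retry the work
            processedIds.remove(messageId);
            throw e;
        }
    }
}

class IdempotentHandlerDemo {
    public static void main(String[] args) {
        AtomicInteger executions = new AtomicInteger();
        IdempotentHandler handler = new IdempotentHandler(body -> executions.incrementAndGet());
        handler.handle("order-1001", "create order");
        handler.handle("order-1001", "create order"); // redelivery, skipped
        handler.handle("order-1002", "create order");
        System.out.println("business logic ran " + executions.get() + " times");
    }
}
```

Running the demo prints `business logic ran 2 times`, because the second delivery of `order-1001` is recognized and skipped. Inside a listener's `onMessage` method, the same check would wrap the task-processing logic.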

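Summary

Message queues are a core component of distributed systems. With a working knowledge of RocketMQ and Kafka, you can implement architectural patterns such as asynchronous processing and event-driven design.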