Message Queues Explained: RocketMQ and Kafka
Overview
Message queues are a core building block of distributed systems. This tutorial shows how to use the Java clients for RocketMQ and Kafka.
1. RocketMQ
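import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

// Producer
@Component
public class RocketMQProducer implements DisposableBean {
    private final DefaultMQProducer producer;

    public RocketMQProducer() throws MQClientException {
        this.producer = new DefaultMQProducer("producer-group");
        this.producer.setNamesrvAddr("localhost:9876");
        // The producer must be started before it can send messages.
        this.producer.start();
    }

    public SendResult send(String topic, String message) {
        try {
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            Message msg = new Message(topic, message.getBytes(StandardCharsets.UTF_8));
            return producer.send(msg);
        } catch (Exception e) {
            throw new RuntimeException("Failed to send message to topic " + topic, e);
        }
    }

    // Release client resources when the Spring context shuts down.
    @Override
    public void destroy() {
        producer.shutdown();
    }
}

// Consumer
@Component
@RocketMQMessageListener(topic = "topic-name", consumerGroup = "consumer-group")
public class RocketMQConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String body = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("Received message: " + body);
    }
}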
2. Kafka
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;

// Producer
// Named KafkaMessageProducer so that it does not shadow the client library's KafkaProducer class.
public class KafkaMessageProducer implements AutoCloseable {
    private final KafkaProducer<String, String> producer;

    public KafkaMessageProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<>(props);
    }

    public void send(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        // send() is asynchronous; the callback runs once the broker responds.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("Message sent: " + metadata.topic() + "-" + metadata.partition());
            }
        });
    }

    // Flushes pending records and releases client resources.
    @Override
    public void close() {
        producer.close();
    }
}

// Consumer
// Named KafkaMessageConsumer so that it does not shadow the client library's KafkaConsumer class.
public class KafkaMessageConsumer {
    private final KafkaConsumer<String, String> consumer;

    public KafkaMessageConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "consumer-group");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
    }

    public void consume(String topic) {
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.key() + " -> " + record.value());
                }
            }
        } finally {
            // Leave the consumer group cleanly if the loop exits with an exception.
            consumer.close();
        }
    }
}
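The producer above sends every record with a key. For keyed records, Kafka's default partitioner hashes the key and takes the result modulo the number of partitions, so records that share a key land on the same partition and keep their relative order. The following is a minimal sketch of that idea using only the JDK. The class name KeyPartitionSketch and the partition count are hypothetical, and Arrays.hashCode stands in for the murmur2 hash that Kafka actually uses, so the partition numbers it prints will not match a real cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Illustrative key-to-partition mapping; not Kafka's real partitioner. */
final class KeyPartitionSketch {

    /** Maps a record key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Assumption: Arrays.hashCode is a stand-in for Kafka's murmur2 hash.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6; // hypothetical partition count
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Both occurrences of "order-1" map to the same partition, which is why choosing a stable business identifier as the key is a common way to get per-entity ordering.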
3. Practical Examples
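Asynchronous task processing
import java.nio.charset.StandardCharsets;

@Component
public class AsyncTaskService {
    private final RocketMQProducer producer;

    // Constructor injection: Spring supplies the RocketMQProducer bean.
    public AsyncTaskService(RocketMQProducer producer) {
        this.producer = producer;
    }

    public void submitTask(String taskData) {
        producer.send("async-task-topic", taskData);
    }
}

@Component
@RocketMQMessageListener(topic = "async-task-topic", consumerGroup = "task-consumer")
public class AsyncTaskConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String taskData = new String(message.getBody(), StandardCharsets.UTF_8);
        System.out.println("Processing task: " + taskData);
        // Run the task-processing logic here.
    }
}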
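Event-driven architecture
// Both sides use Kafka here: events published to a Kafka topic must be read by a Kafka consumer.
// The listener assumes spring-kafka (org.springframework.kafka.annotation.KafkaListener) is on the classpath.
@Component
public class EventPublisher {
    private final KafkaProducer<String, String> producer;

    // Assumes a KafkaProducer<String, String> bean is defined elsewhere in the application.
    public EventPublisher(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    public void publishEvent(String eventType, String eventData) {
        // The event type is used as the record key.
        producer.send(new ProducerRecord<>("events-topic", eventType, eventData));
    }
}

@Component
public class EventConsumer {
    @KafkaListener(topics = "events-topic", groupId = "event-consumer")
    public void onEvent(ConsumerRecord<String, String> record) {
        String eventType = record.key();
        String eventData = record.value();
        System.out.println("Processing event: " + eventType + " -> " + eventData);
    }
}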
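The event consumer above receives an event type together with its payload and simply prints them. In practice a consumer usually routes each event to a handler chosen by its type. The following is one possible sketch of such routing using only the JDK. The class EventDispatcherSketch, its methods, and the event type names in the usage note are hypothetical and are not part of either client library.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Routes an event payload to the handler registered for its event type. */
final class EventDispatcherSketch {
    private final Map<String, Consumer<String>> handlers = new ConcurrentHashMap<>();

    /** Registers (or replaces) the handler for one event type. */
    void register(String eventType, Consumer<String> handler) {
        handlers.put(eventType, handler);
    }

    /** Returns true if a handler was found and invoked, false otherwise. */
    boolean dispatch(String eventType, String eventData) {
        // A record key can be null; ConcurrentHashMap does not accept null keys.
        if (eventType == null) {
            return false;
        }
        Consumer<String> handler = handlers.get(eventType);
        if (handler == null) {
            return false; // unknown event type; the caller decides whether to log or drop it
        }
        handler.accept(eventData);
        return true;
    }
}
```

A listener method could call dispatch(eventType, eventData) after extracting both values from the incoming message, with handlers registered at startup, for example register("order-created", data -> System.out.println(data)).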
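4. Best Practices
- Idempotency: a broker may deliver the same message more than once, so make handlers safe to run repeatedly, for example by deduplicating on a message ID.
- Ordering: choose ordered or concurrent consumption based on business requirements; ordered consumption limits parallelism.
- Persistence: make sure messages are persisted so that they are not lost.
- Consumer load balancing: size the number of consumers sensibly; in Kafka, consumers in a group beyond the number of partitions sit idle.
- Monitoring: use management tools to monitor the state of the message queue.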
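The idempotency point in the list above can be sketched as a wrapper that remembers which message IDs it has already processed and skips repeats. This is a minimal illustration using only the JDK. The class IdempotentHandlerSketch is hypothetical, and the in-memory set is an assumption made for brevity: it is lost on restart and is not shared between consumer instances, so a real system would more likely keep processed IDs in a database or cache.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Wraps a message handler so that each message ID is processed at most once. */
final class IdempotentHandlerSketch {
    // Assumption: in-memory storage; not durable and not shared across instances.
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    private final Consumer<String> delegate;

    IdempotentHandlerSketch(Consumer<String> delegate) {
        this.delegate = delegate;
    }

    /** Returns true if the message was processed, false if it was skipped as a duplicate. */
    boolean handle(String messageId, String body) {
        // add() returns false when the ID is already present, which marks a redelivery.
        if (!processedIds.add(messageId)) {
            return false;
        }
        try {
            delegate.accept(body);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so that a later redelivery can retry the work.
            processedIds.remove(messageId);
            throw e;
        }
    }
}
```

With RocketMQ the ID could come from the message's ID or keys, and with Kafka from the record key or a field in the payload; which one is suitable depends on how the producer assigns identifiers.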
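Summary
Message queues are a core building block of distributed systems. With a working knowledge of RocketMQ and Kafka, you can implement architectural patterns such as asynchronous processing and event-driven design.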