消息队列架构Kafka/RocketMQ/RabbitMQ
消息队列架构Kafka/RocketMQ/RabbitMQ
消息队列概述
消息队列(Message Queue,MQ)是一种异步通信机制,允许应用程序通过消息进行通信。消息队列可以解耦系统组件、提高系统可扩展性、处理高并发场景。
Kafka架构
核心概念
// Kafka生产者
@Component
public class KafkaMessageProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
public void sendMessageWithCallback(String topic, String message) {
kafkaTemplate.send(topic, message).addCallback(
result -> log.info("Message sent successfully: {}", result.getRecordMetadata()),
ex -> log.error("Failed to send message", ex)
);
}
// 发送带有Key的消息
public void sendMessageWithKey(String topic, String key, String message) {
kafkaTemplate.send(topic, key, message);
}
}
// Kafka消费者
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "order-events", groupId = "order-service")
public void consumeOrderEvent(String message) {
log.info("Received order event: {}", message);
// 处理消息
OrderEvent event = objectMapper.readValue(message, OrderEvent.class);
processOrderEvent(event);
}
@KafkaListener(topics = "payment-events", groupId = "payment-service")
public void consumePaymentEvent(String message) {
log.info("Received payment event: {}", message);
PaymentEvent event = objectMapper.readValue(message, PaymentEvent.class);
processPaymentEvent(event);
}
}
Kafka配置
# application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
enable-auto-commit: false
listener:
ack-mode: manual
concurrency: 3
Kafka主题和分区
// 主题管理
@Component
public class KafkaTopicManager {
private final AdminClient adminClient;
public void createTopic(String topicName, int partitions, short replicationFactor) {
NewTopic topic = new NewTopic(topicName, partitions, replicationFactor);
adminClient.createTopics(Collections.singletonList(topic))
.all()
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to create topic: {}", topicName, ex);
} else {
log.info("Topic created successfully: {}", topicName);
}
});
}
public void deleteTopic(String topicName) {
adminClient.deleteTopics(Collections.singletonList(topicName))
.all()
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to delete topic: {}", topicName, ex);
} else {
log.info("Topic deleted successfully: {}", topicName);
}
});
}
}
RocketMQ架构
核心概念
// RocketMQ生产者
@Component
public class RocketMQMessageProducer {
private final RocketMQTemplate rocketMQTemplate;
public void sendMessage(String topic, String message) {
rocketMQTemplate.convertAndSend(topic, message);
}
// 发送顺序消息
public void sendOrderMessage(String topic, String message, String orderId) {
rocketMQTemplate.syncSendOrderly(topic, message, orderId);
}
// 发送延时消息
public void sendDelayMessage(String topic, String message, int delayLevel) {
Message<String> msg = MessageBuilder.withPayload(message).build();
rocketMQTemplate.syncSend(topic, msg, 3000, delayLevel);
}
// 发送事务消息
public void sendTransactionMessage(String topic, String message) {
Message<String> msg = MessageBuilder.withPayload(message).build();
rocketMQTemplate.sendMessageInTransaction(topic, msg, null);
}
}
// RocketMQ消费者
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
selectorExpression = "TAG_A || TAG_B"
)
public class RocketMQMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("Received message: {}", message);
// 处理消息
processMessage(message);
}
}
// 事务消息监听器
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务
processLocalTransaction(msg);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
boolean committed = checkTransactionStatus(msg);
return committed ? RocketMQLocalTransactionState.COMMIT
: RocketMQLocalTransactionState.UNKNOWN;
}
}
RocketMQ配置
# application.yml
rocketmq:
name-server: localhost:9876
producer:
group: my-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
send-message-max-retry: 3
RabbitMQ架构
核心概念
// RabbitMQ生产者
@Component
public class RabbitMQMessageProducer {
private final RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
// 发送带有确认的消息
public void sendMessageWithConfirm(String exchange, String routingKey, String message) {
CorrelationData correlationData = new CorrelationData();
rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
correlationData.getFuture().addCallback(
result -> log.info("Message confirmed: {}", result),
ex -> log.error("Message not confirmed", ex)
);
}
// 发送延时消息
public void sendDelayMessage(String exchange, String routingKey, String message, int delay) {
rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
msg.getMessageProperties().setDelay(delay);
return msg;
});
}
}
// RabbitMQ消费者
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "order-queue", durable = "true"),
exchange = @Exchange(value = "order-exchange", type = "topic"),
key = "order.#"
)
)
public class RabbitMQMessageConsumer {
@RabbitHandler
public void handleMessage(String message) {
log.info("Received message: {}", message);
// 处理消息
processMessage(message);
}
@RabbitHandler
public void handleMessageWithHeaders(String message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
log.info("Received message: {}", message);
// 处理消息
processMessage(message);
// 确认消息
channel.basicAck(tag, false);
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(tag, false, true);
}
}
}
RabbitMQ配置
# application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
prefetch: 10
concurrency: 3
方案对比
| 特性 | Kafka | RocketMQ | RabbitMQ |
|---|---|---|---|
| 吞吐量 | 极高 | 高 | 中等 |
| 延迟 | 毫秒级 | 毫秒级 | 微秒级 |
| 消息可靠性 | 高 | 高 | 高 |
| 消息顺序 | 支持 | 支持 | 不支持 |
| 事务消息 | 支持 | 支持 | 不支持 |
| 延时消息 | 不支持 | 支持 | 支持 |
| 消息回溯 | 支持 | 支持 | 不支持 |
| 适用场景 | 日志、大数据 | 电商、金融 | 企业应用 |
使用场景
日志收集
// 使用Kafka收集日志
@Component
public class LogCollector {
private final KafkaTemplate<String, String> kafkaTemplate;
public void collectLog(String level, String message) {
LogEntry entry = new LogEntry();
entry.setLevel(level);
entry.setMessage(message);
entry.setTimestamp(LocalDateTime.now());
entry.setServiceName(getServiceName());
kafkaTemplate.send("log-topic", objectMapper.writeValueAsString(entry));
}
}
// 日志消费和存储
@Component
@KafkaListener(topics = "log-topic", groupId = "log-storage")
public class LogStorage {
private final ElasticsearchClient elasticsearchClient;
@Override
public void consumeLog(String message) {
LogEntry entry = objectMapper.readValue(message, LogEntry.class);
// 存储到Elasticsearch
IndexRequest request = new IndexRequest("logs")
.source(objectMapper.convertValue(entry, Map.class));
elasticsearchClient.index(request);
}
}
订单处理
// 使用RocketMQ处理订单
@Component
public class OrderProducer {
private final RocketMQTemplate rocketMQTemplate;
public void createOrder(Order order) {
// 发送订单创建消息
rocketMQTemplate.convertAndSend("order-topic", order);
}
public void updateOrderStatus(Long orderId, String status) {
// 发送顺序消息,保证同一订单的状态更新顺序
OrderStatusUpdate update = new OrderStatusUpdate(orderId, status);
rocketMQTemplate.syncSendOrderly("order-status-topic", update, orderId.toString());
}
}
// 订单消费
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer")
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 处理订单
processOrder(order);
}
}
事件驱动架构
// 使用RabbitMQ实现事件驱动
@Component
public class EventProducer {
private final RabbitTemplate rabbitTemplate;
public void publishEvent(String eventType, Object payload) {
Event event = new Event();
event.setType(eventType);
event.setPayload(payload);
event.setTimestamp(LocalDateTime.now());
// 发送到交换机
rabbitTemplate.convertAndSend(
"event-exchange",
"event." + eventType.toLowerCase(),
objectMapper.writeValueAsString(event)
);
}
}
// 事件消费
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "event-queue", durable = "true"),
exchange = @Exchange(value = "event-exchange", type = "topic"),
key = "event.*"
)
)
public class EventConsumer {
@RabbitHandler
public void handleEvent(String message) {
Event event = objectMapper.readValue(message, Event.class);
// 根据事件类型分发处理
switch (event.getType()) {
case "ORDER_CREATED":
handleOrderCreated((OrderCreatedEvent) event.getPayload());
break;
case "PAYMENT_RECEIVED":
handlePaymentReceived((PaymentReceivedEvent) event.getPayload());
break;
// ... 其他事件类型
}
}
}
最佳实践
- 消息可靠性:确保消息不丢失,使用持久化和确认机制
- 幂等性设计:消费者应该能够处理重复消息
- 消息顺序:对于需要顺序处理的消息,使用顺序消息
- 消息积压:监控消息积压情况,及时扩容消费者
- 消息过滤:使用消息过滤减少不必要的网络传输
- 消息回溯:支持消息回溯,便于问题排查和数据修复