← 返回首页
🌐

消息队列架构Kafka/RocketMQ/RabbitMQ

📂 architecture ⏱ 5 min 915 words

消息队列架构Kafka/RocketMQ/RabbitMQ

消息队列概述

消息队列(Message Queue,MQ)是一种异步通信机制,允许应用程序通过消息进行通信。消息队列可以解耦系统组件、提高系统可扩展性、处理高并发场景。

Kafka架构

核心概念

// Kafka producer
/**
 * Thin wrapper around {@code KafkaTemplate} for publishing string messages.
 */
@Component
public class KafkaMessageProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Creates the producer via constructor injection; the {@code final}
     * template field must be assigned here for the class to compile.
     *
     * @param kafkaTemplate template used for every send in this class
     */
    public KafkaMessageProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a message without a key and without inspecting the send result.
     *
     * @param topic   destination topic
     * @param message message payload
     */
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }

    /**
     * Sends a message and logs the outcome once the send completes.
     *
     * @param topic   destination topic
     * @param message message payload
     */
    public void sendMessageWithCallback(String topic, String message) {
        // NOTE(review): addCallback assumes send() returns a ListenableFuture;
        // confirm against the spring-kafka version in use.
        kafkaTemplate.send(topic, message).addCallback(
            result -> log.info("Message sent successfully: {}", result.getRecordMetadata()),
            ex -> log.error("Failed to send message", ex)
        );
    }

    /**
     * Sends a message together with a record key.
     *
     * @param topic   destination topic
     * @param key     record key
     * @param message message payload
     */
    public void sendMessageWithKey(String topic, String key, String message) {
        kafkaTemplate.send(topic, key, message);
    }
}

// Kafka消费者
@Component
public class KafkaMessageConsumer {
    
    @KafkaListener(topics = "order-events", groupId = "order-service")
    public void consumeOrderEvent(String message) {
        log.info("Received order event: {}", message);
        
        // 处理消息
        OrderEvent event = objectMapper.readValue(message, OrderEvent.class);
        processOrderEvent(event);
    }
    
    @KafkaListener(topics = "payment-events", groupId = "payment-service")
    public void consumePaymentEvent(String message) {
        log.info("Received payment event: {}", message);
        
        PaymentEvent event = objectMapper.readValue(message, PaymentEvent.class);
        processPaymentEvent(event);
    }
}

Kafka配置

# application.yml
# Spring Kafka settings: plain-string keys and values on both the producer
# and the consumer side.
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      # Auto-commit is off; together with ack-mode below, listeners are
      # expected to acknowledge records themselves.
      enable-auto-commit: false
    listener:
      ack-mode: manual
      concurrency: 3

Kafka主题和分区

// Topic management
/**
 * Creates and deletes Kafka topics through an {@code AdminClient}.
 * Both operations are fire-and-forget: the outcome is only logged.
 */
@Component
public class KafkaTopicManager {

    private final AdminClient adminClient;

    /**
     * Creates the manager via constructor injection; the {@code final}
     * client field must be assigned here for the class to compile.
     *
     * @param adminClient client used for all topic operations
     */
    public KafkaTopicManager(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    /**
     * Requests creation of a topic and logs the result when it completes.
     *
     * @param topicName         name of the topic to create
     * @param partitions        number of partitions
     * @param replicationFactor replication factor
     */
    public void createTopic(String topicName, int partitions, short replicationFactor) {
        NewTopic topic = new NewTopic(topicName, partitions, replicationFactor);

        adminClient.createTopics(Collections.singletonList(topic))
            .all()
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to create topic: {}", topicName, ex);
                } else {
                    log.info("Topic created successfully: {}", topicName);
                }
            });
    }

    /**
     * Requests deletion of a topic and logs the result when it completes.
     *
     * @param topicName name of the topic to delete
     */
    public void deleteTopic(String topicName) {
        adminClient.deleteTopics(Collections.singletonList(topicName))
            .all()
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("Failed to delete topic: {}", topicName, ex);
                } else {
                    log.info("Topic deleted successfully: {}", topicName);
                }
            });
    }
}

RocketMQ架构

核心概念

// RocketMQ producer
/**
 * Publishes plain, ordered, delayed and transactional messages through
 * {@code RocketMQTemplate}.
 */
@Component
public class RocketMQMessageProducer {

    /** Send timeout, in milliseconds, passed to the delayed synchronous send. */
    private static final long DELAY_SEND_TIMEOUT_MILLIS = 3000;

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Creates the producer via constructor injection; the {@code final}
     * template field must be assigned here for the class to compile.
     *
     * @param rocketMQTemplate template used for every send in this class
     */
    public RocketMQMessageProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * Sends a plain message.
     *
     * @param topic   destination topic
     * @param message message payload
     */
    public void sendMessage(String topic, String message) {
        rocketMQTemplate.convertAndSend(topic, message);
    }

    /**
     * Sends an ordered message, using the order id as the ordering key.
     *
     * @param topic   destination topic
     * @param message message payload
     * @param orderId key passed to the orderly send
     */
    public void sendOrderMessage(String topic, String message, String orderId) {
        rocketMQTemplate.syncSendOrderly(topic, message, orderId);
    }

    /**
     * Sends a delayed message.
     *
     * @param topic      destination topic
     * @param message    message payload
     * @param delayLevel delay level passed through to the template
     */
    public void sendDelayMessage(String topic, String message, int delayLevel) {
        Message<String> msg = MessageBuilder.withPayload(message).build();
        rocketMQTemplate.syncSend(topic, msg, DELAY_SEND_TIMEOUT_MILLIS, delayLevel);
    }

    /**
     * Sends a transactional message with no extra argument for the
     * local-transaction listener.
     *
     * @param topic   destination topic
     * @param message message payload
     */
    public void sendTransactionMessage(String topic, String message) {
        Message<String> msg = MessageBuilder.withPayload(message).build();
        rocketMQTemplate.sendMessageInTransaction(topic, msg, null);
    }
}

// RocketMQ consumer
/**
 * Consumes string messages from {@code order-topic} whose tag matches
 * {@code TAG_A} or {@code TAG_B}, as member of {@code order-consumer-group}.
 */
@Component
@RocketMQMessageListener(
    consumerGroup = "order-consumer-group",
    topic = "order-topic",
    selectorExpression = "TAG_A || TAG_B"
)
public class RocketMQMessageConsumer implements RocketMQListener<String> {

    /**
     * Logs the incoming message and passes it on for processing.
     *
     * @param message message payload
     */
    @Override
    public void onMessage(final String message) {
        log.info("Received message: {}", message);
        processMessage(message);
    }
}

// 事务消息监听器
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            // 执行本地事务
            processLocalTransaction(msg);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 检查本地事务状态
        boolean committed = checkTransactionStatus(msg);
        return committed ? RocketMQLocalTransactionState.COMMIT 
                        : RocketMQLocalTransactionState.UNKNOWN;
    }
}

RocketMQ配置

# application.yml
# RocketMQ name server address and producer defaults.
rocketmq:
  name-server: localhost:9876
  producer:
    group: my-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
    # NOTE(review): not sure this key is recognized by the RocketMQ Spring
    # starter; the two retry-times-* keys above look like the supported ones.
    # Confirm before relying on it.
    send-message-max-retry: 3

RabbitMQ架构

核心概念

// RabbitMQ生产者
@Component
public class RabbitMQMessageProducer {
    
    private final RabbitTemplate rabbitTemplate;
    
    public void sendMessage(String exchange, String routingKey, String message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
    
    // 发送带有确认的消息
    public void sendMessageWithConfirm(String exchange, String routingKey, String message) {
        CorrelationData correlationData = new CorrelationData();
        
        rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        
        correlationData.getFuture().addCallback(
            result -> log.info("Message confirmed: {}", result),
            ex -> log.error("Message not confirmed", ex)
        );
    }
    
    // 发送延时消息
    public void sendDelayMessage(String exchange, String routingKey, String message, int delay) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message, msg -> {
            msg.getMessageProperties().setDelay(delay);
            return msg;
        });
    }
}

// RabbitMQ消费者
@Component
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(value = "order-queue", durable = "true"),
        exchange = @Exchange(value = "order-exchange", type = "topic"),
        key = "order.#"
    )
)
public class RabbitMQMessageConsumer {
    
    @RabbitHandler
    public void handleMessage(String message) {
        log.info("Received message: {}", message);
        
        // 处理消息
        processMessage(message);
    }
    
    @RabbitHandler
    public void handleMessageWithHeaders(String message, Channel channel, 
                                         @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            log.info("Received message: {}", message);
            
            // 处理消息
            processMessage(message);
            
            // 确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 拒绝消息,重新入队
            channel.basicNack(tag, false, true);
        }
    }
}

RabbitMQ配置

# application.yml
# Spring AMQP connection, publisher-confirm and listener settings.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    # NOTE(review): placeholder credentials for local use; supply real ones
    # from the environment elsewhere.
    username: guest
    password: guest
    virtual-host: /
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        # Manual mode: listeners must ack or nack every delivery themselves.
        acknowledge-mode: manual
        prefetch: 10
        concurrency: 3

方案对比

特性 Kafka RocketMQ RabbitMQ
吞吐量 极高 高 中等
延迟 毫秒级 毫秒级 微秒级
消息可靠性 高 高 高
消息顺序 支持 支持 不支持
事务消息 支持 支持 不支持
延时消息 不支持 支持 支持
消息回溯 支持 支持 不支持
适用场景 日志、大数据 电商、金融 企业应用

使用场景

日志收集

// Collect logs with Kafka
/**
 * Builds a {@code LogEntry} and publishes it as JSON to {@code log-topic}.
 */
@Component
public class LogCollector {

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Creates the collector via constructor injection; the {@code final}
     * template field must be assigned here for the class to compile.
     *
     * @param kafkaTemplate template used to publish log entries
     */
    public LogCollector(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Publishes one log entry stamped with the current time and service name.
     *
     * @param level   log level
     * @param message log message
     * @throws java.io.UncheckedIOException if the entry cannot be serialized
     */
    public void collectLog(String level, String message) {
        LogEntry entry = new LogEntry();
        entry.setLevel(level);
        entry.setMessage(message);
        entry.setTimestamp(LocalDateTime.now());
        entry.setServiceName(getServiceName());

        final String payload;
        try {
            payload = objectMapper.writeValueAsString(entry);
        } catch (java.io.IOException e) {
            // Serialization failures are checked exceptions; rethrow unchecked
            // so the method signature stays as callers expect it.
            throw new java.io.UncheckedIOException("Failed to serialize log entry", e);
        }
        kafkaTemplate.send("log-topic", payload);
    }
}

// 日志消费和存储
@Component
@KafkaListener(topics = "log-topic", groupId = "log-storage")
public class LogStorage {
    
    private final ElasticsearchClient elasticsearchClient;
    
    @Override
    public void consumeLog(String message) {
        LogEntry entry = objectMapper.readValue(message, LogEntry.class);
        
        // 存储到Elasticsearch
        IndexRequest request = new IndexRequest("logs")
                .source(objectMapper.convertValue(entry, Map.class));
        
        elasticsearchClient.index(request);
    }
}

订单处理

// Process orders with RocketMQ
/**
 * Publishes order-created messages and ordered status updates.
 */
@Component
public class OrderProducer {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * Creates the producer via constructor injection; the {@code final}
     * template field must be assigned here for the class to compile.
     *
     * @param rocketMQTemplate template used for every send in this class
     */
    public OrderProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    /**
     * Publishes an order-created message to {@code order-topic}.
     *
     * @param order the order to publish
     */
    public void createOrder(Order order) {
        // Send the order-created message
        rocketMQTemplate.convertAndSend("order-topic", order);
    }

    /**
     * Publishes a status update to {@code order-status-topic}.
     *
     * @param orderId id of the order; also used as the ordering key
     * @param status  new status
     */
    public void updateOrderStatus(Long orderId, String status) {
        // Ordered send keyed by order id, so updates for the same order
        // keep their order
        OrderStatusUpdate update = new OrderStatusUpdate(orderId, status);
        rocketMQTemplate.syncSendOrderly("order-status-topic", update, orderId.toString());
    }
}

// Order consumption
/**
 * Consumes {@code Order} messages from {@code order-topic} as member of the
 * {@code order-consumer} group.
 */
@Component
@RocketMQMessageListener(consumerGroup = "order-consumer", topic = "order-topic")
public class OrderConsumer implements RocketMQListener<Order> {

    /**
     * Passes the received order on for processing.
     *
     * @param order the received order
     */
    @Override
    public void onMessage(final Order order) {
        processOrder(order);
    }
}

事件驱动架构

// Event-driven messaging with RabbitMQ
/**
 * Wraps a payload in an {@code Event} and publishes it as JSON to
 * {@code event-exchange}.
 */
@Component
public class EventProducer {

    private final RabbitTemplate rabbitTemplate;

    /**
     * Creates the producer via constructor injection; the {@code final}
     * template field must be assigned here for the class to compile.
     *
     * @param rabbitTemplate template used to publish events
     */
    public EventProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Publishes an event with routing key {@code event.<lower-cased type>}.
     *
     * @param eventType event type, e.g. {@code ORDER_CREATED}
     * @param payload   event payload
     * @throws java.io.UncheckedIOException if the event cannot be serialized
     */
    public void publishEvent(String eventType, Object payload) {
        Event event = new Event();
        event.setType(eventType);
        event.setPayload(payload);
        event.setTimestamp(LocalDateTime.now());

        final String body;
        try {
            body = objectMapper.writeValueAsString(event);
        } catch (java.io.IOException e) {
            // Serialization failures are checked exceptions; rethrow unchecked
            // so the method signature stays as callers expect it.
            throw new java.io.UncheckedIOException("Failed to serialize event " + eventType, e);
        }

        // Locale.ROOT keeps the routing key stable whatever the JVM's default
        // locale is (a Turkish locale lower-cases 'I' to a dotless 'ı').
        String routingKey = "event." + eventType.toLowerCase(java.util.Locale.ROOT);

        // Send to the exchange
        rabbitTemplate.convertAndSend("event-exchange", routingKey, body);
    }
}

// 事件消费
@Component
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(value = "event-queue", durable = "true"),
        exchange = @Exchange(value = "event-exchange", type = "topic"),
        key = "event.*"
    )
)
public class EventConsumer {
    
    @RabbitHandler
    public void handleEvent(String message) {
        Event event = objectMapper.readValue(message, Event.class);
        
        // 根据事件类型分发处理
        switch (event.getType()) {
            case "ORDER_CREATED":
                handleOrderCreated((OrderCreatedEvent) event.getPayload());
                break;
            case "PAYMENT_RECEIVED":
                handlePaymentReceived((PaymentReceivedEvent) event.getPayload());
                break;
            // ... 其他事件类型
        }
    }
}

最佳实践

  1. 消息可靠性:确保消息不丢失,使用持久化和确认机制
  2. 幂等性设计:消费者应该能够处理重复消息
  3. 消息顺序:对于需要顺序处理的消息,使用顺序消息
  4. 消息积压:监控消息积压情况,及时扩容消费者
  5. 消息过滤:使用消息过滤减少不必要的网络传输
  6. 消息回溯:支持消息回溯,便于问题排查和数据修复