← 返回首页

消息队列:异步通信的核心

📂 java ⏱ 1 min 172 words

消息队列:异步通信的核心

概述

消息队列(MQ)是分布式系统中重要的异步通信机制,它实现了服务间的解耦、异步处理和流量削峰。

1. 消息队列概念

public class OrderService {
    private final RabbitTemplate rabbitTemplate;

    public void createOrder(Order order) {
        orderRepository.save(order);
        rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
    }
}

2. RabbitMQ

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

@Service
public class MessageProducer {
    private final RabbitTemplate rabbitTemplate;
    public void sendMessage(String exchange, String routingKey, Object message) {
        rabbitTemplate.convertAndSend(exchange, routingKey, message);
    }
}

@Service
public class MessageConsumer {
    @RabbitListener(queues = "order.created.queue")
    public void handleOrderCreated(Order order) {
        System.out.println("处理订单: " + order.getId());
    }
}

3. Kafka

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.annotation.KafkaListener;

@Service
public class KafkaProducer {
    private final KafkaTemplate<String, String> kafkaTemplate;
    public void sendMessage(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consume(String message) {
        System.out.println("收到消息: " + message);
    }
}

4. 消息可靠性保障

// 生产者确认
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.RETRIES_CONFIG, 3);

// 消费者手动确认
@KafkaListener(topics = "order-topic")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        processMessage(record.value());
        ack.acknowledge();
    } catch (Exception e) {
        // 重试或死信队列
    }
}

最佳实践

  1. 消息幂等性:消费者处理消息要幂等
  2. 消息可靠性:生产者确认+消费者手动确认
  3. 顺序消息:使用相同key保证顺序
  4. 消息积压:监控消费者处理速度
  5. 死信队列:处理失败的消息

总结

消息队列是分布式系统中重要的异步通信机制,掌握RabbitMQ或Kafka的使用,可以实现服务解耦、异步处理和流量削峰。