消息队列:异步通信的核心
消息队列:异步通信的核心
概述
消息队列(MQ)是分布式系统中重要的异步通信机制,它实现了服务间的解耦、异步处理和流量削峰。
1. 消息队列概念
public class OrderService {
private final RabbitTemplate rabbitTemplate;
public void createOrder(Order order) {
orderRepository.save(order);
rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
}
}
2. RabbitMQ
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@Service
public class MessageProducer {
private final RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}
@Service
public class MessageConsumer {
@RabbitListener(queues = "order.created.queue")
public void handleOrderCreated(Order order) {
System.out.println("处理订单: " + order.getId());
}
}
3. Kafka
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.annotation.KafkaListener;
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
@Service
public class KafkaConsumer {
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consume(String message) {
System.out.println("收到消息: " + message);
}
}
4. 消息可靠性保障
// 生产者确认
config.put(ProducerConfig.ACKS_CONFIG, "all");
config.put(ProducerConfig.RETRIES_CONFIG, 3);
// 消费者手动确认
@KafkaListener(topics = "order-topic")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
processMessage(record.value());
ack.acknowledge();
} catch (Exception e) {
// 重试或死信队列
}
}
最佳实践
- 消息幂等性:消费者处理消息要幂等
- 消息可靠性:生产者确认+消费者手动确认
- 顺序消息:使用相同key保证顺序
- 消息积压:监控消费者处理速度
- 死信队列:处理失败的消息
总结
消息队列是分布式系统中重要的异步通信机制,掌握RabbitMQ或Kafka的使用,可以实现服务解耦、异步处理和流量削峰。