消息队列概述
消息队列概述
消息队列(Message Queue,MQ)是一种跨进程/跨网络的通信机制,允许应用通过消息进行异步交互。它是分布式系统中最核心的基础设施之一。
消息队列的核心作用
1. 异步处理
用户注册后需要发送邮件、短信、积分计算,使用MQ可以异步执行:
@Service
public class UserService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void register(User user) {
userRepository.save(user);
// 异步处理,不阻塞主流程
rabbitTemplate.convertAndSend("user.exchange", "user.register",
new UserRegisterEvent(user.getId(), user.getEmail()));
}
}
2. 系统解耦
订单系统与库存系统解耦,订单创建后发送消息,库存系统自行消费:
@Component
@RabbitListener(queues = "order.created")
public class InventoryConsumer {
@RabbitHandler
public void handleOrderCreated(OrderCreatedEvent event) {
for (OrderItem item : event.getItems()) {
inventoryService.deduct(item.getSkuId(), item.getQuantity());
}
}
}
3. 流量削峰
秒杀场景下,大量请求先进入MQ缓冲,后端按能力消费:
@Component
@RabbitListener(queues = "seckill.orders")
public class SeckillConsumer {
@RabbitHandler
public void handleSeckill(SeckillRequest request) {
if (stockService.tryDeduct(request.getSkuId(), request.getQuantity())) {
orderService.createOrder(request);
}
}
}
消息模型对比
| 特性 | 点对点(Queue) | 发布/订阅(Topic) |
|---|---|---|
| 消费者数量 | 1个消息只被1个消费者消费 | 1个消息被所有订阅者消费 |
| 使用场景 | 任务分发、负载均衡 | 事件广播、通知 |
| 消息确认 | 消费者确认后移除 | 各消费者独立确认 |
主流消息队列对比
| 中间件 | 语言 | 吞吐量 | 延迟 | 可靠性 | 适用场景 |
|---|---|---|---|---|---|
| RabbitMQ | Erlang | 万级 | 微秒级 | 高 | 企业级应用、复杂路由 |
| Kafka | Scala/Java | 百万级 | 毫秒级 | 高 | 大数据、日志收集 |
| RocketMQ | Java | 十万级 | 毫秒级 | 高 | 电商、金融、事务消息 |
| ActiveMQ | Java | 万级 | 毫秒级 | 中 | 传统企业应用 |
消息可靠性保障
生产者确认机制
// RabbitMQ 确认模式
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory cf) {
RabbitTemplate template = new RabbitTemplate(cf);
template.setConfirmCallback((data, ack, cause) -> {
if (!ack) {
log.error("消息发送失败: {}", cause);
}
});
return template;
}
消费者手动确认
@RabbitListener(queues = "important.queue")
public void handleManualAck(Message message, Channel channel) throws IOException {
try {
processMessage(message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
消息幂等性
消费者需要保证幂等性,防止重复消费:
@Component
public class IdempotentConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
public boolean tryAcquireLock(String messageId) {
String key = "mq:lock:" + messageId;
return Boolean.TRUE.equals(
redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));
}
}
小结
消息队列是分布式系统的解耦利器,选择合适的MQ中间件并做好可靠性保障是架构设计的关键。