← 返回首页

消息队列概述

📂 java ⏱ 2 min 232 words

消息队列概述

消息队列(Message Queue,MQ)是一种跨进程/跨网络的通信机制,允许应用通过消息进行异步交互。它是分布式系统中最核心的基础设施之一。

消息队列的核心作用

1. 异步处理

用户注册后需要发送邮件、短信、积分计算,使用MQ可以异步执行:

@Service
public class UserService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void register(User user) {
        userRepository.save(user);
        // 异步处理,不阻塞主流程
        rabbitTemplate.convertAndSend("user.exchange", "user.register",
            new UserRegisterEvent(user.getId(), user.getEmail()));
    }
}

2. 系统解耦

订单系统与库存系统解耦,订单创建后发送消息,库存系统自行消费:

@Component
@RabbitListener(queues = "order.created")
public class InventoryConsumer {
    @RabbitHandler
    public void handleOrderCreated(OrderCreatedEvent event) {
        for (OrderItem item : event.getItems()) {
            inventoryService.deduct(item.getSkuId(), item.getQuantity());
        }
    }
}

3. 流量削峰

秒杀场景下,大量请求先进入MQ缓冲,后端按能力消费:

@Component
@RabbitListener(queues = "seckill.orders")
public class SeckillConsumer {
    @RabbitHandler
    public void handleSeckill(SeckillRequest request) {
        if (stockService.tryDeduct(request.getSkuId(), request.getQuantity())) {
            orderService.createOrder(request);
        }
    }
}

消息模型对比

特性 点对点(Queue) 发布/订阅(Topic)
消费者数量 1个消息只被1个消费者消费 1个消息被所有订阅者消费
使用场景 任务分发、负载均衡 事件广播、通知
消息确认 消费者确认后移除 各消费者独立确认

主流消息队列对比

中间件 语言 吞吐量 延迟 可靠性 适用场景
RabbitMQ Erlang 万级 微秒级 企业级应用、复杂路由
Kafka Scala/Java 百万级 毫秒级 大数据、日志收集
RocketMQ Java 十万级 毫秒级 电商、金融、事务消息
ActiveMQ Java 万级 毫秒级 传统企业应用

消息可靠性保障

生产者确认机制

// RabbitMQ 确认模式
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory cf) {
    RabbitTemplate template = new RabbitTemplate(cf);
    template.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
            log.error("消息发送失败: {}", cause);
        }
    });
    return template;
}

消费者手动确认

@RabbitListener(queues = "important.queue")
public void handleManualAck(Message message, Channel channel) throws IOException {
    try {
        processMessage(message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

消息幂等性

消费者需要保证幂等性,防止重复消费:

@Component
public class IdempotentConsumer {
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public boolean tryAcquireLock(String messageId) {
        String key = "mq:lock:" + messageId;
        return Boolean.TRUE.equals(
            redisTemplate.opsForValue().setIfAbsent(key, "1", 24, TimeUnit.HOURS));
    }
}

小结

消息队列是分布式系统的解耦利器,选择合适的MQ中间件并做好可靠性保障是架构设计的关键。