← 返回首页

RabbitMQ实战指南

📂 java ⏱ 2 min 231 words

RabbitMQ实战指南

RabbitMQ是基于AMQP协议的消息中间件,以灵活的路由机制和可靠性著称,是企业级应用的首选MQ。

交换机类型

交换机类型 路由规则 适用场景
Direct 精确匹配routing key 点对点、精确路由
Fanout 广播到所有绑定队列 事件广播
Topic 通配符匹配 灵活路由、日志分级
Headers 头部属性匹配 复杂条件路由

Spring AMQP配置

@Configuration
public class RabbitConfig {
    public static final String ORDER_EXCHANGE = "order.exchange";
    public static final String ORDER_QUEUE = "order.queue";
    public static final String ORDER_KEY = "order.create";

    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(ORDER_EXCHANGE);
    }

    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(ORDER_QUEUE)
            .withArgument("x-dead-letter-exchange", "dlx.exchange")
            .build();
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_KEY);
    }
}

消息生产者

@Service
@RequiredArgsConstructor
public class OrderProducer {
    private final RabbitTemplate rabbitTemplate;

    public void createOrder(OrderDTO dto) {
        String messageId = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend(
            RabbitConfig.ORDER_EXCHANGE,
            RabbitConfig.ORDER_KEY,
            dto,
            message -> {
                message.getMessageProperties().setMessageId(messageId);
                message.getMessageProperties().setHeader("type", "order.create");
                return message;
            }
        );
    }
}

消息消费者

@Component
@Slf4j
public class OrderConsumer {
    @RabbitListener(queues = RabbitConfig.ORDER_QUEUE)
    public void handleOrder(OrderDTO dto, Channel channel,
                           @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        try {
            processOrder(dto);
            channel.basicAck(tag, false);
        } catch (DuplicateMessageException e) {
            channel.basicAck(tag, false);
        } catch (Exception e) {
            channel.basicNack(tag, false, true);
        }
    }
}

死信队列

消息消费失败超过重试次数后进入死信队列:

@Component
public class DlxConsumer {
    @RabbitListener(queues = "dlx.order.queue")
    public void handleDeadLetter(Message message) {
        log.error("死信消息: routingKey={}, body={}",
            message.getMessageProperties().getReceivedRoutingKey(),
            new String(message.getBody()));
        // 记录到数据库或发送告警
    }
}

消息确认机制

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 10
        retry:
          enabled: true
          max-attempts: 3

延迟消息

使用RabbitMQ延迟插件实现定时任务:

@Bean
public Exchange delayExchange() {
    return new DirectExchange("delay.exchange", true, false);
}

// 发送延迟消息(30分钟后执行)
rabbitTemplate.convertAndSend("delay.exchange", "delay.task",
    message, m -> {
        m.getMessageProperties().setDelay(1800000); // 30分钟
        return m;
    });

小结

RabbitMQ的灵活路由和可靠投递机制,使其成为复杂业务场景下消息中间件的理想选择。