RabbitMQ实战指南
RabbitMQ实战指南
RabbitMQ是基于AMQP协议的消息中间件,以灵活的路由机制和可靠性著称,是企业级应用的首选MQ。
交换机类型
| 交换机类型 | 路由规则 | 适用场景 |
|---|---|---|
| Direct | 精确匹配routing key | 点对点、精确路由 |
| Fanout | 广播到所有绑定队列 | 事件广播 |
| Topic | 通配符匹配 | 灵活路由、日志分级 |
| Headers | 头部属性匹配 | 复杂条件路由 |
Spring AMQP配置
@Configuration
public class RabbitConfig {
public static final String ORDER_EXCHANGE = "order.exchange";
public static final String ORDER_QUEUE = "order.queue";
public static final String ORDER_KEY = "order.create";
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE);
}
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(ORDER_QUEUE)
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.build();
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_KEY);
}
}
消息生产者
@Service
@RequiredArgsConstructor
public class OrderProducer {
private final RabbitTemplate rabbitTemplate;
public void createOrder(OrderDTO dto) {
String messageId = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend(
RabbitConfig.ORDER_EXCHANGE,
RabbitConfig.ORDER_KEY,
dto,
message -> {
message.getMessageProperties().setMessageId(messageId);
message.getMessageProperties().setHeader("type", "order.create");
return message;
}
);
}
}
消息消费者
@Component
@Slf4j
public class OrderConsumer {
@RabbitListener(queues = RabbitConfig.ORDER_QUEUE)
public void handleOrder(OrderDTO dto, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) {
try {
processOrder(dto);
channel.basicAck(tag, false);
} catch (DuplicateMessageException e) {
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
}
}
死信队列
消息消费失败超过重试次数后进入死信队列:
@Component
public class DlxConsumer {
@RabbitListener(queues = "dlx.order.queue")
public void handleDeadLetter(Message message) {
log.error("死信消息: routingKey={}, body={}",
message.getMessageProperties().getReceivedRoutingKey(),
new String(message.getBody()));
// 记录到数据库或发送告警
}
}
消息确认机制
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
prefetch: 10
retry:
enabled: true
max-attempts: 3
延迟消息
使用RabbitMQ延迟插件实现定时任务:
@Bean
public Exchange delayExchange() {
return new DirectExchange("delay.exchange", true, false);
}
// 发送延迟消息(30分钟后执行)
rabbitTemplate.convertAndSend("delay.exchange", "delay.task",
message, m -> {
m.getMessageProperties().setDelay(1800000); // 30分钟
return m;
});
小结
RabbitMQ的灵活路由和可靠投递机制,使其成为复杂业务场景下消息中间件的理想选择。