Appearance
第67课:消息队列 - RabbitMQ
🎯 学习目标
- 理解 RabbitMQ 的核心模型:Producer、Exchange、Queue、Binding、Consumer。
- 掌握 Direct、Topic、Fanout、Headers Exchange 的路由差异。
- 能使用 Spring AMQP 发送、消费、确认和处理死信。
- 能识别消息丢失、重复消费、未确认堆积、死信配置错误等问题。
- 能判断 RabbitMQ 和 Kafka 的适用边界。
📖 一、RabbitMQ 适合做什么
RabbitMQ 是传统消息队列,强调灵活路由和可靠投递,常用于:
text
异步任务
订单超时取消
邮件短信发送
业务解耦
削峰填谷
复杂路由
延迟消息它的模型:
text
Producer -> Exchange -> Queue -> Consumer生产者不直接发到队列,而是发到 Exchange。Exchange 根据绑定规则路由到队列。
📖 二、Exchange 类型
1. Direct Exchange
精确匹配 routing key。
java
@Bean
public DirectExchange orderExchange() {
return new DirectExchange("order.direct");
}
@Bean
public Queue orderCreatedQueue() {
return QueueBuilder.durable("order.created.queue").build();
}
@Bean
public Binding orderCreatedBinding() {
return BindingBuilder.bind(orderCreatedQueue())
.to(orderExchange())
.with("order.created");
}2. Topic Exchange
支持通配符:
text
* 匹配一个单词
# 匹配零个或多个单词java
@Bean
public TopicExchange eventExchange() {
return new TopicExchange("event.topic");
}
@Bean
public Binding errorBinding(Queue queue) {
return BindingBuilder.bind(queue)
.to(eventExchange())
.with("*.error");
}3. Fanout Exchange
广播到所有绑定队列:
java
@Bean
public FanoutExchange broadcastExchange() {
return new FanoutExchange("broadcast.fanout");
}适合系统公告、缓存失效广播等。
📖 三、Spring AMQP 配置
依赖:
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>配置:
yaml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
listener:
simple:
acknowledge-mode: manual
prefetch: 10prefetch 控制单个消费者一次最多拿多少未确认消息。
📖 四、发送消息
java
@Service
public class OrderMessageProducer {
private final RabbitTemplate rabbitTemplate;
public void sendOrderCreated(OrderCreatedEvent event) {
rabbitTemplate.convertAndSend(
"order.direct",
"order.created",
event,
message -> {
message.getMessageProperties().setMessageId(event.eventId());
return message;
}
);
}
}发布确认:
java
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("message send failed, cause={}", cause);
}
});
template.setReturnsCallback(returned -> {
log.error("message returned, exchange={}, routingKey={}",
returned.getExchange(), returned.getRoutingKey());
});
return template;
}Confirm 表示消息是否到达 Exchange,Return 表示消息无法路由到队列。
📖 五、消费消息
java
@Component
public class OrderMessageConsumer {
@RabbitListener(queues = "order.created.queue")
public void handle(OrderCreatedEvent event, Channel channel, Message message) throws IOException {
long tag = message.getMessageProperties().getDeliveryTag();
try {
orderService.handleCreated(event);
channel.basicAck(tag, false);
} catch (Exception e) {
log.error("consume failed, eventId={}", event.eventId(), e);
channel.basicNack(tag, false, false);
}
}
}basicNack(tag, false, false) 表示拒绝并不重新入队,通常会进入死信队列。
📖 六、死信队列
消息成为死信的常见原因:
text
被消费者 reject/nack 且不重新入队
消息过期
队列达到最大长度配置:
java
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order.queue")
.deadLetterExchange("order.dlx")
.deadLetterRoutingKey("order.dead")
.build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("order.dlx");
}
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dead.queue").build();
}死信队列必须有监控和人工/自动补偿机制,否则只是把问题藏起来。
📖 七、延迟消息
常见场景:订单 30 分钟未支付自动取消。
方案:
text
TTL + 死信队列
RabbitMQ 延迟插件
业务定时扫描TTL + DLX 示例:
java
QueueBuilder.durable("order.delay.queue")
.ttl(30 * 60 * 1000)
.deadLetterExchange("order.direct")
.deadLetterRoutingKey("order.timeout")
.build();⚠️ 八、常见陷阱
1. 以为消息只会消费一次
网络异常、消费者重启、ack 失败都会导致重复消费。消费者必须幂等。
2. 手动 ack 漏掉
未 ack 消息会堆积在 unacked 状态,影响消费能力。
3. requeue=true 导致死循环
业务永久失败的消息如果一直重新入队,会反复消费拖垮系统。
4. Confirm 和消费确认混淆
生产者 Confirm 保证到 Exchange;消费者 Ack 保证消费成功。它们不是一回事。
5. 队列无人消费
消息进队列不代表业务完成。必须监控队列长度和消费者状态。
🆚 九、Java vs C 对比
| 维度 | C 客户端 | Spring AMQP |
|---|---|---|
| 发送 | AMQP API | RabbitTemplate |
| 消费 | 回调处理 | @RabbitListener |
| 队列声明 | 手写声明 | Bean 配置 |
| 确认 | 手动 ack/nack | 容器封装但可手动控制 |
| 序列化 | 手动 | MessageConverter |
Spring AMQP 降低了接入成本,但可靠性仍取决于确认、幂等、死信和监控设计。
💡 十、最佳实践
- 生产者开启 Confirm 和 Return。
- 关键消费者使用手动 ack。
- 消费者必须幂等。
- 业务失败不要无限 requeue。
- 配置死信队列并监控。
- 队列、Exchange、RoutingKey 命名要清晰。
- 设置合理 prefetch,避免单消费者压太多消息。
- 大任务消息只传 ID,具体数据从数据库或对象存储读取。
🔍 十一、自测问题
text
Exchange 和 Queue 分别负责什么?
Direct、Topic、Fanout 有什么区别?
Confirm 和 Ack 有什么区别?
什么情况下消息会进入死信队列?
为什么消费者必须幂等?
prefetch 有什么作用?
requeue=true 为什么可能导致死循环?
RabbitMQ 和 Kafka 的适用场景有什么不同?🧭 十二、RabbitMQ 上线检查清单
text
Exchange、Queue、RoutingKey 命名是否清晰?
队列是否 durable?
消息是否需要持久化?
生产者 Confirm 是否开启?
Return Callback 是否处理?
消费者是否手动 ack?
失败消息是否进入死信队列?
死信队列是否有告警?
消费者是否幂等?
prefetch 是否合理?这些配置缺一项,就可能在故障时丢消息、重复消息或消息堆积。
🧪 十三、实战案例:订单超时取消
流程:
text
创建订单时发送延迟消息。
消息 30 分钟后进入超时队列。
消费者检查订单是否仍未支付。
未支付则取消订单并释放库存。
已支付则忽略。关键点:
text
取消操作必须幂等。
消费者不能只相信消息状态,要查询数据库当前状态。
延迟消息可能有误差,不能用于强实时场景。
失败消息进入死信并告警。📌 十四、学习建议
建议分别测试:
text
routingKey 不匹配时 Return Callback 是否触发。
消费者抛异常时消息是否重新入队。
basicNack requeue=false 是否进入死信队列。
prefetch 改大后单消费者吞吐和堆积变化。RabbitMQ 的可靠性要通过这些失败实验理解。
📚 十五、RabbitMQ 与 Kafka 简要对比
| 维度 | RabbitMQ | Kafka |
|---|---|---|
| 模型 | Exchange 路由到 Queue | Topic 分区日志 |
| 强项 | 复杂路由、任务队列 | 高吞吐、事件流、可重放 |
| 顺序 | 队列内相对有序 | 分区内有序 |
| 消费后消息 | 通常确认后删除 | 保留一段时间 |
| 延迟任务 | 常用 TTL + DLX | 不是强项 |
选型不要看哪个更流行,要看业务需要路由灵活性还是事件流吞吐。
📌 十六、故障排查重点
text
Ready 消息持续增长:消费者处理不过来。
Unacked 持续增长:消费者拿了但未确认。
Publish 失败:检查 Confirm。
Return 增加:routingKey 没有路由到队列。
死信增加:消费者失败或消息过期。🎓 小结
RabbitMQ 的核心是 Exchange 路由和 Queue 消费。它适合复杂路由、可靠任务和业务异步。使用 RabbitMQ 时,重点是可靠投递、手动确认、死信处理、幂等消费和队列监控。