Skip to content

第67课:消息队列 - RabbitMQ

🎯 学习目标

  • 理解 RabbitMQ 的核心模型:Producer、Exchange、Queue、Binding、Consumer。
  • 掌握 Direct、Topic、Fanout、Headers Exchange 的路由差异。
  • 能使用 Spring AMQP 发送、消费、确认和处理死信。
  • 能识别消息丢失、重复消费、未确认堆积、死信配置错误等问题。
  • 能判断 RabbitMQ 和 Kafka 的适用边界。

📖 一、RabbitMQ 适合做什么

RabbitMQ 是传统消息队列,强调灵活路由和可靠投递,常用于:

text
异步任务
订单超时取消
邮件短信发送
业务解耦
削峰填谷
复杂路由
延迟消息

它的模型:

text
Producer -> Exchange -> Queue -> Consumer

生产者不直接发到队列,而是发到 Exchange。Exchange 根据绑定规则路由到队列。


📖 二、Exchange 类型

1. Direct Exchange

精确匹配 routing key。

java
@Bean
public DirectExchange orderExchange() {
    return new DirectExchange("order.direct");
}

@Bean
public Queue orderCreatedQueue() {
    return QueueBuilder.durable("order.created.queue").build();
}

@Bean
public Binding orderCreatedBinding() {
    return BindingBuilder.bind(orderCreatedQueue())
        .to(orderExchange())
        .with("order.created");
}

2. Topic Exchange

支持通配符:

text
* 匹配一个单词
# 匹配零个或多个单词
java
@Bean
public TopicExchange eventExchange() {
    return new TopicExchange("event.topic");
}

@Bean
public Binding errorBinding(Queue queue) {
    return BindingBuilder.bind(queue)
        .to(eventExchange())
        .with("*.error");
}

3. Fanout Exchange

广播到所有绑定队列:

java
@Bean
public FanoutExchange broadcastExchange() {
    return new FanoutExchange("broadcast.fanout");
}

适合系统公告、缓存失效广播等。


📖 三、Spring AMQP 配置

依赖:

xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置:

yaml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 10

prefetch 控制单个消费者一次最多拿多少未确认消息。


📖 四、发送消息

java
@Service
public class OrderMessageProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendOrderCreated(OrderCreatedEvent event) {
        rabbitTemplate.convertAndSend(
            "order.direct",
            "order.created",
            event,
            message -> {
                message.getMessageProperties().setMessageId(event.eventId());
                return message;
            }
        );
    }
}

发布确认:

java
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            log.error("message send failed, cause={}", cause);
        }
    });
    template.setReturnsCallback(returned -> {
        log.error("message returned, exchange={}, routingKey={}",
            returned.getExchange(), returned.getRoutingKey());
    });
    return template;
}

Confirm 表示消息是否到达 Exchange,Return 表示消息无法路由到队列。


📖 五、消费消息

java
@Component
public class OrderMessageConsumer {

    @RabbitListener(queues = "order.created.queue")
    public void handle(OrderCreatedEvent event, Channel channel, Message message) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            orderService.handleCreated(event);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            log.error("consume failed, eventId={}", event.eventId(), e);
            channel.basicNack(tag, false, false);
        }
    }
}

basicNack(tag, false, false) 表示拒绝并不重新入队,通常会进入死信队列。


📖 六、死信队列

消息成为死信的常见原因:

text
被消费者 reject/nack 且不重新入队
消息过期
队列达到最大长度

配置:

java
@Bean
public Queue orderQueue() {
    return QueueBuilder.durable("order.queue")
        .deadLetterExchange("order.dlx")
        .deadLetterRoutingKey("order.dead")
        .build();
}

@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange("order.dlx");
}

@Bean
public Queue deadLetterQueue() {
    return QueueBuilder.durable("order.dead.queue").build();
}

死信队列必须有监控和人工/自动补偿机制,否则只是把问题藏起来。


📖 七、延迟消息

常见场景:订单 30 分钟未支付自动取消。

方案:

text
TTL + 死信队列
RabbitMQ 延迟插件
业务定时扫描

TTL + DLX 示例:

java
QueueBuilder.durable("order.delay.queue")
    .ttl(30 * 60 * 1000)
    .deadLetterExchange("order.direct")
    .deadLetterRoutingKey("order.timeout")
    .build();

⚠️ 八、常见陷阱

1. 以为消息只会消费一次

网络异常、消费者重启、ack 失败都会导致重复消费。消费者必须幂等。

2. 手动 ack 漏掉

未 ack 消息会堆积在 unacked 状态,影响消费能力。

3. requeue=true 导致死循环

业务永久失败的消息如果一直重新入队,会反复消费拖垮系统。

4. Confirm 和消费确认混淆

生产者 Confirm 保证到 Exchange;消费者 Ack 保证消费成功。它们不是一回事。

5. 队列无人消费

消息进队列不代表业务完成。必须监控队列长度和消费者状态。


🆚 九、Java vs C 对比

维度C 客户端Spring AMQP
发送AMQP APIRabbitTemplate
消费回调处理@RabbitListener
队列声明手写声明Bean 配置
确认手动 ack/nack容器封装但可手动控制
序列化手动MessageConverter

Spring AMQP 降低了接入成本,但可靠性仍取决于确认、幂等、死信和监控设计。


💡 十、最佳实践

  • 生产者开启 Confirm 和 Return。
  • 关键消费者使用手动 ack。
  • 消费者必须幂等。
  • 业务失败不要无限 requeue。
  • 配置死信队列并监控。
  • 队列、Exchange、RoutingKey 命名要清晰。
  • 设置合理 prefetch,避免单消费者压太多消息。
  • 大任务消息只传 ID,具体数据从数据库或对象存储读取。

🔍 十一、自测问题

text
Exchange 和 Queue 分别负责什么?
Direct、Topic、Fanout 有什么区别?
Confirm 和 Ack 有什么区别?
什么情况下消息会进入死信队列?
为什么消费者必须幂等?
prefetch 有什么作用?
requeue=true 为什么可能导致死循环?
RabbitMQ 和 Kafka 的适用场景有什么不同?

🧭 十二、RabbitMQ 上线检查清单

text
Exchange、Queue、RoutingKey 命名是否清晰?
队列是否 durable?
消息是否需要持久化?
生产者 Confirm 是否开启?
Return Callback 是否处理?
消费者是否手动 ack?
失败消息是否进入死信队列?
死信队列是否有告警?
消费者是否幂等?
prefetch 是否合理?

这些配置缺一项,就可能在故障时丢消息、重复消息或消息堆积。


🧪 十三、实战案例:订单超时取消

流程:

text
创建订单时发送延迟消息。
消息 30 分钟后进入超时队列。
消费者检查订单是否仍未支付。
未支付则取消订单并释放库存。
已支付则忽略。

关键点:

text
取消操作必须幂等。
消费者不能只相信消息状态,要查询数据库当前状态。
延迟消息可能有误差,不能用于强实时场景。
失败消息进入死信并告警。

📌 十四、学习建议

建议分别测试:

text
routingKey 不匹配时 Return Callback 是否触发。
消费者抛异常时消息是否重新入队。
basicNack requeue=false 是否进入死信队列。
prefetch 改大后单消费者吞吐和堆积变化。

RabbitMQ 的可靠性要通过这些失败实验理解。


📚 十五、RabbitMQ 与 Kafka 简要对比

维度RabbitMQKafka
模型Exchange 路由到 QueueTopic 分区日志
强项复杂路由、任务队列高吞吐、事件流、可重放
顺序队列内相对有序分区内有序
消费后消息通常确认后删除保留一段时间
延迟任务常用 TTL + DLX不是强项

选型不要看哪个更流行,要看业务需要路由灵活性还是事件流吞吐。


📌 十六、故障排查重点

text
Ready 消息持续增长:消费者处理不过来。
Unacked 持续增长:消费者拿了但未确认。
Publish 失败:检查 Confirm。
Return 增加:routingKey 没有路由到队列。
死信增加:消费者失败或消息过期。

🎓 小结

RabbitMQ 的核心是 Exchange 路由和 Queue 消费。它适合复杂路由、可靠任务和业务异步。使用 RabbitMQ 时,重点是可靠投递、手动确认、死信处理、幂等消费和队列监控。