Skip to content

第66课:消息队列 - Kafka

🎯 学习目标

  • 理解 Kafka 的核心模型:Producer、Topic、Partition、Broker、Consumer Group、Offset。
  • 掌握 Spring Kafka 的生产者、消费者、序列化、消费者组和手动提交。
  • 能解释分区、副本、顺序性、消费位移和消息重试的关系。
  • 能识别消息丢失、重复消费、积压、乱序、大消息等常见问题。
  • 能判断 Kafka 和 RabbitMQ 的适用场景差异。

📖 一、Kafka 适合做什么

Kafka 是高吞吐分布式日志系统,常用于:

text
用户行为日志
订单事件流
数据同步
异步解耦
指标采集
日志管道
流式处理

Kafka 不只是“消息队列”,更像可持久化、可重放的分布式事件日志。

不适合:

text
极复杂路由
低吞吐但要求复杂确认的小型任务队列
需要每条消息独立 TTL 的场景

📖 二、核心架构

text
Producer -> Topic -> Partition -> Broker
                               -> Consumer Group -> Consumer

核心概念:

概念含义
Topic消息主题
PartitionTopic 的分区,决定并行度和顺序范围
BrokerKafka 节点
Producer生产消息
Consumer消费消息
Consumer Group消费者组,同组内分摊分区
Offset消费位移
Replica分区副本

Kafka 的顺序性只在单个分区内保证。想让同一订单的事件有序,应使用订单 ID 作为 key,让它们进入同一分区。


📖 三、依赖和配置

xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

配置:

yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
    consumer:
      group-id: order-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example"
      auto-offset-reset: earliest

acks=all 表示 leader 等待 ISR 副本确认后再认为写入成功,可靠性更高。


📖 四、生产者

java
@Service
public class OrderEventProducer {
    private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;

    public void send(OrderCreatedEvent event) {
        kafkaTemplate.send("order-created", event.orderId().toString(), event)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    log.error("send order event failed, orderId={}", event.orderId(), ex);
                } else {
                    log.info("send order event success, topic={}, offset={}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().offset());
                }
            });
    }
}

key 的作用:

text
相同 key 的消息进入同一分区。
同一分区内消息有序。
key 影响负载分布。

📖 五、消费者

java
@Component
public class OrderEventConsumer {

    @KafkaListener(topics = "order-created", groupId = "inventory-service")
    public void handle(OrderCreatedEvent event) {
        inventoryService.reserve(event.orderId(), event.items());
    }
}

同一个 consumer group 内,一个分区同一时刻只会分配给一个消费者。不同 group 可以各自完整消费同一个 topic。

示例:

text
inventory-service 组消费订单事件扣库存
coupon-service 组消费订单事件冻结优惠券

📖 六、手动提交 Offset

自动提交可能在业务处理失败时导致消息丢失。关键业务可使用手动 ack。

配置:

yaml
spring:
  kafka:
    listener:
      ack-mode: manual

代码:

java
@KafkaListener(topics = "order-created")
public void handle(OrderCreatedEvent event, Acknowledgment ack) {
    try {
        inventoryService.reserve(event.orderId(), event.items());
        ack.acknowledge();
    } catch (Exception e) {
        log.error("consume failed, orderId={}", event.orderId(), e);
        throw e;
    }
}

手动提交提高控制力,但重复消费仍可能发生。消费者必须幂等。


📖 七、重试和死信

消费者失败后可以重试。长期失败的消息应进入死信 Topic。

text
order-created
order-created-retry
order-created-dlt

处理原则:

text
短暂网络失败:可重试
业务数据错误:进入死信并告警
代码 bug:暂停消费或修复后重放

不要无限重试阻塞整个分区。


⚠️ 八、常见陷阱

1. 认为 Kafka 不会重复消费

网络、重平衡、提交失败都可能导致重复消费。消费者必须幂等。

2. 错误 key 导致乱序

同一业务对象的事件如果进入不同分区,就无法保证顺序。

3. 消息积压只加消费者

如果分区数只有 3,同一 consumer group 最多 3 个消费者并行有效。加到 10 个也没用。

4. 大消息

Kafka 不适合传大文件。大数据应存对象存储,消息里传引用。

5. 忽略 rebalancing

消费者组扩缩容会触发分区重分配,期间可能短暂停顿。


🆚 九、Java vs C 对比

维度C 客户端Spring Kafka
生产消费librdkafka APIKafkaTemplate / @KafkaListener
序列化手动处理Serializer/Deserializer
Offset手动管理较多容器封装提交模式
错误处理手写ErrorHandler/重试/DLT

Spring Kafka 简化了接入,但 Kafka 的分区、位移和消费语义仍必须理解。


💡 十、最佳实践

  • 事件命名使用过去式,例如 OrderCreatedEvent
  • 关键业务消息生产端设置 acks=all
  • 消费者必须幂等。
  • 用业务 ID 做 key,保证单对象事件有序。
  • 分区数决定消费者组并行上限。
  • 长期失败消息进入死信 Topic。
  • 监控 consumer lag、发送失败、消费失败、重平衡次数。
  • 大文件不要直接放消息体。

🔍 十一、自测问题

text
Topic 和 Partition 有什么关系?
Kafka 的顺序性范围是什么?
Consumer Group 如何分摊分区?
Offset 表示什么?
为什么消费者必须幂等?
acks=all 有什么意义?
消息积压时为什么只加消费者不一定有效?
Kafka 和 RabbitMQ 的适用场景有什么区别?

🧭 十二、Kafka 上线检查清单

text
Topic 分区数是否满足并行度?
消息 key 是否能保证业务顺序?
生产者 acks 是否符合可靠性要求?
消费者是否幂等?
是否监控 consumer lag?
失败消息是否有重试和死信策略?
消息体是否过大?
Schema 是否有版本兼容策略?
是否记录 eventId 便于去重?

Kafka 的问题通常不是“发不出去”,而是积压、重复、乱序和不可追踪。


🧪 十三、实战案例:订单事件

事件设计:

java
public record OrderCreatedEvent(
    String eventId,
    Long orderId,
    Long userId,
    LocalDateTime occurredAt,
    List<OrderItem> items
) {
}

发送时:

java
kafkaTemplate.send("order-created", event.orderId().toString(), event);

消费时要去重:

java
if (eventLogRepository.existsByEventId(event.eventId())) {
    return;
}
inventoryService.reserve(event);
eventLogRepository.save(event.eventId());

这样即使重复消费,也不会重复扣库存。


📌 十四、学习建议

建议做三个实验:

text
同一个 key 的消息是否进入同一分区。
增加消费者数量是否提升消费速度。
消费者失败重启后是否重复消费。

这些实验能直接理解分区、消费者组和 offset。


📚 十五、Kafka 指标速查

指标含义
consumer lag消费落后多少消息
records-lag-max最大分区滞后
bytes-in/out进出流量
under-replicated partitions副本不足分区
request latency请求延迟
rebalance count重平衡次数

消费积压时优先判断:

text
生产速度是否突然升高。
消费逻辑是否变慢。
分区数是否限制并行度。
下游数据库或接口是否成为瓶颈。
是否有毒消息阻塞分区。

📌 十六、事件版本兼容

事件一旦被多个服务消费,就不能随意删除字段或改变语义。

建议:

text
新增字段保持可选。
不要随意重命名字段。
事件中包含版本号。
消费者忽略未知字段。
重大变更新建 topic 或事件类型。

Kafka 的可重放特性意味着旧消息可能在未来再次被消费,兼容性非常重要。


📌 十七、消费幂等方案

常见做法:

text
消息带全局 eventId。
消费前查询 eventId 是否处理过。
业务表使用唯一键防重复。
处理成功后记录消费日志。
重复消息直接跳过。

伪流程:

text
收到消息
检查 eventId
执行业务
记录 eventId
提交 offset

幂等是消息系统的基本要求,不是可选优化。

如果消费者不能做到幂等,任何重试、重平衡或网络抖动都可能放大成业务事故。

消费幂等最好用数据库唯一约束兜底。 只靠内存 Set 去重无法跨重启生效。 关键消息处理结果要能审计。


🎓 小结

Kafka 的核心是分区日志和消费者组。它适合高吞吐事件流、日志和数据管道。使用 Kafka 时,重点不是会发消息,而是理解分区、顺序、位移、重复消费、积压和死信处理。