Appearance
第66课:消息队列 - Kafka
🎯 学习目标
- 理解 Kafka 的核心模型:Producer、Topic、Partition、Broker、Consumer Group、Offset。
- 掌握 Spring Kafka 的生产者、消费者、序列化、消费者组和手动提交。
- 能解释分区、副本、顺序性、消费位移和消息重试的关系。
- 能识别消息丢失、重复消费、积压、乱序、大消息等常见问题。
- 能判断 Kafka 和 RabbitMQ 的适用场景差异。
📖 一、Kafka 适合做什么
Kafka 是高吞吐分布式日志系统,常用于:
text
用户行为日志
订单事件流
数据同步
异步解耦
指标采集
日志管道
流式处理Kafka 不只是“消息队列”,更像可持久化、可重放的分布式事件日志。
不适合:
text
极复杂路由
低吞吐但要求复杂确认的小型任务队列
需要每条消息独立 TTL 的场景📖 二、核心架构
text
Producer -> Topic -> Partition -> Broker
-> Consumer Group -> Consumer核心概念:
| 概念 | 含义 |
|---|---|
| Topic | 消息主题 |
| Partition | Topic 的分区,决定并行度和顺序范围 |
| Broker | Kafka 节点 |
| Producer | 生产消息 |
| Consumer | 消费消息 |
| Consumer Group | 消费者组,同组内分摊分区 |
| Offset | 消费位移 |
| Replica | 分区副本 |
Kafka 的顺序性只在单个分区内保证。想让同一订单的事件有序,应使用订单 ID 作为 key,让它们进入同一分区。
📖 三、依赖和配置
xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>配置:
yaml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
consumer:
group-id: order-service
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.example"
auto-offset-reset: earliestacks=all 表示 leader 等待 ISR 副本确认后再认为写入成功,可靠性更高。
📖 四、生产者
java
@Service
public class OrderEventProducer {
private final KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
public void send(OrderCreatedEvent event) {
kafkaTemplate.send("order-created", event.orderId().toString(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("send order event failed, orderId={}", event.orderId(), ex);
} else {
log.info("send order event success, topic={}, offset={}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().offset());
}
});
}
}key 的作用:
text
相同 key 的消息进入同一分区。
同一分区内消息有序。
key 影响负载分布。📖 五、消费者
java
@Component
public class OrderEventConsumer {
@KafkaListener(topics = "order-created", groupId = "inventory-service")
public void handle(OrderCreatedEvent event) {
inventoryService.reserve(event.orderId(), event.items());
}
}同一个 consumer group 内,一个分区同一时刻只会分配给一个消费者。不同 group 可以各自完整消费同一个 topic。
示例:
text
inventory-service 组消费订单事件扣库存
coupon-service 组消费订单事件冻结优惠券📖 六、手动提交 Offset
自动提交可能在业务处理失败时导致消息丢失。关键业务可使用手动 ack。
配置:
yaml
spring:
kafka:
listener:
ack-mode: manual代码:
java
@KafkaListener(topics = "order-created")
public void handle(OrderCreatedEvent event, Acknowledgment ack) {
try {
inventoryService.reserve(event.orderId(), event.items());
ack.acknowledge();
} catch (Exception e) {
log.error("consume failed, orderId={}", event.orderId(), e);
throw e;
}
}手动提交提高控制力,但重复消费仍可能发生。消费者必须幂等。
📖 七、重试和死信
消费者失败后可以重试。长期失败的消息应进入死信 Topic。
text
order-created
order-created-retry
order-created-dlt处理原则:
text
短暂网络失败:可重试
业务数据错误:进入死信并告警
代码 bug:暂停消费或修复后重放不要无限重试阻塞整个分区。
⚠️ 八、常见陷阱
1. 认为 Kafka 不会重复消费
网络、重平衡、提交失败都可能导致重复消费。消费者必须幂等。
2. 错误 key 导致乱序
同一业务对象的事件如果进入不同分区,就无法保证顺序。
3. 消息积压只加消费者
如果分区数只有 3,同一 consumer group 最多 3 个消费者并行有效。加到 10 个也没用。
4. 大消息
Kafka 不适合传大文件。大数据应存对象存储,消息里传引用。
5. 忽略 rebalancing
消费者组扩缩容会触发分区重分配,期间可能短暂停顿。
🆚 九、Java vs C 对比
| 维度 | C 客户端 | Spring Kafka |
|---|---|---|
| 生产消费 | librdkafka API | KafkaTemplate / @KafkaListener |
| 序列化 | 手动处理 | Serializer/Deserializer |
| Offset | 手动管理较多 | 容器封装提交模式 |
| 错误处理 | 手写 | ErrorHandler/重试/DLT |
Spring Kafka 简化了接入,但 Kafka 的分区、位移和消费语义仍必须理解。
💡 十、最佳实践
- 事件命名使用过去式,例如
OrderCreatedEvent。 - 关键业务消息生产端设置
acks=all。 - 消费者必须幂等。
- 用业务 ID 做 key,保证单对象事件有序。
- 分区数决定消费者组并行上限。
- 长期失败消息进入死信 Topic。
- 监控 consumer lag、发送失败、消费失败、重平衡次数。
- 大文件不要直接放消息体。
🔍 十一、自测问题
text
Topic 和 Partition 有什么关系?
Kafka 的顺序性范围是什么?
Consumer Group 如何分摊分区?
Offset 表示什么?
为什么消费者必须幂等?
acks=all 有什么意义?
消息积压时为什么只加消费者不一定有效?
Kafka 和 RabbitMQ 的适用场景有什么区别?🧭 十二、Kafka 上线检查清单
text
Topic 分区数是否满足并行度?
消息 key 是否能保证业务顺序?
生产者 acks 是否符合可靠性要求?
消费者是否幂等?
是否监控 consumer lag?
失败消息是否有重试和死信策略?
消息体是否过大?
Schema 是否有版本兼容策略?
是否记录 eventId 便于去重?Kafka 的问题通常不是“发不出去”,而是积压、重复、乱序和不可追踪。
🧪 十三、实战案例:订单事件
事件设计:
java
public record OrderCreatedEvent(
String eventId,
Long orderId,
Long userId,
LocalDateTime occurredAt,
List<OrderItem> items
) {
}发送时:
java
kafkaTemplate.send("order-created", event.orderId().toString(), event);消费时要去重:
java
if (eventLogRepository.existsByEventId(event.eventId())) {
return;
}
inventoryService.reserve(event);
eventLogRepository.save(event.eventId());这样即使重复消费,也不会重复扣库存。
📌 十四、学习建议
建议做三个实验:
text
同一个 key 的消息是否进入同一分区。
增加消费者数量是否提升消费速度。
消费者失败重启后是否重复消费。这些实验能直接理解分区、消费者组和 offset。
📚 十五、Kafka 指标速查
| 指标 | 含义 |
|---|---|
| consumer lag | 消费落后多少消息 |
| records-lag-max | 最大分区滞后 |
| bytes-in/out | 进出流量 |
| under-replicated partitions | 副本不足分区 |
| request latency | 请求延迟 |
| rebalance count | 重平衡次数 |
消费积压时优先判断:
text
生产速度是否突然升高。
消费逻辑是否变慢。
分区数是否限制并行度。
下游数据库或接口是否成为瓶颈。
是否有毒消息阻塞分区。📌 十六、事件版本兼容
事件一旦被多个服务消费,就不能随意删除字段或改变语义。
建议:
text
新增字段保持可选。
不要随意重命名字段。
事件中包含版本号。
消费者忽略未知字段。
重大变更新建 topic 或事件类型。Kafka 的可重放特性意味着旧消息可能在未来再次被消费,兼容性非常重要。
📌 十七、消费幂等方案
常见做法:
text
消息带全局 eventId。
消费前查询 eventId 是否处理过。
业务表使用唯一键防重复。
处理成功后记录消费日志。
重复消息直接跳过。伪流程:
text
收到消息
检查 eventId
执行业务
记录 eventId
提交 offset幂等是消息系统的基本要求,不是可选优化。
如果消费者不能做到幂等,任何重试、重平衡或网络抖动都可能放大成业务事故。
消费幂等最好用数据库唯一约束兜底。 只靠内存 Set 去重无法跨重启生效。 关键消息处理结果要能审计。
🎓 小结
Kafka 的核心是分区日志和消费者组。它适合高吞吐事件流、日志和数据管道。使用 Kafka 时,重点不是会发消息,而是理解分区、顺序、位移、重复消费、积压和死信处理。