Offset 管理:提交、重置与消费语义
上一篇解决了 partition 怎么分给 consumer。这一篇解决分到之后的核心问题:consumer 读到哪里了,怎么记住。
这个"记住"机制看起来简单——不过是记录一个数字(offset)。但"什么时候记"和"怎么记"的选择,直接决定了消息会不会丢、会不会重复处理。auto-commit 的默认行为经常被误解为"自动保证 exactly-once",实际上它给出的是 at-least-once 甚至更弱的语义。
本文只抓一个问题:offset 的提交策略如何影响消费语义。
三个 offset
理解 offset 管理需要区分三个位置:
1 | |
- committed offset:consumer 上次提交的位置。rebalance 或重启后,consumer 从这个位置恢复消费。存储在
__consumer_offsetstopic 中。 - position(当前位置):consumer 下一次
poll()将从这个位置开始拉取。它在内存中维护,不持久化。 - log-end offset(LEO):partition 中最新一条消息的下一个 offset。producer 写入新消息时 LEO 递增。
committed offset 和 position 之间的差距是"已拉取但未提交"的消息。consumer lag 通常指 LEO 与 committed offset 之间的差距。
__consumer_offsets:offset 存在哪里
committed offset 存储在一个名为 __consumer_offsets 的内部 topic 中。这个 topic 有 50 个 partition(offsets.topic.num.partitions 配置),使用 log compaction 策略。
每条 offset 提交记录的 key 是 (group.id, topic, partition) 三元组,value 是 committed offset 值和元数据。由于 log compaction 只保留每个 key 的最新值,__consumer_offsets 不会无限增长。
offset 存储在 Kafka 自身的 topic 中而不是外部存储(如 ZooKeeper),这个设计在 Kafka 0.9 引入(之前确实存在 ZooKeeper 中)。好处是:offset 提交走 Kafka 自己的复制协议,具备与普通消息相同的持久性保证;同时减少了对 ZooKeeper 的依赖和写入压力。
auto-commit:poll() 里藏的提交
默认配置下,consumer 启用 auto-commit:
1 | |
auto-commit 的实际行为:每次调用 poll() 时,consumer 检查距离上次提交是否超过 auto.commit.interval.ms(默认 5 秒)。如果超过,就在本次 poll() 的开头把上一次 poll() 返回的最大 offset 提交出去。
注意这个时序:提交发生在 poll() 内部,提交的是上一轮 poll() 拉到的 offset,而不是当前轮。
1 | |
auto-commit 在 consumer 正常运行时表现良好。问题出现在崩溃、rebalance 或处理延迟超过 commit 间隔时:
- 崩溃在两次
poll()之间:已拉取但未提交的消息在重启后被重复消费。 - 处理耗时超过
auto.commit.interval.ms:下一次poll()提交时,实际上提交的是已经处理完的消息——这种情况下 auto-commit 工作正常。 - rebalance 发生时:auto-commit 在 rebalance 前会触发一次提交,但如果提交的 offset 超过了实际处理完的 offset,接管 partition 的 consumer 会跳过未处理的消息。
手动提交:commitSync() 与 commitAsync()
关闭 auto-commit 后,应用代码控制何时提交 offset:
1 | |
commitSync()
同步提交。调用后阻塞直到 broker 确认提交成功(或抛出异常)。
1 | |
commitSync() 在失败时自动重试(受 default.api.timeout.ms 限制)。如果重试耗尽仍然失败,抛出 CommitFailedException。
代价:每次提交都要等待网络往返,降低消费吞吐。
commitAsync()
异步提交。调用后立即返回,通过回调通知结果。
1 | |
commitAsync() 不会自动重试——因为在异步场景下,重试一个旧的 offset 提交可能覆盖一个新的已成功提交的 offset。
常见的折中方案是:循环中使用 commitAsync(),consumer 关闭前使用 commitSync() 确保最后一批 offset 提交成功。
1 | |
按 partition 提交
commitSync() 和 commitAsync() 都支持传入一个 Map<TopicPartition, OffsetAndMetadata> 参数,按 partition 粒度提交。这在一次 poll() 返回大量消息、需要在处理过程中分批提交时有用:
1 | |
注意提交的值是 record.offset() + 1——committed offset 的含义是"下一次消费的起始位置",不是"最后消费的位置"。
提交策略与消费语义
offset 的提交时机决定消费语义:
at-most-once
先提交 offset,再处理消息。如果处理过程中崩溃,提交已经完成,重启后从新 offset 开始——崩溃时正在处理的消息丢失,不会被重复处理。
1 | |
at-least-once
先处理消息,再提交 offset。如果处理完成后、提交前崩溃,重启后从旧 offset 开始——已处理的消息被重复消费。
1 | |
这是大多数应用选择的语义。配合业务层的幂等处理(如数据库 upsert、去重表),可以在应用层面实现 effectively-once。
exactly-once
仅靠 consumer 端的 offset 提交无法实现 exactly-once。需要将 offset 提交和业务处理放在同一个事务中:
- 方案一:使用 Kafka 事务。producer 调用
sendOffsetsToTransaction()把 offset 提交和消息写入绑定在同一个事务中。这要求消费结果写回 Kafka(consume-transform-produce 模式)。详见第 09 篇。 - 方案二:把 offset 存到业务数据库。在处理消息的同一个数据库事务中写入 offset,重启时从数据库读取 offset 而不是从
__consumer_offsets读取。需要调用consumer.seek()手动定位。
consumer lag
consumer lag = LEO - committed offset。这个值反映 consumer 的消费进度。
1 | |
输出中的 LAG 列就是每个 partition 的 consumer lag。
lag 持续增长意味着 consumer 的处理速度跟不上 producer 的写入速度。可能的原因:
- consumer 处理逻辑太慢(如每条消息做一次数据库写入而没有批量处理)。
- consumer 数量少于 partition 数量,某个 consumer 承担了过多 partition。
- GC 暂停或网络抖动导致 consumer 频繁 rebalance,消费中断。
监控 consumer lag 是 Kafka 运维的基本指标。Burrow(LinkedIn 开源)和 Kafka 自带的 JMX 指标 records-lag-max 都可以用于监控。
offset reset:auto.offset.reset
当 consumer 启动时,如果 __consumer_offsets 中没有这个 group 对这个 partition 的 committed offset 记录,consumer 需要决定从哪里开始消费。这由 auto.offset.reset 控制:
earliest:从 partition 的最早可用 offset 开始消费(不一定是 0,因为旧 segment 可能已被删除)。latest:从 partition 的 LEO 开始消费,只消费启动后新写入的消息。none:抛出异常,不自动决定。
1 | |
触发 offset reset 的条件:
- 全新的 consumer group 首次消费某个 topic。
- committed offset 对应的消息已经被删除(segment 过期被清理)。offset 失效后,consumer 根据
auto.offset.reset重新定位。
常见误解:offset reset 不会在每次重启时触发。只要 __consumer_offsets 中有有效的 committed offset 记录,consumer 就从 committed offset 恢复,auto.offset.reset 不生效。
手动重置 offset
运维场景下需要手动重置 consumer group 的 offset:
1 | |
执行 reset 前必须确保 group 中没有活跃的 consumer(所有 consumer 需要先停止)。--dry-run 参数可以预览 reset 结果而不实际执行。
实验:auto-commit 下的重复消费
准备
1 | |
用 auto-commit 消费,中途杀掉
1 | |
观察结果
第一次运行:consumer 消费 offset 0-9 后退出。由于 auto.commit.interval.ms 设为 30 秒且只运行了几秒,auto-commit 没有触发。__consumer_offsets 中没有这个 group 的 committed offset。
第二次运行:consumer 从 offset 0 重新开始消费(因为没有 committed offset,触发 auto.offset.reset=latest 或 earliest)。offset 0-9 被重复处理。
如果把 auto.commit.interval.ms 改为 1000(1 秒),第一次运行中 auto-commit 可能在第二次 poll() 时提交了 offset 0-N。第二次运行就会从 N+1 开始,避免了部分重复。但处理到 offset 9 时崩溃,N+1 到 9 之间的消息仍然会被重复消费。
切换为手动提交(enable.auto.commit=false,在每条消息处理后调用 commitSync()),可以把重复消费的粒度从"一个 auto-commit 周期的所有消息"缩小到"最后一条已处理但未提交的消息"。
模式提炼
offset 管理本质上是一种游标(cursor)模式:consumer 在一个有序序列上维护读取位置,通过持久化游标位置来实现断点续读。
这个模式出现在多种系统中:
- 数据库游标:应用通过 DECLARE CURSOR + FETCH 遍历结果集,游标位置由数据库服务端维护。
- CDC 位点:Debezium、Canal 等 CDC 工具记录 binlog position 或 GTID,用于断点续读增量变更。
- RocketMQ offset:同样是 consumer 维护每个 queue 的消费进度,存储在 broker 端(广播模式存客户端本地)。
- 文件系统 seek:
lseek()维护文件描述符的读写偏移量。
Kafka offset 的特殊之处在于:游标本身也存储在日志中(__consumer_offsets topic),享受与业务消息相同的复制和持久化保障。
工程迁移表
| 概念 | Kafka Offset | RocketMQ Offset | 数据库游标 | CDC 位点 |
|---|---|---|---|---|
| 位置标识 | 64-bit offset | 64-bit offset | cursor position | binlog position / GTID |
| 存储位置 | __consumer_offsets topic | broker 内存 + 文件 | 数据库服务端 | 自定义存储 |
| 提交方式 | auto / sync / async | auto(定时 5s)/ 手动 | 隐式随 FETCH | 应用层写入 |
| 重置能力 | 按时间 / 按 offset / earliest / latest | 按时间 / 按 offset | SCROLL / ABSOLUTE | 按位点 / 按时间 |
| 失效处理 | auto.offset.reset 策略 | 从最新开始 | 游标失效报错 | 重新全量同步 |
| 持久化保障 | Kafka 副本复制 | 同步/异步刷盘 | 事务保证 | 取决于存储选择 |
常见误解
“auto-commit 保证 exactly-once”——auto-commit 给出的是 at-least-once 或更弱的语义(取决于崩溃时机)。如果在 auto-commit 触发后、消息处理前崩溃,还可能出现 at-most-once(跳过未处理的消息)。
“offset reset 在每次重启时都会触发”——只有当 committed offset 不存在或已失效时才触发。正常重启的 consumer 从 committed offset 恢复,auto.offset.reset 不参与。
“committed offset 就是最后处理的消息的 offset”——committed offset 是"下一次消费的起始位置",等于最后处理的消息的 offset + 1。提交 offset 时传入 record.offset() 而不是 record.offset() + 1 是一个常见 bug,会导致最后一条消息被重复消费。
练习
-
用上面的实验代码验证 auto-commit 下的重复消费。修改
auto.commit.interval.ms为不同值(1 秒、5 秒、30 秒),观察崩溃后重复消费的消息数量如何变化。 -
改写实验代码为手动提交模式(
commitSync()在每条消息处理后调用),重复崩溃实验,对比重复消费的粒度。 -
用
kafka-consumer-groups.sh --reset-offsets的--to-datetime参数把 consumer group 的 offset 重置到 1 小时前。验证重置后 consumer 从指定时间点的消息开始消费。 -
在代码中使用
consumer.seek()手动指定起始 offset,跳过__consumer_offsets中的 committed offset。验证 seek 的优先级高于 committed offset。
系列导航
参考资料
- Apache Kafka 官方文档 — Consumer Configuration: https://kafka.apache.org/documentation/#consumerconfigs
- Apache Kafka 官方文档 — Design: https://kafka.apache.org/documentation/#design
- Confluent — Kafka Consumer: Commit Offsets: https://docs.confluent.io/platform/current/clients/consumer.html#offset-management
- Neha Narkhede, Gwen Shapira, Todd Palino. Kafka: The Definitive Guide. O’Reilly. Chapter 4: Kafka Consumers — Commits and Offsets.
- KIP-98: Exactly Once Delivery and Transactional Messaging: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
