上一篇解决了 partition 怎么分给 consumer。这一篇解决分到之后的核心问题:consumer 读到哪里了,怎么记住。

这个"记住"机制看起来简单——不过是记录一个数字(offset)。但"什么时候记"和"怎么记"的选择,直接决定了消息会不会丢、会不会重复处理。auto-commit 的默认行为经常被误解为"自动保证 exactly-once",实际上它给出的是 at-least-once 甚至更弱的语义。

本文只抓一个问题:offset 的提交策略如何影响消费语义。

三个 offset

理解 offset 管理需要区分三个位置:

1
2
3
4
5
6
7
8
9
Partition Log:
┌────┬────┬────┬────┬────┬────┬────┬────┬─────┐
0 1 2 3 4 5 6 7 │ ... │
└────┴────┴────┴────┴────┴────┴────┴────┴─────┘
▲ ▲ ▲
│ │ │
committed position LEO
offset=3 (next fetch) (log-end
offset=5 offset=8)
  • committed offset:consumer 上次提交的位置。rebalance 或重启后,consumer 从这个位置恢复消费。存储在 __consumer_offsets topic 中。
  • position(当前位置):consumer 下一次 poll() 将从这个位置开始拉取。它在内存中维护,不持久化。
  • log-end offset(LEO):partition 中最新一条消息的下一个 offset。producer 写入新消息时 LEO 递增。

committed offset 和 position 之间的差距是"已拉取但未提交"的消息。consumer lag 通常指 LEO 与 committed offset 之间的差距。

__consumer_offsets:offset 存在哪里

committed offset 存储在一个名为 __consumer_offsets 的内部 topic 中。这个 topic 有 50 个 partition(offsets.topic.num.partitions 配置),使用 log compaction 策略。

每条 offset 提交记录的 key 是 (group.id, topic, partition) 三元组,value 是 committed offset 值和元数据。由于 log compaction 只保留每个 key 的最新值,__consumer_offsets 不会无限增长。

offset 存储在 Kafka 自身的 topic 中而不是外部存储(如 ZooKeeper),这个设计在 Kafka 0.9 引入(之前确实存在 ZooKeeper 中)。好处是:offset 提交走 Kafka 自己的复制协议,具备与普通消息相同的持久性保证;同时减少了对 ZooKeeper 的依赖和写入压力。

auto-commit:poll() 里藏的提交

默认配置下,consumer 启用 auto-commit:

1
2
enable.auto.commit=true
auto.commit.interval.ms=5000

auto-commit 的实际行为:每次调用 poll() 时,consumer 检查距离上次提交是否超过 auto.commit.interval.ms(默认 5 秒)。如果超过,就在本次 poll() 的开头把上一次 poll() 返回的最大 offset 提交出去。

注意这个时序:提交发生在 poll() 内部,提交的是上一轮 poll() 拉到的 offset,而不是当前轮。

1
2
3
4
5
6
7
8
9
poll() 返回 offset 3-7 的消息
→ 应用处理 offset 3, 4, 5... (处理到 5 时崩溃)
→ 下次 poll() 时 auto-commit 提交 offset 8
(但 poll() 没被调用,因为崩溃了)

重启后:
→ 从 committed offset (上次提交的值) 恢复
→ 如果上次成功提交了 offset 3,重新消费 3-7
→ offset 3, 4, 5 被重复处理

auto-commit 在 consumer 正常运行时表现良好。问题出现在崩溃、rebalance 或处理延迟超过 commit 间隔时:

  • 崩溃在两次 poll() 之间:已拉取但未提交的消息在重启后被重复消费。
  • 处理耗时超过 auto.commit.interval.ms:下一次 poll() 提交时,实际上提交的是已经处理完的消息——这种情况下 auto-commit 工作正常。
  • rebalance 发生时:auto-commit 在 rebalance 前会触发一次提交,但如果提交的 offset 超过了实际处理完的 offset,接管 partition 的 consumer 会跳过未处理的消息。

手动提交:commitSync() 与 commitAsync()

关闭 auto-commit 后,应用代码控制何时提交 offset:

1
enable.auto.commit=false

commitSync()

同步提交。调用后阻塞直到 broker 确认提交成功(或抛出异常)。

1
2
3
4
5
6
7
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
consumer.commitSync();
}

commitSync() 在失败时自动重试(受 default.api.timeout.ms 限制)。如果重试耗尽仍然失败,抛出 CommitFailedException。

代价:每次提交都要等待网络往返,降低消费吞吐。

commitAsync()

异步提交。调用后立即返回,通过回调通知结果。

1
2
3
4
5
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("Commit failed for offsets {}", offsets, exception);
}
});

commitAsync() 不会自动重试——因为在异步场景下,重试一个旧的 offset 提交可能覆盖一个新的已成功提交的 offset。

常见的折中方案是:循环中使用 commitAsync(),consumer 关闭前使用 commitSync() 确保最后一批 offset 提交成功。

1
2
3
4
5
6
7
8
9
10
11
12
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
consumer.commitAsync();
}
} finally {
consumer.commitSync();
consumer.close();
}

按 partition 提交

commitSync() 和 commitAsync() 都支持传入一个 Map<TopicPartition, OffsetAndMetadata> 参数,按 partition 粒度提交。这在一次 poll() 返回大量消息、需要在处理过程中分批提交时有用:

1
2
3
4
5
6
7
8
9
10
11
int count = 0;
for (ConsumerRecord<String, String> record : records) {
process(record);
count++;
if (count % 100 == 0) {
consumer.commitSync(Map.of(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
));
}
}

注意提交的值是 record.offset() + 1——committed offset 的含义是"下一次消费的起始位置",不是"最后消费的位置"。

提交策略与消费语义

offset 的提交时机决定消费语义:

at-most-once

先提交 offset,再处理消息。如果处理过程中崩溃,提交已经完成,重启后从新 offset 开始——崩溃时正在处理的消息丢失,不会被重复处理。

1
2
3
4
5
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
consumer.commitSync(); // 先提交
for (ConsumerRecord<String, String> record : records) {
process(record); // 后处理;崩溃则消息丢失
}

at-least-once

先处理消息,再提交 offset。如果处理完成后、提交前崩溃,重启后从旧 offset 开始——已处理的消息被重复消费。

1
2
3
4
5
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record); // 先处理
}
consumer.commitSync(); // 后提交;崩溃则消息重复

这是大多数应用选择的语义。配合业务层的幂等处理(如数据库 upsert、去重表),可以在应用层面实现 effectively-once。

exactly-once

仅靠 consumer 端的 offset 提交无法实现 exactly-once。需要将 offset 提交和业务处理放在同一个事务中:

  • 方案一:使用 Kafka 事务。producer 调用 sendOffsetsToTransaction() 把 offset 提交和消息写入绑定在同一个事务中。这要求消费结果写回 Kafka(consume-transform-produce 模式)。详见第 09 篇。
  • 方案二:把 offset 存到业务数据库。在处理消息的同一个数据库事务中写入 offset,重启时从数据库读取 offset 而不是从 __consumer_offsets 读取。需要调用 consumer.seek() 手动定位。

consumer lag

consumer lag = LEO - committed offset。这个值反映 consumer 的消费进度。

1
2
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-group

输出中的 LAG 列就是每个 partition 的 consumer lag。

lag 持续增长意味着 consumer 的处理速度跟不上 producer 的写入速度。可能的原因:

  • consumer 处理逻辑太慢(如每条消息做一次数据库写入而没有批量处理)。
  • consumer 数量少于 partition 数量,某个 consumer 承担了过多 partition。
  • GC 暂停或网络抖动导致 consumer 频繁 rebalance,消费中断。

监控 consumer lag 是 Kafka 运维的基本指标。Burrow(LinkedIn 开源)和 Kafka 自带的 JMX 指标 records-lag-max 都可以用于监控。

offset reset:auto.offset.reset

当 consumer 启动时,如果 __consumer_offsets 中没有这个 group 对这个 partition 的 committed offset 记录,consumer 需要决定从哪里开始消费。这由 auto.offset.reset 控制:

  • earliest:从 partition 的最早可用 offset 开始消费(不一定是 0,因为旧 segment 可能已被删除)。
  • latest:从 partition 的 LEO 开始消费,只消费启动后新写入的消息。
  • none:抛出异常,不自动决定。
1
auto.offset.reset=earliest

触发 offset reset 的条件:

  • 全新的 consumer group 首次消费某个 topic。
  • committed offset 对应的消息已经被删除(segment 过期被清理)。offset 失效后,consumer 根据 auto.offset.reset 重新定位。

常见误解:offset reset 不会在每次重启时触发。只要 __consumer_offsets 中有有效的 committed offset 记录,consumer 就从 committed offset 恢复,auto.offset.reset 不生效。

手动重置 offset

运维场景下需要手动重置 consumer group 的 offset:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 重置到最早
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --topic my-topic \
--reset-offsets --to-earliest --execute

# 重置到特定 offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --topic my-topic \
--reset-offsets --to-offset 1000 --execute

# 重置到特定时间点
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group --topic my-topic \
--reset-offsets --to-datetime 2026-06-20T00:00:00.000 --execute

执行 reset 前必须确保 group 中没有活跃的 consumer(所有 consumer 需要先停止)。--dry-run 参数可以预览 reset 结果而不实际执行。

实验:auto-commit 下的重复消费

准备

1
2
3
4
5
6
7
8
9
10
# 创建 topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic offset-test \
--partitions 1 --replication-factor 1

# 写入 20 条消息
for i in $(seq 1 20); do
echo "message-$i" | kafka-console-producer.sh \
--bootstrap-server localhost:9092 --topic offset-test
done

用 auto-commit 消费,中途杀掉

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "offset-demo");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "30000"); // 30 秒,故意设长
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("offset-test"));

while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset=%d value=%s%n",
record.offset(), record.value());
if (record.offset() == 9) {
System.out.println("Simulating crash at offset 10");
System.exit(1); // 模拟崩溃
}
}
}

观察结果

第一次运行:consumer 消费 offset 0-9 后退出。由于 auto.commit.interval.ms 设为 30 秒且只运行了几秒,auto-commit 没有触发。__consumer_offsets 中没有这个 group 的 committed offset。

第二次运行:consumer 从 offset 0 重新开始消费(因为没有 committed offset,触发 auto.offset.reset=latestearliest)。offset 0-9 被重复处理。

如果把 auto.commit.interval.ms 改为 1000(1 秒),第一次运行中 auto-commit 可能在第二次 poll() 时提交了 offset 0-N。第二次运行就会从 N+1 开始,避免了部分重复。但处理到 offset 9 时崩溃,N+1 到 9 之间的消息仍然会被重复消费。

切换为手动提交(enable.auto.commit=false,在每条消息处理后调用 commitSync()),可以把重复消费的粒度从"一个 auto-commit 周期的所有消息"缩小到"最后一条已处理但未提交的消息"。

模式提炼

offset 管理本质上是一种游标(cursor)模式:consumer 在一个有序序列上维护读取位置,通过持久化游标位置来实现断点续读。

这个模式出现在多种系统中:

  • 数据库游标:应用通过 DECLARE CURSOR + FETCH 遍历结果集,游标位置由数据库服务端维护。
  • CDC 位点:Debezium、Canal 等 CDC 工具记录 binlog position 或 GTID,用于断点续读增量变更。
  • RocketMQ offset:同样是 consumer 维护每个 queue 的消费进度,存储在 broker 端(广播模式存客户端本地)。
  • 文件系统 seek:lseek() 维护文件描述符的读写偏移量。

Kafka offset 的特殊之处在于:游标本身也存储在日志中(__consumer_offsets topic),享受与业务消息相同的复制和持久化保障。

工程迁移表

概念 Kafka Offset RocketMQ Offset 数据库游标 CDC 位点
位置标识 64-bit offset 64-bit offset cursor position binlog position / GTID
存储位置 __consumer_offsets topic broker 内存 + 文件 数据库服务端 自定义存储
提交方式 auto / sync / async auto(定时 5s)/ 手动 隐式随 FETCH 应用层写入
重置能力 按时间 / 按 offset / earliest / latest 按时间 / 按 offset SCROLL / ABSOLUTE 按位点 / 按时间
失效处理 auto.offset.reset 策略 从最新开始 游标失效报错 重新全量同步
持久化保障 Kafka 副本复制 同步/异步刷盘 事务保证 取决于存储选择

常见误解

“auto-commit 保证 exactly-once”——auto-commit 给出的是 at-least-once 或更弱的语义(取决于崩溃时机)。如果在 auto-commit 触发后、消息处理前崩溃,还可能出现 at-most-once(跳过未处理的消息)。

“offset reset 在每次重启时都会触发”——只有当 committed offset 不存在或已失效时才触发。正常重启的 consumer 从 committed offset 恢复,auto.offset.reset 不参与。

“committed offset 就是最后处理的消息的 offset”——committed offset 是"下一次消费的起始位置",等于最后处理的消息的 offset + 1。提交 offset 时传入 record.offset() 而不是 record.offset() + 1 是一个常见 bug,会导致最后一条消息被重复消费。

练习

  1. 用上面的实验代码验证 auto-commit 下的重复消费。修改 auto.commit.interval.ms 为不同值(1 秒、5 秒、30 秒),观察崩溃后重复消费的消息数量如何变化。

  2. 改写实验代码为手动提交模式(commitSync() 在每条消息处理后调用),重复崩溃实验,对比重复消费的粒度。

  3. kafka-consumer-groups.sh --reset-offsets--to-datetime 参数把 consumer group 的 offset 重置到 1 小时前。验证重置后 consumer 从指定时间点的消息开始消费。

  4. 在代码中使用 consumer.seek() 手动指定起始 offset,跳过 __consumer_offsets 中的 committed offset。验证 seek 的优先级高于 committed offset。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料