上一篇走完了 Producer 的发送管线。这一篇解决一个写入端的经典问题:网络超时后重试,消息会不会写两遍。

重试导致重复是分布式系统的常见症状。Producer 发出一个请求,broker 写入成功但响应在网络中丢失,Producer 不知道成功了于是重发——同一条消息在日志中出现了两次。Kafka 从 0.11 版本开始,在 broker 端引入了基于序列号的去重机制,称为 idempotent producer。

本文只抓一个问题:幂等 Producer 是怎么用 PID 和序列号在 broker 端做去重的,它的边界在哪里。

重复问题的根源

先看一个没有幂等保护时的场景:

1
2
3
4
5
6
7
8
9
10
11
Producer                          Broker (Leader)
| |
|--- ProduceRequest(msg-A) ------->|
| |--- 写入 partition log,offset=100
| |
| (响应丢失或超时) |
| |
|--- ProduceRequest(msg-A) ------->| (重试)
| |--- 再次写入,offset=101
|<-- ProduceResponse(offset=101) --|
| |

msg-A 在 partition log 中出现了两次,offset 100 和 101。下游 consumer 会读到两条一模一样的消息。

这不是 Kafka 的 bug,而是 at-least-once 语义的必然结果:为了不丢消息,重试是必须的;但重试就可能重复。

PID 和序列号

幂等 Producer 的去重机制由两个标识组成:

  • Producer ID (PID):Producer 启动时向 broker 申请的唯一标识。每个 KafkaProducer 实例在其生命周期内持有一个 PID。
  • Sequence Number:Producer 为每个 <PID, TopicPartition> 对维护一个从 0 开始单调递增的序列号。每发送一个 batch,序列号加 1。

broker 端为每个 <PID, TopicPartition> 记录已接收的最新序列号。当一个 ProduceRequest 到达时,broker 检查:

  • 如果请求中的序列号 = 已记录序列号 + 1,正常写入,更新记录。
  • 如果请求中的序列号 <= 已记录序列号,说明是重复请求,返回成功但不重复写入。
  • 如果请求中的序列号 > 已记录序列号 + 1,说明中间有消息丢失,返回 OutOfOrderSequenceException。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Producer (PID=1001)
|
| 每个 partition 独立编号:
|
| partition-0: seq 0, 1, 2, 3 ...
| partition-1: seq 0, 1, 2, 3 ...
| partition-2: seq 0, 1, 2, 3 ...


Broker (Leader of partition-0)
|
| ProducerStateManager:
| PID=1001 -> last_seq = 3
|
| 收到 seq=4 -> 写入, 更新 last_seq=4
| 收到 seq=4 -> 重复, 返回成功, 不写入
| 收到 seq=6 -> 缺 seq=5, 报错

开启幂等

在 Producer 端只需要一个配置:

1
props.put("enable.idempotence", "true");

自 Kafka 3.0 起,当 acks=all 时 enable.idempotence 默认为 true。

开启幂等后,Kafka 客户端自动完成以下工作:

  1. Producer 启动时通过 InitProducerIdRequest 向 broker 申请 PID。
  2. 强制设置 acks=all(如果用户设了 acks=0 或 acks=1,会抛 ConfigException)。
  3. 强制设置 max.in.flight.requests.per.connection <= 5。
  4. 强制开启 retries(设为 Integer.MAX_VALUE)。
  5. 每个 ProducerBatch 携带 PID 和序列号。

这些约束是必要的:acks=all 保证消息到达所有 ISR 副本,max.in.flight <= 5 保证 broker 端序列号窗口可控,retries 保证暂时失败的消息能重新发送。

Broker 端的 ProducerStateManager

broker 端负责去重的组件是 ProducerStateManager。每个 partition 的 leader 维护一个 ProducerStateManager 实例,其中记录了每个活跃 PID 的状态:

1
2
3
4
ProducerStateManager (partition-0)
├── PID=1001: last_seq=42, last_offset=8850, epoch=0
├── PID=1002: last_seq=17, last_offset=8823, epoch=0
└── PID=1003: last_seq=5, last_offset=8801, epoch=1

这些状态不只存在内存中。每当 partition 生成新的 log segment 或创建 snapshot 时,ProducerStateManager 会把当前状态写入 .snapshot 文件。broker 重启后从 snapshot 恢复状态,配合日志重放补齐 snapshot 之后的记录。

关于内存开销:ProducerStateManager 为每个 PID 只保留最近 5 个 batch 的序列号(对应 max.in.flight.requests.per.connection 的上限)。一个 PID 的状态大约占 100 多字节。即使有数千个 Producer 同时写入,内存占用也在可控范围内。

Epoch:Producer 重启时发生了什么

PID 是 Producer 实例级别的标识。当一个 KafkaProducer 关闭再重新创建时,新实例会申请一个全新的 PID。

这意味着幂等的去重保证是 per-session 的:

  • 在同一个 Producer 实例的生命周期内(同一个 PID),同一条消息不会重复写入。
  • 一旦 Producer 重启(PID 变了),broker 无法关联新旧 PID,无法跨 session 去重。

具体场景:Producer 正在发送 batch-A(PID=1001, seq=10),网络超时,Producer 进程崩溃。重启后获得 PID=1002,重新发送 batch-A(PID=1002, seq=0)。broker 把这当成一条新消息写入,因为它来自一个新的 PID。

1
2
3
4
5
时间线:
t1: Producer(PID=1001) send batch(seq=10) -> broker 写入 offset=500
t2: 响应超时,Producer 崩溃
t3: Producer 重启,获得 PID=1002
t4: Producer(PID=1002) send batch(seq=0) -> broker 写入 offset=501 <- 重复!

每个 PID 还关联一个 epoch(纪元号)。epoch 用于 fencing:当同一个 transactional.id 的新 Producer 实例启动时,broker 递增 epoch 并拒绝旧 epoch 的写入请求。这是事务机制的一部分,下面只点到为止,第 09 篇会展开。

幂等的边界

幂等 Producer 解决的问题范围有明确的边界:

能保证的:

  • 单个 Producer 实例在其生命周期内,向单个 partition 写入不重复。
  • 在 max.in.flight <= 5 的约束下,即使发生重试,同一 partition 内的消息顺序不变。

不能保证的:

  • 跨 Producer 重启的去重(PID 变了)。
  • 跨 partition 的原子性(batch-A 写入 partition-0 成功,batch-B 写入 partition-1 失败,没有回滚机制)。
  • 端到端的 exactly-once(Producer 到 consumer 的完整链路需要事务 + consumer offset 提交的原子性)。

跨 partition 的原子性需要事务机制,即 transactional.id + Transaction Coordinator。第 09 篇会解决这个问题。

实验:验证幂等去重

实验目标:开启幂等的 Producer 在中途被 kill 后重启,验证 consumer 端是否收到重复消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// === Producer 端 ===
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");
props.put("acks", "all");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10000; i++) {
String key = "msg-" + i;
producer.send(new ProducerRecord<>("idem-test", key, "payload-" + i));
if (i % 1000 == 0) {
System.out.println("Sent " + i);
}
}
producer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// === Consumer 端:统计每个 key 出现的次数 ===
Properties cProps = new Properties();
cProps.put("bootstrap.servers", "localhost:9092");
cProps.put("group.id", "idem-check");
cProps.put("auto.offset.reset", "earliest");
cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
consumer.subscribe(List.of("idem-test"));

Map<String, Integer> counts = new HashMap<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty()) break;
for (ConsumerRecord<String, String> r : records) {
counts.merge(r.key(), 1, Integer::sum);
}
}

long duplicates = counts.values().stream().filter(c -> c > 1).count();
System.out.println("Total keys: " + counts.size());
System.out.println("Keys with duplicates: " + duplicates);
consumer.close();

在 Producer 发送过程中(比如发到第 5000 条时)用 kill -9 终止进程,然后重新运行 Producer 发送剩余消息。

预期结果:

  • 如果两次运行使用同一个 Producer 实例(不可能,因为进程被 kill 了),不会有重复。
  • 实际情况是两次运行获得不同的 PID,Consumer 端会看到 kill 前后重叠区间内的 key 出现两次。这验证了幂等是 per-session 的。

要解决跨重启的去重,需要 transactional.id——它在 broker 端提供了一个稳定的身份标识,不随 Producer 进程重启而改变。

模式提炼

Kafka 的幂等 Producer 是 sequence-number deduplication 模式的一个实例。这个模式在网络协议和分布式系统中反复出现:

  • TCP 序列号:每个字节有一个单调递增的序列号,接收端用序列号检测重复和乱序。Kafka 的 Producer 序列号几乎是同一思路在应用层的复刻。
  • RocketMQ 消息去重:RocketMQ 本身不提供 broker 端去重,通常依赖业务层的幂等表(数据库 unique key)或 Redis 去重。
  • HTTP 幂等键:Stripe、PayPal 等支付 API 要求客户端在请求头中携带 Idempotency-Key,服务端用这个 key 做去重。原理相同:用一个唯一标识 + 服务端状态实现重试安全。
  • 数据库 unique constraint:INSERT … ON CONFLICT DO NOTHING,用唯一索引在存储层去重。

模式的核心是:发送端标记唯一性,接收端记录已见标识并过滤重复。Kafka 的做法是把"唯一性标记"拆成 PID + 序列号两层,让 broker 在内存中维护一个很小的窗口就能完成去重。

工程迁移表

概念 Kafka 幂等 Producer RocketMQ 数据库
去重标识 PID + sequence number 无原生支持,业务层 msgId unique constraint / upsert
去重位置 broker 端 ProducerStateManager 业务层(Redis / DB) 存储引擎
去重粒度 per-PID, per-partition per-message(业务自定义) per-row
跨重启去重 不支持(PID 变化) 取决于业务 msgId 生成方式 支持(unique key 持久化)
顺序保证 序列号保证同 partition 内有序 orderly message auto-increment / sequence
开启成本 一行配置,吞吐影响 < 3% 需要额外基础设施 索引维护成本

常见误解

误解一:开启幂等后,消息在所有场景下都不会重复。
事实:幂等是 per-session 的。Producer 重启后获得新 PID,broker 无法关联旧 PID 的序列号。跨重启去重需要 transactional.id

误解二:enable.idempotence 会显著降低吞吐。
事实:幂等模式的额外开销主要是每个 ProduceRequest 多携带 PID 和序列号(几个字节),以及 broker 端维护 ProducerStateManager 的内存和 snapshot I/O。在 Confluent 的基准测试中,幂等模式的吞吐下降通常在 3% 以内。自 Kafka 3.0 起幂等已默认开启,官方显然认为这个开销是可接受的。

误解三:幂等 Producer 可以保证跨 partition 的原子性。
事实:幂等的去重和顺序保证都是 per-partition 的。向多个 partition 写入一组消息时,如果部分成功部分失败,幂等机制不提供回滚。跨 partition 原子性是事务的职责。

练习

  1. 开启 enable.idempotence=true,写一个 Producer 向单 partition topic 发送 10000 条消息。在发送过程中用 tc 命令模拟网络延迟(如 tc qdisc add dev eth0 root netem delay 200ms 100ms),观察 Producer 的重试行为和 consumer 端是否有重复。

  2. 尝试将 enable.idempotence=true 和 acks=1 组合使用,观察 Producer 启动时的报错信息。理解为什么幂等必须搭配 acks=all。

  3. 用 kafka-dump-log 工具查看一个 partition 的 .snapshot 文件内容,找到 ProducerStateManager 记录的 PID 和序列号信息。

  4. 写两个 Producer 实例(不使用 transactional.id),分别向同一个 partition 发送 key 相同的消息。验证两个 PID 的消息都会被写入——幂等只在同一 PID 内去重,不跨 PID。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料

  1. KIP-98: Exactly Once Delivery and Transactional Messaging:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
  2. Apache Kafka 官方文档 — Idempotent Producer:https://kafka.apache.org/documentation/#producerconfigs_enable.idempotence
  3. Kafka 源码 — ProducerStateManager.scala:https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateManager.java
  4. Confluent Blog — Exactly-once Semantics are Possible: Here’s How Kafka Does It:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  5. Neha Narkhede, Gwen Shapira, Todd Palino. Kafka: The Definitive Guide. O’Reilly, Chapter 3: Kafka Producers — Idempotent Producer.