前面在第 04 篇解决了单 partition 内的消息去重。这一篇解决更难的问题:一批消息写到多个 partition,要么全部可见,要么全部不可见。

幂等 Producer 的序列号去重是 per-PID、per-partition 的。它不解决两个场景:一是 Producer 重启后 PID 改变导致无法关联之前的写入;二是一个业务操作涉及多个 partition 的写入,部分成功部分失败时没有回滚机制。Kafka 的事务机制(Transactional API)正是为了解决这两个问题而设计的。

本文只抓一个问题:Kafka 事务是怎么通过 transactional.id、Transaction Coordinator 和两阶段提交协议实现跨 partition 原子写入的。

为什么幂等不够

用一个具体场景说明。一个 stream processing 应用从 input topic 消费消息,处理后写入 output topic,同时提交 consumer offset。这三个动作涉及不同的 partition:

1
2
input-topic (partition-0)  --->  处理  --->  output-topic (partition-2)
+-> __consumer_offsets (partition-5)

如果写入 output-topic 成功但提交 offset 失败,应用重启后会重新消费已处理过的消息,导致 output-topic 中出现重复。幂等 Producer 无法解决这个问题,因为去重是 per-partition 的,它不知道 output-topic 和 __consumer_offsets 之间的关联。

需要一个机制把"写入 output-topic"和"提交 consumer offset"绑定在同一个原子操作里——这就是事务。

transactional.id:跨重启的稳定身份

事务 Producer 在配置中指定一个 transactional.id

1
props.put("transactional.id", "order-processing-001");

与幂等 Producer 的 PID 不同,transactional.id 是由应用自己定义的字符串,不随进程重启而改变。broker 通过 transactional.id 维护一个持久化的映射关系:

1
transactional.id -> (PID, epoch)

当一个新 Producer 实例使用相同的 transactional.id 启动时:

  1. broker 为这个 transactional.id 分配一个新的 epoch(纪元号递增)。
  2. 旧 epoch 的任何未完成事务被 abort。
  3. 旧 epoch 的 Producer 实例如果还在运行,它的后续写入请求会被 broker 拒绝(ProducerFencedException)。

这个 fencing 机制解决了"僵尸 Producer"问题:旧进程还没完全死掉就来了新进程,两个 Producer 不会同时写入。epoch 更高的那个赢。

Transaction Coordinator

每个 transactional.id 被映射到一个 Transaction Coordinator——一个特定的 broker。映射方式和 consumer group coordinator 类似:对 transactional.id 做 hash,取模后定位到 __transaction_state 内部 topic 的某个 partition,该 partition 的 leader 就是 coordinator。

__transaction_state 是 Kafka 的内部 topic,默认 50 个 partition,replication factor 为 3。它记录每个事务的状态变迁:

1
2
3
4
__transaction_state (partition-N)
├── txn-id="order-001": state=ONGOING, PID=1001, epoch=3, partitions=[topic-a-0, topic-b-1]
├── txn-id="order-002": state=PREPARE_COMMIT, PID=1002, epoch=1, partitions=[topic-c-2]
└── txn-id="order-003": state=COMPLETE_COMMIT, PID=1003, epoch=0, partitions=[topic-a-1]

Transaction Coordinator 既是事务状态的管理者,也是两阶段提交的协调者。

事务生命周期

一个完整的事务经过以下步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Producer                    Txn Coordinator              Partition Leaders
| | |
|-- initTransactions() ------->| |
| (获取 PID, epoch, | |
| abort 旧事务) | |
| | |
|-- beginTransaction() | |
| (本地标记, 不涉及网络) | |
| | |
|-- send(record to P-0) ------>|-- AddPartitionsToTxn ------->|
| | (注册 P-0 到当前事务) |
|-- send(record to P-1) ------>|-- AddPartitionsToTxn ------->|
| | (注册 P-1 到当前事务) |
| | |
|-- sendOffsetsToTxn() ------->|-- AddOffsetsToTxn ---------->|
| (把 consumer offset | (注册 __consumer_offsets |
| 绑入事务) | partition 到事务) |
| | |
|-- commitTransaction() ------>| |
| |-- Phase 1: PREPARE_COMMIT -->|
| | (写入 __transaction_state) |
| | |
| |-- Phase 2: WriteTxnMarkers ->|
| | (向每个 partition 写入 |
| | COMMIT marker) |
| | |
| |-- COMPLETE_COMMIT |
| | (最终状态, 写入 |
|<-- 返回成功 ------------------| __transaction_state) |

几个关键点:

beginTransaction() 是纯本地操作,不涉及网络通信。事务的网络开销集中在 initTransactions()(一次)和 commitTransaction/abortTransaction(每次事务)。

send() 在事务中和普通发送几乎一样,唯一的区别是第一次向一个新 partition 发送时,Producer 会先通过 AddPartitionsToTxnRequest 告诉 Coordinator"这个 partition 加入当前事务"。

sendOffsetsToTransaction() 把 consumer offset 的提交绑入事务。它不是直接写 __consumer_offsets,而是告诉 Coordinator"把这些 offset 关联到当前事务"。offset 的实际写入在事务提交时由 Coordinator 完成。

两阶段提交

commitTransaction() 触发两阶段提交:

Phase 1 — PREPARE_COMMIT:Coordinator 把事务状态从 ONGOING 改为 PREPARE_COMMIT,写入 __transaction_state。这是 commit point——一旦 PREPARE_COMMIT 成功写入并复制到 ISR,事务就不可撤销了。即使 Coordinator 在此之后崩溃,新的 Coordinator 恢复后也会继续完成提交。

Phase 2 — WriteTxnMarkers:Coordinator 向事务涉及的每个 partition 的 leader 发送 WriteTxnMarkersRequest。每个 partition leader 在日志中追加一条控制消息(COMMIT 或 ABORT marker)。这些 marker 是普通的日志记录,占据一个 offset,但对应用层不可见。

当所有 partition 都确认 marker 写入后,Coordinator 把状态改为 COMPLETE_COMMIT,事务结束。

abortTransaction() 的流程相同,只是 marker 类型是 ABORT 而不是 COMMIT。

Consumer 端:isolation.level

事务写入的消息在 commit 之前就已经存在于 partition 日志中了——它们和普通消息一样被追加到日志末尾。事务的原子性不是通过"延迟写入"实现的,而是通过 consumer 端的过滤实现的。

Consumer 的 isolation.level 参数控制过滤行为:

read_uncommitted(默认)

Consumer 读取所有消息,包括未提交事务中的消息和已 abort 事务中的消息。行为和没有事务时一样。

read_committed

Consumer 只读取已提交事务中的消息和非事务消息。未提交的消息被跳过,已 abort 的消息被过滤。

read_committed 的实现依赖一个关键概念:Log Stable Offset (LSO)。

LSO:事务的水位线

LSO 是 partition 中最早的未完成事务的起始 offset。在 LSO 之前的所有消息要么属于已完成(committed 或 aborted)的事务,要么是非事务消息——它们的可见性已经确定。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Partition Log:

offset: 0 1 2 3 4 5 6 7 8 9 10 11 12
| | | | | | | | | | | | |
N T1 T1 N T2 T2 N T1 T1 C1 T2 A2 N
(commit) (abort)

N = 非事务消息
T1 = 事务 1 的消息 (已提交, offset 9 有 COMMIT marker)
T2 = 事务 2 的消息 (已中止, offset 11 有 ABORT marker)
C1 = 事务 1 的 COMMIT marker
A2 = 事务 2 的 ABORT marker

LSO = 13 (所有事务已完成)

read_committed consumer 看到: 0, 1, 2, 3, 6, 7, 8, 12
(N, T1, T1, N, N, T1, T1, N)
T2 的消息被过滤, marker 不可见

当存在一个长时间未提交的事务时,LSO 会卡在那个事务的起始 offset,导致 LSO 之后的所有消息(包括已提交事务的)都对 read_committed consumer 不可见。这是事务的一个性能陷阱:一个"挂起"的事务会阻塞整个 partition 的消费进度。

性能影响

事务的性能开销主要来自三个方面:

  1. 事务提交延迟:每次 commitTransaction() 涉及两阶段提交,包括 __transaction_state 的写入和向所有相关 partition 写入 marker。典型的事务提交延迟在 20-50ms(取决于涉及的 partition 数量和网络延迟)。

  2. Coordinator 的 I/O:__transaction_state 的每次状态变更都是一次日志追加和复制。高频事务会增加 Coordinator 所在 broker 的 I/O 压力。

  3. Consumer 端的过滤开销:read_committed consumer 需要维护一个 aborted transaction 的索引(.txnindex 文件),用于过滤已中止事务的消息。

减轻性能影响的常用策略是增大事务的批次:不要每条消息一个事务,而是在一个事务中处理一批消息。典型的做法是按时间窗口(如每 100ms)或消息数量(如每 200 条)提交一次事务。

实验:跨 partition 事务写入

下面的代码演示一个事务 Producer 向 3 个 partition 写入消息,然后用 read_committed consumer 验证原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// === 事务 Producer ===
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("transactional.id", "demo-txn-001");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

// 事务 1:正常提交
producer.beginTransaction();
producer.send(new ProducerRecord<>("txn-test", 0, "k1", "txn1-p0"));
producer.send(new ProducerRecord<>("txn-test", 1, "k2", "txn1-p1"));
producer.send(new ProducerRecord<>("txn-test", 2, "k3", "txn1-p2"));
producer.commitTransaction();
System.out.println("Transaction 1 committed");

// 事务 2:主动中止
producer.beginTransaction();
producer.send(new ProducerRecord<>("txn-test", 0, "k4", "txn2-p0"));
producer.send(new ProducerRecord<>("txn-test", 1, "k5", "txn2-p1"));
producer.send(new ProducerRecord<>("txn-test", 2, "k6", "txn2-p2"));
producer.abortTransaction();
System.out.println("Transaction 2 aborted");

producer.close();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// === read_committed Consumer ===
Properties cProps = new Properties();
cProps.put("bootstrap.servers", "localhost:9092");
cProps.put("group.id", "txn-check");
cProps.put("auto.offset.reset", "earliest");
cProps.put("isolation.level", "read_committed");
cProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
cProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
consumer.subscribe(List.of("txn-test"));

List<String> received = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
if (records.isEmpty()) break;
for (ConsumerRecord<String, String> r : records) {
received.add(r.value());
System.out.printf("partition=%d offset=%d value=%s%n",
r.partition(), r.offset(), r.value());
}
}

System.out.println("Total received: " + received.size());
// 预期只有 txn1 的 3 条消息, txn2 的消息被过滤
consumer.close();

预期输出:Consumer 只收到 txn1-p0、txn1-p1、txn1-p2 三条消息。txn2 的三条消息虽然已经写入了 partition 日志,但因为事务被 abort,read_committed consumer 将它们过滤掉了。

如果把 isolation.level 改为 read_uncommitted(或不设置),Consumer 会收到全部 6 条消息。

模式提炼

Kafka 的事务是 two-phase commit over a log 模式的一个实例。和传统数据库的 2PC 相比,Kafka 的事务有一个关键简化:所有参与者(partition leader)和协调者(Transaction Coordinator)都使用相同的持久化机制——追加日志。

传统 2PC 的难点在于协调者崩溃后的恢复。Kafka 用内部 topic(__transaction_state)作为协调者的持久化存储,这个 topic 本身有副本和 ISR 机制,协调者崩溃后新 leader 可以从日志中完整恢复事务状态。

这个模式在其他系统中的变体:

  • 数据库 2PC / XA:协调者(Transaction Manager)写 prepare log,参与者(Resource Manager)写 redo log。恢复依赖 prepare log 的持久化。
  • RocketMQ 事务消息:用半消息(half message)+ 本地事务表 + 回查机制实现。不是严格的 2PC,而是基于补偿的最终一致性。
  • Saga 模式:完全不用 2PC,通过一系列本地事务 + 补偿操作实现最终一致性。适用于跨服务场景。

Kafka 事务的适用范围是单个 Kafka 集群内的跨 partition 原子写入。它不是通用的分布式事务方案——不能把 Kafka 写入和数据库写入放在同一个 Kafka 事务里。

工程迁移表

概念 Kafka 事务 RocketMQ 事务消息 数据库分布式事务
原子性范围 单集群内跨 partition 单条消息 + 本地事务 跨数据库 (XA / Saga)
协调者 Transaction Coordinator (broker) 半消息 + 回查服务 Transaction Manager ™
持久化 __transaction_state (内部 topic) 半消息 topic + 本地事务表 redo log + prepare log
提交协议 两阶段提交 (prepare + marker) 半消息确认 + 本地提交 2PC / 3PC / Saga
隔离级别 read_committed / read_uncommitted 无原生隔离级别 READ_COMMITTED / SERIALIZABLE
僵尸防护 epoch fencing (ProducerFencedException) 回查超时后回滚 超时回滚
性能影响 提交延迟 20-50ms 半消息额外存储 + 回查开销 2PC 锁持有时间长

常见误解

误解一:exactly-once 意味着每条消息端到端只被处理一次。
事实:Kafka 的 exactly-once 语义(EOS)指的是 Producer 到 broker 的写入不重复,以及 consume-transform-produce 模式下通过事务绑定 offset 提交和输出写入实现的处理语义。它不覆盖外部系统的副作用——如果处理逻辑中调用了 HTTP API 或写了外部数据库,这些副作用不在 Kafka 事务的保护范围内。

误解二:事务会严重影响吞吐量。
事实:事务的主要开销是 commitTransaction() 的延迟(20-50ms),不是单条消息的吞吐。通过增大事务批次(每个事务处理更多消息),可以把事务提交的开销摊薄到每条消息上。每 100ms 提交一次事务,每秒只需 10 次事务提交,对整体吞吐影响很小。

误解三:transactional.id 只是为了跨重启去重。
事实:transactional.id 的核心作用是 zombie fencing。当新 Producer 实例使用相同的 transactional.id 启动时,旧实例被 fence 掉(epoch 递增),它的未完成事务被 abort。这保证了同一个 transactional.id 在任何时刻只有一个活跃的 Producer 实例。跨重启去重是这个 fencing 机制的副产品。

练习

  1. 用 Docker Compose 搭建一个 3-broker 集群。创建一个 3-partition topic。运行上面的事务 Producer 代码,分别用 read_committed 和 read_uncommitted consumer 消费,对比收到的消息。

  2. 在事务 Producer 发送过程中(beginTransaction 之后、commitTransaction 之前)kill Producer 进程。等待 transaction.timeout.ms(默认 60 秒)后,用 read_committed consumer 检查消息是否可见。观察超时后事务被自动 abort 的行为。

  3. 启动两个使用相同 transactional.id 的 Producer。观察后启动的 Producer 是否能正常工作,先启动的 Producer 的下一次 send 是否抛出 ProducerFencedException。

  4. 写一个 consume-transform-produce 的 stream processor:从 input-topic 消费,处理后写入 output-topic,在同一个事务中提交 consumer offset(使用 sendOffsetsToTransaction)。在处理过程中 kill 进程并重启,验证 output-topic 中不会出现重复消息。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料

  1. KIP-98: Exactly Once Delivery and Transactional Messaging:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
  2. Apache Kafka 官方文档 — Transactions:https://kafka.apache.org/documentation/#semantics
  3. Confluent Blog — Transactions in Apache Kafka:https://www.confluent.io/blog/transactions-apache-kafka/
  4. Kafka 源码 — TransactionCoordinator.scala:https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
  5. Guozhang Wang. Erta-Once Semantics in Apache Kafka. Confluent, 2017:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/