Exactly-Once 与事务:跨 partition 的原子写入
前面在第 04 篇解决了单 partition 内的消息去重。这一篇解决更难的问题:一批消息写到多个 partition,要么全部可见,要么全部不可见。
幂等 Producer 的序列号去重是 per-PID、per-partition 的。它不解决两个场景:一是 Producer 重启后 PID 改变导致无法关联之前的写入;二是一个业务操作涉及多个 partition 的写入,部分成功部分失败时没有回滚机制。Kafka 的事务机制(Transactional API)正是为了解决这两个问题而设计的。
本文只抓一个问题:Kafka 事务是怎么通过 transactional.id、Transaction Coordinator 和两阶段提交协议实现跨 partition 原子写入的。
为什么幂等不够
用一个具体场景说明。一个 stream processing 应用从 input topic 消费消息,处理后写入 output topic,同时提交 consumer offset。这三个动作涉及不同的 partition:
1 | |
如果写入 output-topic 成功但提交 offset 失败,应用重启后会重新消费已处理过的消息,导致 output-topic 中出现重复。幂等 Producer 无法解决这个问题,因为去重是 per-partition 的,它不知道 output-topic 和 __consumer_offsets 之间的关联。
需要一个机制把"写入 output-topic"和"提交 consumer offset"绑定在同一个原子操作里——这就是事务。
transactional.id:跨重启的稳定身份
事务 Producer 在配置中指定一个 transactional.id:
1 | |
与幂等 Producer 的 PID 不同,transactional.id 是由应用自己定义的字符串,不随进程重启而改变。broker 通过 transactional.id 维护一个持久化的映射关系:
1 | |
当一个新 Producer 实例使用相同的 transactional.id 启动时:
- broker 为这个 transactional.id 分配一个新的 epoch(纪元号递增)。
- 旧 epoch 的任何未完成事务被 abort。
- 旧 epoch 的 Producer 实例如果还在运行,它的后续写入请求会被 broker 拒绝(ProducerFencedException)。
这个 fencing 机制解决了"僵尸 Producer"问题:旧进程还没完全死掉就来了新进程,两个 Producer 不会同时写入。epoch 更高的那个赢。
Transaction Coordinator
每个 transactional.id 被映射到一个 Transaction Coordinator——一个特定的 broker。映射方式和 consumer group coordinator 类似:对 transactional.id 做 hash,取模后定位到 __transaction_state 内部 topic 的某个 partition,该 partition 的 leader 就是 coordinator。
__transaction_state 是 Kafka 的内部 topic,默认 50 个 partition,replication factor 为 3。它记录每个事务的状态变迁:
1 | |
Transaction Coordinator 既是事务状态的管理者,也是两阶段提交的协调者。
事务生命周期
一个完整的事务经过以下步骤:
1 | |
几个关键点:
beginTransaction() 是纯本地操作,不涉及网络通信。事务的网络开销集中在 initTransactions()(一次)和 commitTransaction/abortTransaction(每次事务)。
send() 在事务中和普通发送几乎一样,唯一的区别是第一次向一个新 partition 发送时,Producer 会先通过 AddPartitionsToTxnRequest 告诉 Coordinator"这个 partition 加入当前事务"。
sendOffsetsToTransaction() 把 consumer offset 的提交绑入事务。它不是直接写 __consumer_offsets,而是告诉 Coordinator"把这些 offset 关联到当前事务"。offset 的实际写入在事务提交时由 Coordinator 完成。
两阶段提交
commitTransaction() 触发两阶段提交:
Phase 1 — PREPARE_COMMIT:Coordinator 把事务状态从 ONGOING 改为 PREPARE_COMMIT,写入 __transaction_state。这是 commit point——一旦 PREPARE_COMMIT 成功写入并复制到 ISR,事务就不可撤销了。即使 Coordinator 在此之后崩溃,新的 Coordinator 恢复后也会继续完成提交。
Phase 2 — WriteTxnMarkers:Coordinator 向事务涉及的每个 partition 的 leader 发送 WriteTxnMarkersRequest。每个 partition leader 在日志中追加一条控制消息(COMMIT 或 ABORT marker)。这些 marker 是普通的日志记录,占据一个 offset,但对应用层不可见。
当所有 partition 都确认 marker 写入后,Coordinator 把状态改为 COMPLETE_COMMIT,事务结束。
abortTransaction() 的流程相同,只是 marker 类型是 ABORT 而不是 COMMIT。
Consumer 端:isolation.level
事务写入的消息在 commit 之前就已经存在于 partition 日志中了——它们和普通消息一样被追加到日志末尾。事务的原子性不是通过"延迟写入"实现的,而是通过 consumer 端的过滤实现的。
Consumer 的 isolation.level 参数控制过滤行为:
read_uncommitted(默认)
Consumer 读取所有消息,包括未提交事务中的消息和已 abort 事务中的消息。行为和没有事务时一样。
read_committed
Consumer 只读取已提交事务中的消息和非事务消息。未提交的消息被跳过,已 abort 的消息被过滤。
read_committed 的实现依赖一个关键概念:Log Stable Offset (LSO)。
LSO:事务的水位线
LSO 是 partition 中最早的未完成事务的起始 offset。在 LSO 之前的所有消息要么属于已完成(committed 或 aborted)的事务,要么是非事务消息——它们的可见性已经确定。
1 | |
当存在一个长时间未提交的事务时,LSO 会卡在那个事务的起始 offset,导致 LSO 之后的所有消息(包括已提交事务的)都对 read_committed consumer 不可见。这是事务的一个性能陷阱:一个"挂起"的事务会阻塞整个 partition 的消费进度。
性能影响
事务的性能开销主要来自三个方面:
-
事务提交延迟:每次 commitTransaction() 涉及两阶段提交,包括 __transaction_state 的写入和向所有相关 partition 写入 marker。典型的事务提交延迟在 20-50ms(取决于涉及的 partition 数量和网络延迟)。
-
Coordinator 的 I/O:__transaction_state 的每次状态变更都是一次日志追加和复制。高频事务会增加 Coordinator 所在 broker 的 I/O 压力。
-
Consumer 端的过滤开销:read_committed consumer 需要维护一个 aborted transaction 的索引(.txnindex 文件),用于过滤已中止事务的消息。
减轻性能影响的常用策略是增大事务的批次:不要每条消息一个事务,而是在一个事务中处理一批消息。典型的做法是按时间窗口(如每 100ms)或消息数量(如每 200 条)提交一次事务。
实验:跨 partition 事务写入
下面的代码演示一个事务 Producer 向 3 个 partition 写入消息,然后用 read_committed consumer 验证原子性。
1 | |
1 | |
预期输出:Consumer 只收到 txn1-p0、txn1-p1、txn1-p2 三条消息。txn2 的三条消息虽然已经写入了 partition 日志,但因为事务被 abort,read_committed consumer 将它们过滤掉了。
如果把 isolation.level 改为 read_uncommitted(或不设置),Consumer 会收到全部 6 条消息。
模式提炼
Kafka 的事务是 two-phase commit over a log 模式的一个实例。和传统数据库的 2PC 相比,Kafka 的事务有一个关键简化:所有参与者(partition leader)和协调者(Transaction Coordinator)都使用相同的持久化机制——追加日志。
传统 2PC 的难点在于协调者崩溃后的恢复。Kafka 用内部 topic(__transaction_state)作为协调者的持久化存储,这个 topic 本身有副本和 ISR 机制,协调者崩溃后新 leader 可以从日志中完整恢复事务状态。
这个模式在其他系统中的变体:
- 数据库 2PC / XA:协调者(Transaction Manager)写 prepare log,参与者(Resource Manager)写 redo log。恢复依赖 prepare log 的持久化。
- RocketMQ 事务消息:用半消息(half message)+ 本地事务表 + 回查机制实现。不是严格的 2PC,而是基于补偿的最终一致性。
- Saga 模式:完全不用 2PC,通过一系列本地事务 + 补偿操作实现最终一致性。适用于跨服务场景。
Kafka 事务的适用范围是单个 Kafka 集群内的跨 partition 原子写入。它不是通用的分布式事务方案——不能把 Kafka 写入和数据库写入放在同一个 Kafka 事务里。
工程迁移表
| 概念 | Kafka 事务 | RocketMQ 事务消息 | 数据库分布式事务 |
|---|---|---|---|
| 原子性范围 | 单集群内跨 partition | 单条消息 + 本地事务 | 跨数据库 (XA / Saga) |
| 协调者 | Transaction Coordinator (broker) | 半消息 + 回查服务 | Transaction Manager ™ |
| 持久化 | __transaction_state (内部 topic) | 半消息 topic + 本地事务表 | redo log + prepare log |
| 提交协议 | 两阶段提交 (prepare + marker) | 半消息确认 + 本地提交 | 2PC / 3PC / Saga |
| 隔离级别 | read_committed / read_uncommitted | 无原生隔离级别 | READ_COMMITTED / SERIALIZABLE |
| 僵尸防护 | epoch fencing (ProducerFencedException) | 回查超时后回滚 | 超时回滚 |
| 性能影响 | 提交延迟 20-50ms | 半消息额外存储 + 回查开销 | 2PC 锁持有时间长 |
常见误解
误解一:exactly-once 意味着每条消息端到端只被处理一次。
事实:Kafka 的 exactly-once 语义(EOS)指的是 Producer 到 broker 的写入不重复,以及 consume-transform-produce 模式下通过事务绑定 offset 提交和输出写入实现的处理语义。它不覆盖外部系统的副作用——如果处理逻辑中调用了 HTTP API 或写了外部数据库,这些副作用不在 Kafka 事务的保护范围内。
误解二:事务会严重影响吞吐量。
事实:事务的主要开销是 commitTransaction() 的延迟(20-50ms),不是单条消息的吞吐。通过增大事务批次(每个事务处理更多消息),可以把事务提交的开销摊薄到每条消息上。每 100ms 提交一次事务,每秒只需 10 次事务提交,对整体吞吐影响很小。
误解三:transactional.id 只是为了跨重启去重。
事实:transactional.id 的核心作用是 zombie fencing。当新 Producer 实例使用相同的 transactional.id 启动时,旧实例被 fence 掉(epoch 递增),它的未完成事务被 abort。这保证了同一个 transactional.id 在任何时刻只有一个活跃的 Producer 实例。跨重启去重是这个 fencing 机制的副产品。
练习
-
用 Docker Compose 搭建一个 3-broker 集群。创建一个 3-partition topic。运行上面的事务 Producer 代码,分别用 read_committed 和 read_uncommitted consumer 消费,对比收到的消息。
-
在事务 Producer 发送过程中(beginTransaction 之后、commitTransaction 之前)kill Producer 进程。等待
transaction.timeout.ms(默认 60 秒)后,用 read_committed consumer 检查消息是否可见。观察超时后事务被自动 abort 的行为。 -
启动两个使用相同 transactional.id 的 Producer。观察后启动的 Producer 是否能正常工作,先启动的 Producer 的下一次 send 是否抛出 ProducerFencedException。
-
写一个 consume-transform-produce 的 stream processor:从 input-topic 消费,处理后写入 output-topic,在同一个事务中提交 consumer offset(使用 sendOffsetsToTransaction)。在处理过程中 kill 进程并重启,验证 output-topic 中不会出现重复消息。
系列导航
| 序号 | 主题 |
|---|---|
| 00 | 导读:为什么 Kafka 的核心是一根日志 |
| 01 | 架构总览:Broker、Controller 与元数据管理 |
| 02 | 日志存储:Segment、Index 与零拷贝 |
| 03 | Producer 内部机制:攒批、分区与 acks |
| 04 | 幂等 Producer 与序列号:消息不重不丢的第一层 |
| 05 | Consumer Group 协议:分配、重平衡与静态成员 |
| 06 | Offset 管理:提交、重置与消费语义 |
| 07 | 副本与 ISR:高可用的代价和折中 |
| 08 | Controller 与 KRaft:从 ZooKeeper 到内置共识 |
| 09 | Exactly-Once 与事务:跨 partition 的原子写入 |
| 10 | 日志压缩:把 topic 当 KV 表用 |
| 11 | Kafka Streams:在日志之上构建流处理 |
| 12 | Kafka Connect:标准化的数据管道 |
| 13 | Schema Registry 与数据治理:给消息加上契约 |
| 14 | 性能模型:吞吐、延迟与调优思路 |
| 15 | 安全体系:认证、授权与加密 |
| 16 | 生产运维:集群扩缩、监控指标与故障排查 |
| 17 | Kafka 与 RocketMQ:两种消息系统的设计选择 |
参考资料
- KIP-98: Exactly Once Delivery and Transactional Messaging:https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
- Apache Kafka 官方文档 — Transactions:https://kafka.apache.org/documentation/#semantics
- Confluent Blog — Transactions in Apache Kafka:https://www.confluent.io/blog/transactions-apache-kafka/
- Kafka 源码 — TransactionCoordinator.scala:https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
- Guozhang Wang. Erta-Once Semantics in Apache Kafka. Confluent, 2017:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
