Producer 内部机制:攒批、分区与 acks
上一篇拆解了 partition 内部的存储结构。这一篇转向写入端——Producer 把一条消息发出去,到底经过了哪些步骤。
很多使用者把 producer.send() 当成一次网络调用。实际情况是,send() 只是把消息丢进了一个内存缓冲区,真正的网络 I/O 发生在另一个线程里。Producer 内部是一条两阶段管线:主线程负责序列化和分区选择,后台 Sender 线程负责攒批和网络传输。
本文只抓一个问题:一条消息从 send() 调用到 broker 确认,经过了哪些对象、哪些线程、哪些等待。
发送管线总览
Producer 内部的数据流可以压成下面的路径:
1 | |
主线程和 Sender 线程之间的交汇点是 RecordAccumulator。理解这条管线的关键在于这个中间缓冲区。
RecordAccumulator:按分区攒批
RecordAccumulator 内部为每个 TopicPartition 维护一个双端队列(Deque),队列中的元素是 ProducerBatch。一个 ProducerBatch 对应一段连续的 ByteBuffer,写满或超时后交给 Sender 线程。
两个参数控制攒批的时机:
batch.size:单个 ProducerBatch 的目标字节数,默认 16384(16 KB)。当一个 batch 累积到这个大小时,标记为 ready。linger.ms:即使 batch 没写满,等待这么长时间后也标记为 ready。默认 0,即来一条发一条。
两者是 OR 关系——任何一个条件先满足,batch 就会被发送。
这和 TCP 里的 Nagle 算法思路一致:小包攒一攒再发,用少量延迟换吞吐。区别在于 Nagle 是在传输层做的,而 Kafka 的攒批发生在应用层,开发者对攒批窗口有完整控制权。
当 linger.ms 设为 0 时,RecordAccumulator 退化为一个直通缓冲:消息进来就尝试唤醒 Sender 线程。生产环境通常会把 linger.ms 设成 5 到 20 毫秒,让同一时间窗口内的消息合成更少的网络请求。
内存管理
RecordAccumulator 的总内存由 buffer.memory 控制,默认 32 MB。当所有 batch 占满了这块内存,send() 调用会阻塞,直到有 batch 被发送释放空间或超过 max.block.ms(默认 60 秒)抛出 TimeoutException。
这个设计意味着 Producer 有反压能力:下游 broker 处理不过来时,send() 自然变慢,不会无限消耗客户端内存。
Partitioner:消息去哪个分区
在进入 RecordAccumulator 之前,每条消息需要确定目标分区。KafkaProducer 按以下规则选择:
- 如果 ProducerRecord 指定了 partition 字段,直接使用。
- 如果提供了 key,对 key 做 murmur2 hash 再对分区数取模。相同 key 的消息始终落入同一个分区,保证了 key 级别的顺序性。
- 如果既没有 partition 也没有 key,使用 sticky partitioning。
Sticky Partitioning(KIP-480)
早期版本(2.3 之前)对无 key 消息使用 round-robin,每条消息换一个分区。这导致每个分区的 batch 都积累很慢——消息被均匀打散到所有分区,每个 batch 需要更长时间才能填满。
KIP-480 引入 sticky partitioning:在一个 batch 填满之前,所有无 key 消息都粘在同一个分区上。batch 发送后再切换到下一个分区。这样每个 batch 更快被填满,减少了 linger.ms 等待,降低了端到端延迟。
直观理解:round-robin 是"每条消息换个分区",sticky 是"每个 batch 换个分区"。
acks:多少 broker 确认才算写入成功
Sender 线程把 ProducerBatch 发送到 partition 的 leader broker 后,需要等待确认。acks 参数控制确认级别:
acks=0
Producer 发出去就认为成功,不等任何确认。最低延迟,但消息可能因为网络丢包或 broker 崩溃而丢失。适用于指标采集、日志等允许少量丢失的场景。
acks=1
Leader broker 把消息写入本地日志后就回复确认。如果 leader 在 follower 同步之前崩溃,消息会丢失。这是多数场景的默认选择(自 Kafka 3.0 起默认值从 1 改为 all)。
acks=all(或 acks=-1)
Leader 等待 ISR(In-Sync Replicas)中所有副本都写入成功后才回复确认。这是最安全的选项,但延迟最高。
acks=all 并不意味着集群中所有 broker 都要确认——只有 ISR 中的副本需要确认。ISR 的大小由 min.insync.replicas 控制。当 ISR 中的副本数少于 min.insync.replicas 时,broker 拒绝写入并返回 NotEnoughReplicasException。
一个典型的高可用配置:replication.factor=3,min.insync.replicas=2,acks=all。这保证至少有两个副本写入成功,允许一个 broker 下线。
重试与顺序性
网络不稳定时 Producer 会自动重试。retries 参数控制最大重试次数(自 Kafka 2.1 起默认值为 Integer.MAX_VALUE,配合 delivery.timeout.ms 使用)。
重试引入一个微妙的顺序问题。假设 Producer 发出两个 batch:batch-1 和 batch-2。如果 batch-1 失败需要重试,而 batch-2 成功了,那么日志中 batch-2 的消息在 batch-1 之前——顺序被打乱。
max.in.flight.requests.per.connection 控制 Sender 线程向同一个 broker 最多同时发出多少个未确认请求。默认值为 5。当这个值大于 1 时,重试可能导致乱序。
解决方案有两种:
- 把 max.in.flight.requests.per.connection 设为 1。同一时刻只有一个请求在飞,重试不会乱序,但吞吐量下降。
- 开启幂等 Producer(enable.idempotence=true)。幂等模式下 Kafka 允许最多 5 个 in-flight 请求,同时通过序列号在 broker 端保证顺序和去重。下一篇会展开这个机制。
Callback 与 Future
send() 返回一个 Future<RecordMetadata>。两种使用方式:
异步回调:
1 | |
同步等待:
1 | |
同步方式相当于把 Producer 的两阶段管线退化为单步操作——每发一条消息就阻塞等结果,吞吐量大幅下降。除了少数需要严格确认的场景(如金融交易流水),通常使用异步回调。
实验:acks 对延迟和吞吐的影响
下面的 Java 代码向一个 3 副本 topic 发送 10000 条 1 KB 消息,分别使用 acks=1 和 acks=all,对比延迟和吞吐。
1 | |
典型结果(3-broker 集群,同机房):
1 | |
acks=all 的延迟大约是 acks=1 的 1.5 到 2 倍。额外延迟来自 leader 等待 follower 确认的网络往返。在跨机房部署中这个差距会更明显。
把 linger.ms 从 10 调到 0,总耗时会显著增加——更多的小 batch 意味着更多的网络请求。攒批的效果在高吞吐场景下非常明显。
模式提炼
Producer 的攒批机制是 write-ahead batching 模式的一个实例。这个模式在系统设计中反复出现:
- TCP Nagle 算法:小包在发送缓冲区等待,积累到 MSS 或收到前一个包的 ACK 后才发出。
- 数据库 group commit:多个事务的 WAL 写入合并成一次 fsync,减少磁盘 I/O。
- gRPC 客户端:多个 RPC 请求合并到同一个 HTTP/2 连接的不同 stream 上。
模式的核心是:在生产者和传输层之间插入一个缓冲区,用可控的等待时间换取更少的传输次数。Kafka 的 RecordAccumulator 就是这个缓冲区,linger.ms 就是等待窗口。
工程迁移表
| 概念 | Kafka Producer | RocketMQ Producer | 数据库写入路径 |
|---|---|---|---|
| 攒批缓冲 | RecordAccumulator (batch.size + linger.ms) | 异步发送时客户端攒批 | WAL buffer / redo log buffer |
| 分区选择 | Partitioner (hash / sticky / custom) | MessageQueueSelector | 分库分表路由 |
| 确认级别 | acks=0/1/all | 同步/异步/oneway | fsync 策略 (每事务 / 每秒 / 不 sync) |
| 重试机制 | retries + delivery.timeout.ms | retryTimesWhenSendFailed | 应用层重试 |
| 顺序保证 | max.in.flight=1 或 idempotence | orderly message + 单队列 | 单连接串行提交 |
| 异步回调 | Callback / Future | SendCallback | CompletableFuture (异步 JDBC) |
常见误解
误解一:acks=all 意味着集群所有 broker 都要确认。
事实:acks=all 只要求 ISR 中的副本确认。ISR 可能只有 leader 自己(如果其他副本落后太多被踢出 ISR)。需要配合 min.insync.replicas 才能保证多副本确认。
误解二:send() 是同步网络调用。
事实:send() 只是把消息追加到 RecordAccumulator 的内存 buffer,真正的网络 I/O 由 Sender 线程异步执行。send() 在 buffer 满时会阻塞等空间,但这是背压,不是网络等待。
误解三:linger.ms=0 性能最好。
事实:linger.ms=0 意味着每条消息都可能触发一次网络请求。在高吞吐场景下,适当增大 linger.ms(5-20ms)可以显著提高吞吐量,因为更大的 batch 意味着更少的网络往返和更好的压缩率。
练习
-
启动一个 3-broker 集群,创建一个 3 分区、3 副本的 topic。用上面的实验代码分别测试 acks=0、acks=1、acks=all 的延迟差异。然后停掉一个 broker,观察 acks=all + min.insync.replicas=2 时的行为变化。
-
修改实验代码,把 linger.ms 分别设为 0、5、50、200,发送 100000 条消息,记录总耗时和平均 batch 大小。画一张 linger.ms vs throughput 的曲线。
-
写一个自定义 Partitioner,把同一个 orderId 的消息路由到同一个分区。验证消费端能按 orderId 看到有序消息。
-
把 max.in.flight.requests.per.connection 设为 1 和 5,分别测试不开启幂等时的吞吐差异。然后开启 enable.idempotence=true,观察 max.in.flight=5 时是否仍能保证顺序。
系列导航
| 序号 | 主题 |
|---|---|
| 00 | 导读:为什么 Kafka 的核心是一根日志 |
| 01 | 架构总览:Broker、Controller 与元数据管理 |
| 02 | 日志存储:Segment、Index 与零拷贝 |
| 03 | Producer 内部机制:攒批、分区与 acks |
| 04 | 幂等 Producer 与序列号:消息不重不丢的第一层 |
| 05 | Consumer Group 协议:分配、重平衡与静态成员 |
| 06 | Offset 管理:提交、重置与消费语义 |
| 07 | 副本与 ISR:高可用的代价和折中 |
| 08 | Controller 与 KRaft:从 ZooKeeper 到内置共识 |
| 09 | Exactly-Once 与事务:跨 partition 的原子写入 |
| 10 | 日志压缩:把 topic 当 KV 表用 |
| 11 | Kafka Streams:在日志之上构建流处理 |
| 12 | Kafka Connect:标准化的数据管道 |
| 13 | Schema Registry 与数据治理:给消息加上契约 |
| 14 | 性能模型:吞吐、延迟与调优思路 |
| 15 | 安全体系:认证、授权与加密 |
| 16 | 生产运维:集群扩缩、监控指标与故障排查 |
| 17 | Kafka 与 RocketMQ:两种消息系统的设计选择 |
参考资料
- Apache Kafka 官方文档 — Producer Configs:https://kafka.apache.org/documentation/#producerconfigs
- KIP-480: Sticky Partitioning:https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
- Kafka 源码 — RecordAccumulator.java:https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
- Neha Narkhede, Gwen Shapira, Todd Palino. Kafka: The Definitive Guide. O’Reilly, Chapter 3: Kafka Producers.
- KIP-91: Default acks=all:https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer
