上一篇拆解了 partition 内部的存储结构。这一篇转向写入端——Producer 把一条消息发出去,到底经过了哪些步骤。

很多使用者把 producer.send() 当成一次网络调用。实际情况是,send() 只是把消息丢进了一个内存缓冲区,真正的网络 I/O 发生在另一个线程里。Producer 内部是一条两阶段管线:主线程负责序列化和分区选择,后台 Sender 线程负责攒批和网络传输。

本文只抓一个问题:一条消息从 send() 调用到 broker 确认,经过了哪些对象、哪些线程、哪些等待。

发送管线总览

Producer 内部的数据流可以压成下面的路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Main Thread                         Sender Thread
| |
v |
Interceptors |
| |
v |
Serializer (key + value) |
| |
v |
Partitioner (选分区) |
| |
v |
RecordAccumulator |
| (按 TopicPartition 攒批) |
| |
+------------- batch ready --------->|
v
drain batches
|
v
NetworkClient
|
v
Broker
|
v
ProduceResponse
|
v
Callback / Future

主线程和 Sender 线程之间的交汇点是 RecordAccumulator。理解这条管线的关键在于这个中间缓冲区。

RecordAccumulator:按分区攒批

RecordAccumulator 内部为每个 TopicPartition 维护一个双端队列(Deque),队列中的元素是 ProducerBatch。一个 ProducerBatch 对应一段连续的 ByteBuffer,写满或超时后交给 Sender 线程。

两个参数控制攒批的时机:

  • batch.size:单个 ProducerBatch 的目标字节数,默认 16384(16 KB)。当一个 batch 累积到这个大小时,标记为 ready。
  • linger.ms:即使 batch 没写满,等待这么长时间后也标记为 ready。默认 0,即来一条发一条。

两者是 OR 关系——任何一个条件先满足,batch 就会被发送。

这和 TCP 里的 Nagle 算法思路一致:小包攒一攒再发,用少量延迟换吞吐。区别在于 Nagle 是在传输层做的,而 Kafka 的攒批发生在应用层,开发者对攒批窗口有完整控制权。

linger.ms 设为 0 时,RecordAccumulator 退化为一个直通缓冲:消息进来就尝试唤醒 Sender 线程。生产环境通常会把 linger.ms 设成 5 到 20 毫秒,让同一时间窗口内的消息合成更少的网络请求。

内存管理

RecordAccumulator 的总内存由 buffer.memory 控制,默认 32 MB。当所有 batch 占满了这块内存,send() 调用会阻塞,直到有 batch 被发送释放空间或超过 max.block.ms(默认 60 秒)抛出 TimeoutException。

这个设计意味着 Producer 有反压能力:下游 broker 处理不过来时,send() 自然变慢,不会无限消耗客户端内存。

Partitioner:消息去哪个分区

在进入 RecordAccumulator 之前,每条消息需要确定目标分区。KafkaProducer 按以下规则选择:

  1. 如果 ProducerRecord 指定了 partition 字段,直接使用。
  2. 如果提供了 key,对 key 做 murmur2 hash 再对分区数取模。相同 key 的消息始终落入同一个分区,保证了 key 级别的顺序性。
  3. 如果既没有 partition 也没有 key,使用 sticky partitioning。

Sticky Partitioning(KIP-480)

早期版本(2.3 之前)对无 key 消息使用 round-robin,每条消息换一个分区。这导致每个分区的 batch 都积累很慢——消息被均匀打散到所有分区,每个 batch 需要更长时间才能填满。

KIP-480 引入 sticky partitioning:在一个 batch 填满之前,所有无 key 消息都粘在同一个分区上。batch 发送后再切换到下一个分区。这样每个 batch 更快被填满,减少了 linger.ms 等待,降低了端到端延迟。

直观理解:round-robin 是"每条消息换个分区",sticky 是"每个 batch 换个分区"。

acks:多少 broker 确认才算写入成功

Sender 线程把 ProducerBatch 发送到 partition 的 leader broker 后,需要等待确认。acks 参数控制确认级别:

acks=0

Producer 发出去就认为成功,不等任何确认。最低延迟,但消息可能因为网络丢包或 broker 崩溃而丢失。适用于指标采集、日志等允许少量丢失的场景。

acks=1

Leader broker 把消息写入本地日志后就回复确认。如果 leader 在 follower 同步之前崩溃,消息会丢失。这是多数场景的默认选择(自 Kafka 3.0 起默认值从 1 改为 all)。

acks=all(或 acks=-1)

Leader 等待 ISR(In-Sync Replicas)中所有副本都写入成功后才回复确认。这是最安全的选项,但延迟最高。

acks=all 并不意味着集群中所有 broker 都要确认——只有 ISR 中的副本需要确认。ISR 的大小由 min.insync.replicas 控制。当 ISR 中的副本数少于 min.insync.replicas 时,broker 拒绝写入并返回 NotEnoughReplicasException。

一个典型的高可用配置:replication.factor=3,min.insync.replicas=2,acks=all。这保证至少有两个副本写入成功,允许一个 broker 下线。

重试与顺序性

网络不稳定时 Producer 会自动重试。retries 参数控制最大重试次数(自 Kafka 2.1 起默认值为 Integer.MAX_VALUE,配合 delivery.timeout.ms 使用)。

重试引入一个微妙的顺序问题。假设 Producer 发出两个 batch:batch-1 和 batch-2。如果 batch-1 失败需要重试,而 batch-2 成功了,那么日志中 batch-2 的消息在 batch-1 之前——顺序被打乱。

max.in.flight.requests.per.connection 控制 Sender 线程向同一个 broker 最多同时发出多少个未确认请求。默认值为 5。当这个值大于 1 时,重试可能导致乱序。

解决方案有两种:

  • 把 max.in.flight.requests.per.connection 设为 1。同一时刻只有一个请求在飞,重试不会乱序,但吞吐量下降。
  • 开启幂等 Producer(enable.idempotence=true)。幂等模式下 Kafka 允许最多 5 个 in-flight 请求,同时通过序列号在 broker 端保证顺序和去重。下一篇会展开这个机制。

Callback 与 Future

send() 返回一个 Future<RecordMetadata>。两种使用方式:

异步回调:

1
2
3
4
5
6
7
8
9
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 处理发送失败
logger.error("Send failed for {}", record.key(), exception);
} else {
logger.debug("Sent to {}-{} offset {}",
metadata.topic(), metadata.partition(), metadata.offset());
}
});

同步等待:

1
RecordMetadata metadata = producer.send(record).get();

同步方式相当于把 Producer 的两阶段管线退化为单步操作——每发一条消息就阻塞等结果,吞吐量大幅下降。除了少数需要严格确认的场景(如金融交易流水),通常使用异步回调。

实验:acks 对延迟和吞吐的影响

下面的 Java 代码向一个 3 副本 topic 发送 10000 条 1 KB 消息,分别使用 acks=1 和 acks=all,对比延迟和吞吐。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("linger.ms", "10");
props.put("batch.size", "32768");

// 切换 acks 参数:acks=1 或 acks=all
props.put("acks", "all");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

String payload = "x".repeat(1024); // 1 KB 消息
int total = 10000;
long start = System.currentTimeMillis();
CountDownLatch latch = new CountDownLatch(total);

for (int i = 0; i < total; i++) {
producer.send(new ProducerRecord<>("perf-test", payload), (meta, ex) -> {
latch.countDown();
});
}

latch.await();
long elapsed = System.currentTimeMillis() - start;
System.out.printf("acks=%s total=%d elapsed=%dms throughput=%.0f msg/s%n",
props.get("acks"), total, elapsed, total * 1000.0 / elapsed);

producer.close();

典型结果(3-broker 集群,同机房):

1
2
acks=1    total=10000  elapsed=820ms   throughput=12195 msg/s
acks=all total=10000 elapsed=1380ms throughput=7246 msg/s

acks=all 的延迟大约是 acks=1 的 1.5 到 2 倍。额外延迟来自 leader 等待 follower 确认的网络往返。在跨机房部署中这个差距会更明显。

linger.ms 从 10 调到 0,总耗时会显著增加——更多的小 batch 意味着更多的网络请求。攒批的效果在高吞吐场景下非常明显。

模式提炼

Producer 的攒批机制是 write-ahead batching 模式的一个实例。这个模式在系统设计中反复出现:

  • TCP Nagle 算法:小包在发送缓冲区等待,积累到 MSS 或收到前一个包的 ACK 后才发出。
  • 数据库 group commit:多个事务的 WAL 写入合并成一次 fsync,减少磁盘 I/O。
  • gRPC 客户端:多个 RPC 请求合并到同一个 HTTP/2 连接的不同 stream 上。

模式的核心是:在生产者和传输层之间插入一个缓冲区,用可控的等待时间换取更少的传输次数。Kafka 的 RecordAccumulator 就是这个缓冲区,linger.ms 就是等待窗口。

工程迁移表

概念 Kafka Producer RocketMQ Producer 数据库写入路径
攒批缓冲 RecordAccumulator (batch.size + linger.ms) 异步发送时客户端攒批 WAL buffer / redo log buffer
分区选择 Partitioner (hash / sticky / custom) MessageQueueSelector 分库分表路由
确认级别 acks=0/1/all 同步/异步/oneway fsync 策略 (每事务 / 每秒 / 不 sync)
重试机制 retries + delivery.timeout.ms retryTimesWhenSendFailed 应用层重试
顺序保证 max.in.flight=1 或 idempotence orderly message + 单队列 单连接串行提交
异步回调 Callback / Future SendCallback CompletableFuture (异步 JDBC)

常见误解

误解一:acks=all 意味着集群所有 broker 都要确认。
事实:acks=all 只要求 ISR 中的副本确认。ISR 可能只有 leader 自己(如果其他副本落后太多被踢出 ISR)。需要配合 min.insync.replicas 才能保证多副本确认。

误解二:send() 是同步网络调用。
事实:send() 只是把消息追加到 RecordAccumulator 的内存 buffer,真正的网络 I/O 由 Sender 线程异步执行。send() 在 buffer 满时会阻塞等空间,但这是背压,不是网络等待。

误解三:linger.ms=0 性能最好。
事实:linger.ms=0 意味着每条消息都可能触发一次网络请求。在高吞吐场景下,适当增大 linger.ms(5-20ms)可以显著提高吞吐量,因为更大的 batch 意味着更少的网络往返和更好的压缩率。

练习

  1. 启动一个 3-broker 集群,创建一个 3 分区、3 副本的 topic。用上面的实验代码分别测试 acks=0、acks=1、acks=all 的延迟差异。然后停掉一个 broker,观察 acks=all + min.insync.replicas=2 时的行为变化。

  2. 修改实验代码,把 linger.ms 分别设为 0、5、50、200,发送 100000 条消息,记录总耗时和平均 batch 大小。画一张 linger.ms vs throughput 的曲线。

  3. 写一个自定义 Partitioner,把同一个 orderId 的消息路由到同一个分区。验证消费端能按 orderId 看到有序消息。

  4. 把 max.in.flight.requests.per.connection 设为 1 和 5,分别测试不开启幂等时的吞吐差异。然后开启 enable.idempotence=true,观察 max.in.flight=5 时是否仍能保证顺序。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料

  1. Apache Kafka 官方文档 — Producer Configs:https://kafka.apache.org/documentation/#producerconfigs
  2. KIP-480: Sticky Partitioning:https://cwiki.apache.org/confluence/display/KAFKA/KIP-480%3A+Sticky+Partitioner
  3. Kafka 源码 — RecordAccumulator.java:https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
  4. Neha Narkhede, Gwen Shapira, Todd Palino. Kafka: The Definitive Guide. O’Reilly, Chapter 3: Kafka Producers.
  5. KIP-91: Default acks=all:https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer