前面四篇走完了写入端。这一篇转向读取端的核心问题:一组 consumer 怎么瓜分一个 topic 的所有 partition。

这个问题经常被简化成"负载均衡",但 consumer group 协议解决的不是负载均衡——它解决的是 partition 所有权的分配与变更。分配发生在 group 成员变化时,而每次变更的代价远比想象中大。

本文只抓一个问题:consumer group 的分配协议是怎么运作的,重平衡为什么贵,怎么降低代价。

consumer group 的基本契约

Kafka consumer group 有一条硬约束:同一个 group 内,一个 partition 在任意时刻只能被一个 consumer 消费。

这条约束的直接后果:

  • group 内 consumer 数量超过 partition 数量时,多出来的 consumer 空转,分不到任何 partition。
  • partition 的消费进度由唯一的 consumer 负责推进,不存在两个 consumer 同时消费同一 partition 的情况。
  • consumer 数量变化时,partition 的所有权需要重新分配——这就是 rebalance。

这个"一对一"契约与 RocketMQ 的 consumer group 模型在集群模式下的语义一致:一个 queue 在同一消费组内只由一个 consumer 实例持有。差异在于分配协议的实现位置和触发方式。

1
2
3
4
5
6
7
8
9
10
11
Topic: orders (6 partitions)
Consumer Group: order-service

P-0 ──→ Consumer-0
P-1 ──→ Consumer-0
P-2 ──→ Consumer-1
P-3 ──→ Consumer-1
P-4 ──→ Consumer-2
P-5 ──→ Consumer-2

Consumer-3: idle (no partition assigned)

Group Coordinator:谁来管理分配

每个 consumer group 由集群中的一个特定 broker 管理,这个 broker 称为 Group Coordinator。

选定 Group Coordinator 的过程:

  1. group.id 做 hash,取模 __consumer_offsets topic 的 partition 数量(默认 50),得到目标 partition 编号。
  2. 该 partition 的 leader 所在的 broker 就是这个 group 的 Coordinator。

公式:coordinator = leader_of(__consumer_offsets[hash(group.id) % 50])

所有 group 成员与 Coordinator 之间维护心跳。Coordinator 负责:

  • 跟踪 group 的成员列表。
  • 在成员变化时触发 rebalance。
  • 接收并存储 offset 提交。

Coordinator 不负责计算分配方案——这个工作由 consumer 端的 leader 来做。

JoinGroup / SyncGroup:两阶段重平衡协议

当 group 成员发生变化(新 consumer 加入、旧 consumer 退出、consumer 崩溃超时),Coordinator 触发 rebalance。整个过程分两个阶段。

阶段一:JoinGroup

所有 consumer 向 Coordinator 发送 JoinGroup 请求。请求中包含:

  • consumer 支持的分配策略列表(如 range、roundrobin、sticky)。
  • consumer 订阅的 topic 列表。

Coordinator 收齐所有成员的 JoinGroup 请求后:

  • 选出一个 consumer 作为 group leader(通常是第一个发送 JoinGroup 的成员)。
  • 把完整的成员列表和订阅信息发给 leader。
  • 其他成员只收到一个空响应,知道自己不是 leader。

阶段二:SyncGroup

leader consumer 根据选定的分配策略,计算出每个 consumer 应该消费哪些 partition,然后把分配结果通过 SyncGroup 请求发给 Coordinator。

Coordinator 把每个 consumer 的分配结果分发给对应的 consumer。每个 consumer 收到自己的分配结果后,开始消费分到的 partition。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Consumer-0 ──JoinGroup──→ Coordinator
Consumer-1 ──JoinGroup──→ Coordinator
Consumer-2 ──JoinGroup──→ Coordinator


选 Consumer-0 为 leader
把成员列表发给 leader


Consumer-0: 计算分配方案
Consumer-0 ──SyncGroup(分配结果)──→ Coordinator
Consumer-1 ──SyncGroup(空)────────→ Coordinator
Consumer-2 ──SyncGroup(空)────────→ Coordinator


Coordinator 分发结果
Consumer-0 ← {P-0, P-1}
Consumer-1 ← {P-2, P-3}
Consumer-2 ← {P-4, P-5}

这个设计的关键点:分配逻辑在客户端执行,broker 只做协调和分发。这与 HDFS NameNode 直接决定 block 到 DataNode 的映射不同——Kafka 选择了"中心协调、客户端计算"的模式。

四种分配策略

leader consumer 计算分配方案时,使用 partition.assignment.strategy 配置指定的策略。Kafka 内置四种。

RangeAssignor

按 topic 维度分配。对每个 topic,把 partition 按编号排序,consumer 按字典序排序,然后均分。

6 个 partition、3 个 consumer 的情况:每个 consumer 分到 2 个 partition。如果 partition 数不能整除 consumer 数,排在前面的 consumer 多分一个。

问题在于多 topic 场景。如果 group 订阅了 3 个 topic,每个 topic 有 3 个 partition、2 个 consumer,那么每个 topic 中 Consumer-0 分到 2 个 partition、Consumer-1 分到 1 个——累计下来 Consumer-0 承担 6 个 partition,Consumer-1 只有 3 个。

RoundRobinAssignor

把所有 topic 的所有 partition 放到一个列表里,按轮询方式分配。解决了 RangeAssignor 在多 topic 场景下的不均匀问题。

但它不考虑每个 consumer 的订阅差异——如果 group 内的 consumer 订阅了不同的 topic 子集,RoundRobin 可能把某个 partition 分给一个没订阅该 topic 的 consumer(实际实现中会跳过,但跳过后轮次不回填,仍然可能不均匀)。

StickyAssignor

在 RoundRobin 的基础上增加一个约束:尽量保持上一次的分配结果不变。

触发 rebalance 时,StickyAssignor 先看哪些 partition-consumer 映射可以保留,只重新分配需要变更的部分。这减少了 rebalance 时 consumer 需要重建本地状态的代价。

CooperativeStickyAssignor

自 Kafka 2.4 起引入(KIP-429)。前面三种策略都使用 eager 协议——rebalance 开始时所有 consumer 放弃所有 partition,等分配完成后重新领取。CooperativeStickyAssignor 使用 incremental cooperative 协议,只收回需要转移的 partition,其他 partition 继续消费。

这是降低 rebalance 代价的关键改进,下一节展开。

Eager rebalance 的代价

在 eager 协议下,rebalance 的流程:

  1. Coordinator 通知所有 consumer:rebalance 开始。
  2. 所有 consumer 停止消费,提交当前 offset,放弃所有 partition。
  3. 所有 consumer 发送 JoinGroup。
  4. leader 计算新分配。
  5. 通过 SyncGroup 分发。
  6. consumer 领取新 partition,从 committed offset 开始消费。

“所有 consumer 停止消费”——这就是 stop-the-world。在 rebalance 窗口内,整个 group 的消费吞吐降为零。

对于 partition 数多、consumer 数多的 group,rebalance 窗口可能持续数十秒。这段时间的消费停滞直接反映为 consumer lag 的跳升。

更糟糕的是 rebalance 风暴:一个 consumer 的短暂 GC 暂停导致心跳超时,触发 rebalance;rebalance 期间其他 consumer 的处理线程也被阻塞,如果此时它们的 session 也超时,就会触发连锁 rebalance。

Cooperative rebalance:增量协议

KIP-429 引入了 cooperative rebalance(也叫 incremental rebalance),核心思路:不再要求所有 consumer 在 rebalance 开始时放弃所有 partition。

cooperative 协议的流程:

  1. Coordinator 触发 rebalance。
  2. 所有 consumer 发送 JoinGroup,但在请求中带上当前持有的 partition 列表。
  3. leader 计算新分配,与当前分配做 diff,找出需要转移的 partition。
  4. 第一轮 SyncGroup:leader 只通知需要释放 partition 的 consumer “交出这几个 partition”,其他 consumer 继续消费不中断。
  5. 被通知的 consumer 释放指定 partition 后,触发第二轮 rebalance。
  6. 第二轮 rebalance 中,释放出来的 partition 被分配给目标 consumer。
1
2
3
4
5
6
7
8
9
Eager(stop-the-world):
所有 consumer: ████████░░░░░░████████
消费中 全部停止 消费中

Cooperative(增量):
Consumer-0: ████████████████████████ (不受影响)
Consumer-1: ████████████████████████ (不受影响)
Consumer-2: ████████░░██████████████ (只停 P-5)
Consumer-3: ░░░░░░░░░░██████████████ (新加入,接收 P-5)

cooperative 协议需要两轮 rebalance 才能完成一次转移,但每轮中大部分 consumer 不受影响。总的不可用时间从"全体停止 N 秒"缩减为"个别 consumer 停止个别 partition 若干秒"。

使用 cooperative 协议需要把 partition.assignment.strategy 设为 CooperativeStickyAssignor

1
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor

从 eager 迁移到 cooperative 需要滚动升级:不能在同一个 group 中混用 eager 和 cooperative 协议的 consumer。官方建议的迁移步骤是先配置两种策略共存,滚动重启让所有 consumer 都支持 cooperative,然后再移除 eager 策略。

静态成员:避免不必要的 rebalance

在容器化部署中,滚动重启是常规操作。每次重启一个 consumer 实例,都会触发两次 rebalance(一次离开、一次加入)。如果有 10 个实例要滚动重启,就是 20 次 rebalance。

KIP-345 引入了 static group membership,通过 group.instance.id 配置给每个 consumer 一个持久化标识。

1
2
group.instance.id=order-consumer-pod-2
session.timeout.ms=30000

设置 group.instance.id 后的行为变化:

  • consumer 关闭时不再主动发送 LeaveGroup 请求。
  • Coordinator 不会因为 consumer 断开连接立即触发 rebalance,而是等待 session.timeout.ms 超时。
  • 如果 consumer 在超时前用相同的 group.instance.id 重新加入,Coordinator 直接把原来的 partition 分配还给它,不触发 rebalance。

这意味着在 session.timeout.ms 窗口内完成滚动重启,整个过程零 rebalance。代价是需要把 session.timeout.ms 设得足够长(比如 30 秒到几分钟),在 consumer 真的崩溃时,检测延迟相应增加。

实验:观察 rebalance

启动一个 3 partition 的 topic 和 3 个 consumer,然后关闭一个 consumer,观察 rebalance 日志。

准备环境

1
2
3
4
# 创建 topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic rebalance-test \
--partitions 3 --replication-factor 1

启动三个 consumer

1
2
3
4
5
# 终端 1/2/3 分别启动
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic rebalance-test \
--group rebalance-demo \
--property print.partition=true

观察分配

1
2
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group rebalance-demo

输出类似:

1
2
3
4
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
rebalance-demo rebalance-test 0 0 0 0
rebalance-demo rebalance-test 1 0 0 0
rebalance-demo rebalance-test 2 0 0 0

每个 partition 分配给不同的 consumer。

关闭一个 consumer

Ctrl-C 关闭终端 3 的 consumer。观察剩余两个 consumer 的日志,会看到类似输出:

1
2
3
4
[Consumer clientId=..., groupId=rebalance-demo] Revoke previously assigned partitions
[Consumer clientId=..., groupId=rebalance-demo] (Re-)joining group
[Consumer clientId=..., groupId=rebalance-demo] Successfully joined group with generation N
[Consumer clientId=..., groupId=rebalance-demo] Notifying assignor about new assignment

再次运行 kafka-consumer-groups.sh --describe,可以看到 3 个 partition 被重新分配给 2 个 consumer。

对比 eager 与 cooperative

在 eager 模式下,rebalance 日志中会出现 “Revoke previously assigned partitions”——所有 consumer 先放弃所有 partition,再重新分配。

切换为 CooperativeStickyAssignor 后,日志中只对需要转移的 partition 出现 revoke,其余 partition 保持不变。通过对比 rebalance 前后 kafka-consumer-groups.sh 输出中每个 consumer 持有的 partition 列表,可以确认 cooperative 模式下大部分分配未变。

模式提炼

consumer group 的分配协议体现了一种"中心协调、客户端计算"的模式:

  • Coordinator(broker 端)负责成员管理、触发时机、结果分发。
  • leader consumer(客户端)负责计算分配方案。
  • 其他 consumer 被动接收分配结果。

这种模式的优势:分配策略可以在客户端升级而不需要升级 broker。Kafka 的四种内置策略以及用户自定义策略都是纯客户端实现。

对比其他系统:

  • HDFS NameNode 直接计算 block 到 DataNode 的映射——中心计算、中心分发。
  • RocketMQ 的 AllocateMessageQueueStrategy 也是在客户端执行,但每个 consumer 独立计算自己的分配(没有 leader 角色),依赖所有 consumer 看到相同的成员列表和 queue 列表来保证计算结果一致。

工程迁移表

概念 Kafka Consumer Group RocketMQ Consumer Group 数据库连接池
分配单元 partition message queue connection slot
分配触发 成员变化触发 rebalance 定时触发(20s 周期) 按需分配
分配计算位置 leader consumer 每个 consumer 独立计算 连接池管理器
分配协调者 Group Coordinator Name Server(无状态) 连接池自身
增量分配 CooperativeStickyAssignor 无内置增量协议 N/A
成员标识持久化 group.instance.id instance name N/A
分配策略扩展 客户端 SPI 客户端 SPI 固定策略

常见误解

“rebalance 就是负载均衡”——rebalance 是 partition 所有权的重新分配,不是按负载动态调整。即使某个 consumer 的 CPU 使用率已经打满,只要 partition 数和 consumer 数不变,就不会触发 rebalance。

“consumer 数量越多吞吐越高”——consumer 数量超过 partition 数量后,多余的 consumer 空转。要提高消费并行度,需要同时增加 partition 数量和 consumer 数量。

“cooperative rebalance 不会中断消费”——cooperative 协议仍然会中断需要转移的 partition 的消费。它的改进是:不需要转移的 partition 不受影响。

练习

  1. 创建一个 6 partition 的 topic,启动 4 个 consumer,用 kafka-consumer-groups.sh 观察分配结果。然后关闭一个 consumer,再观察新的分配。记录每次 rebalance 耗时(从日志中读取 JoinGroup 到 SyncGroup 完成的时间差)。

  2. 配置 group.instance.id,启动 3 个 consumer,关闭其中一个并在 session.timeout.ms 内用相同的 group.instance.id 重启。验证 partition 分配没有变化。

  3. 写一个 Java consumer 程序,实现 ConsumerRebalanceListener,在 onPartitionsRevokedonPartitionsAssigned 回调中打印日志。分别使用 RangeAssignor 和 CooperativeStickyAssignor,对比 rebalance 时哪些 partition 被 revoke、哪些被 assign。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料