Consumer Group 协议:分配、重平衡与静态成员
前面四篇走完了写入端。这一篇转向读取端的核心问题:一组 consumer 怎么瓜分一个 topic 的所有 partition。
这个问题经常被简化成"负载均衡",但 consumer group 协议解决的不是负载均衡——它解决的是 partition 所有权的分配与变更。分配发生在 group 成员变化时,而每次变更的代价远比想象中大。
本文只抓一个问题:consumer group 的分配协议是怎么运作的,重平衡为什么贵,怎么降低代价。
consumer group 的基本契约
Kafka consumer group 有一条硬约束:同一个 group 内,一个 partition 在任意时刻只能被一个 consumer 消费。
这条约束的直接后果:
- group 内 consumer 数量超过 partition 数量时,多出来的 consumer 空转,分不到任何 partition。
- partition 的消费进度由唯一的 consumer 负责推进,不存在两个 consumer 同时消费同一 partition 的情况。
- consumer 数量变化时,partition 的所有权需要重新分配——这就是 rebalance。
这个"一对一"契约与 RocketMQ 的 consumer group 模型在集群模式下的语义一致:一个 queue 在同一消费组内只由一个 consumer 实例持有。差异在于分配协议的实现位置和触发方式。
1 | |
Group Coordinator:谁来管理分配
每个 consumer group 由集群中的一个特定 broker 管理,这个 broker 称为 Group Coordinator。
选定 Group Coordinator 的过程:
- 对 group.id 做 hash,取模
__consumer_offsetstopic 的 partition 数量(默认 50),得到目标 partition 编号。 - 该 partition 的 leader 所在的 broker 就是这个 group 的 Coordinator。
公式:coordinator = leader_of(__consumer_offsets[hash(group.id) % 50])
所有 group 成员与 Coordinator 之间维护心跳。Coordinator 负责:
- 跟踪 group 的成员列表。
- 在成员变化时触发 rebalance。
- 接收并存储 offset 提交。
Coordinator 不负责计算分配方案——这个工作由 consumer 端的 leader 来做。
JoinGroup / SyncGroup:两阶段重平衡协议
当 group 成员发生变化(新 consumer 加入、旧 consumer 退出、consumer 崩溃超时),Coordinator 触发 rebalance。整个过程分两个阶段。
阶段一:JoinGroup
所有 consumer 向 Coordinator 发送 JoinGroup 请求。请求中包含:
- consumer 支持的分配策略列表(如 range、roundrobin、sticky)。
- consumer 订阅的 topic 列表。
Coordinator 收齐所有成员的 JoinGroup 请求后:
- 选出一个 consumer 作为 group leader(通常是第一个发送 JoinGroup 的成员)。
- 把完整的成员列表和订阅信息发给 leader。
- 其他成员只收到一个空响应,知道自己不是 leader。
阶段二:SyncGroup
leader consumer 根据选定的分配策略,计算出每个 consumer 应该消费哪些 partition,然后把分配结果通过 SyncGroup 请求发给 Coordinator。
Coordinator 把每个 consumer 的分配结果分发给对应的 consumer。每个 consumer 收到自己的分配结果后,开始消费分到的 partition。
1 | |
这个设计的关键点:分配逻辑在客户端执行,broker 只做协调和分发。这与 HDFS NameNode 直接决定 block 到 DataNode 的映射不同——Kafka 选择了"中心协调、客户端计算"的模式。
四种分配策略
leader consumer 计算分配方案时,使用 partition.assignment.strategy 配置指定的策略。Kafka 内置四种。
RangeAssignor
按 topic 维度分配。对每个 topic,把 partition 按编号排序,consumer 按字典序排序,然后均分。
6 个 partition、3 个 consumer 的情况:每个 consumer 分到 2 个 partition。如果 partition 数不能整除 consumer 数,排在前面的 consumer 多分一个。
问题在于多 topic 场景。如果 group 订阅了 3 个 topic,每个 topic 有 3 个 partition、2 个 consumer,那么每个 topic 中 Consumer-0 分到 2 个 partition、Consumer-1 分到 1 个——累计下来 Consumer-0 承担 6 个 partition,Consumer-1 只有 3 个。
RoundRobinAssignor
把所有 topic 的所有 partition 放到一个列表里,按轮询方式分配。解决了 RangeAssignor 在多 topic 场景下的不均匀问题。
但它不考虑每个 consumer 的订阅差异——如果 group 内的 consumer 订阅了不同的 topic 子集,RoundRobin 可能把某个 partition 分给一个没订阅该 topic 的 consumer(实际实现中会跳过,但跳过后轮次不回填,仍然可能不均匀)。
StickyAssignor
在 RoundRobin 的基础上增加一个约束:尽量保持上一次的分配结果不变。
触发 rebalance 时,StickyAssignor 先看哪些 partition-consumer 映射可以保留,只重新分配需要变更的部分。这减少了 rebalance 时 consumer 需要重建本地状态的代价。
CooperativeStickyAssignor
自 Kafka 2.4 起引入(KIP-429)。前面三种策略都使用 eager 协议——rebalance 开始时所有 consumer 放弃所有 partition,等分配完成后重新领取。CooperativeStickyAssignor 使用 incremental cooperative 协议,只收回需要转移的 partition,其他 partition 继续消费。
这是降低 rebalance 代价的关键改进,下一节展开。
Eager rebalance 的代价
在 eager 协议下,rebalance 的流程:
- Coordinator 通知所有 consumer:rebalance 开始。
- 所有 consumer 停止消费,提交当前 offset,放弃所有 partition。
- 所有 consumer 发送 JoinGroup。
- leader 计算新分配。
- 通过 SyncGroup 分发。
- consumer 领取新 partition,从 committed offset 开始消费。
“所有 consumer 停止消费”——这就是 stop-the-world。在 rebalance 窗口内,整个 group 的消费吞吐降为零。
对于 partition 数多、consumer 数多的 group,rebalance 窗口可能持续数十秒。这段时间的消费停滞直接反映为 consumer lag 的跳升。
更糟糕的是 rebalance 风暴:一个 consumer 的短暂 GC 暂停导致心跳超时,触发 rebalance;rebalance 期间其他 consumer 的处理线程也被阻塞,如果此时它们的 session 也超时,就会触发连锁 rebalance。
Cooperative rebalance:增量协议
KIP-429 引入了 cooperative rebalance(也叫 incremental rebalance),核心思路:不再要求所有 consumer 在 rebalance 开始时放弃所有 partition。
cooperative 协议的流程:
- Coordinator 触发 rebalance。
- 所有 consumer 发送 JoinGroup,但在请求中带上当前持有的 partition 列表。
- leader 计算新分配,与当前分配做 diff,找出需要转移的 partition。
- 第一轮 SyncGroup:leader 只通知需要释放 partition 的 consumer “交出这几个 partition”,其他 consumer 继续消费不中断。
- 被通知的 consumer 释放指定 partition 后,触发第二轮 rebalance。
- 第二轮 rebalance 中,释放出来的 partition 被分配给目标 consumer。
1 | |
cooperative 协议需要两轮 rebalance 才能完成一次转移,但每轮中大部分 consumer 不受影响。总的不可用时间从"全体停止 N 秒"缩减为"个别 consumer 停止个别 partition 若干秒"。
使用 cooperative 协议需要把 partition.assignment.strategy 设为 CooperativeStickyAssignor:
1 | |
从 eager 迁移到 cooperative 需要滚动升级:不能在同一个 group 中混用 eager 和 cooperative 协议的 consumer。官方建议的迁移步骤是先配置两种策略共存,滚动重启让所有 consumer 都支持 cooperative,然后再移除 eager 策略。
静态成员:避免不必要的 rebalance
在容器化部署中,滚动重启是常规操作。每次重启一个 consumer 实例,都会触发两次 rebalance(一次离开、一次加入)。如果有 10 个实例要滚动重启,就是 20 次 rebalance。
KIP-345 引入了 static group membership,通过 group.instance.id 配置给每个 consumer 一个持久化标识。
1 | |
设置 group.instance.id 后的行为变化:
- consumer 关闭时不再主动发送 LeaveGroup 请求。
- Coordinator 不会因为 consumer 断开连接立即触发 rebalance,而是等待
session.timeout.ms超时。 - 如果 consumer 在超时前用相同的
group.instance.id重新加入,Coordinator 直接把原来的 partition 分配还给它,不触发 rebalance。
这意味着在 session.timeout.ms 窗口内完成滚动重启,整个过程零 rebalance。代价是需要把 session.timeout.ms 设得足够长(比如 30 秒到几分钟),在 consumer 真的崩溃时,检测延迟相应增加。
实验:观察 rebalance
启动一个 3 partition 的 topic 和 3 个 consumer,然后关闭一个 consumer,观察 rebalance 日志。
准备环境
1 | |
启动三个 consumer
1 | |
观察分配
1 | |
输出类似:
1 | |
每个 partition 分配给不同的 consumer。
关闭一个 consumer
Ctrl-C 关闭终端 3 的 consumer。观察剩余两个 consumer 的日志,会看到类似输出:
1 | |
再次运行 kafka-consumer-groups.sh --describe,可以看到 3 个 partition 被重新分配给 2 个 consumer。
对比 eager 与 cooperative
在 eager 模式下,rebalance 日志中会出现 “Revoke previously assigned partitions”——所有 consumer 先放弃所有 partition,再重新分配。
切换为 CooperativeStickyAssignor 后,日志中只对需要转移的 partition 出现 revoke,其余 partition 保持不变。通过对比 rebalance 前后 kafka-consumer-groups.sh 输出中每个 consumer 持有的 partition 列表,可以确认 cooperative 模式下大部分分配未变。
模式提炼
consumer group 的分配协议体现了一种"中心协调、客户端计算"的模式:
- Coordinator(broker 端)负责成员管理、触发时机、结果分发。
- leader consumer(客户端)负责计算分配方案。
- 其他 consumer 被动接收分配结果。
这种模式的优势:分配策略可以在客户端升级而不需要升级 broker。Kafka 的四种内置策略以及用户自定义策略都是纯客户端实现。
对比其他系统:
- HDFS NameNode 直接计算 block 到 DataNode 的映射——中心计算、中心分发。
- RocketMQ 的 AllocateMessageQueueStrategy 也是在客户端执行,但每个 consumer 独立计算自己的分配(没有 leader 角色),依赖所有 consumer 看到相同的成员列表和 queue 列表来保证计算结果一致。
工程迁移表
| 概念 | Kafka Consumer Group | RocketMQ Consumer Group | 数据库连接池 |
|---|---|---|---|
| 分配单元 | partition | message queue | connection slot |
| 分配触发 | 成员变化触发 rebalance | 定时触发(20s 周期) | 按需分配 |
| 分配计算位置 | leader consumer | 每个 consumer 独立计算 | 连接池管理器 |
| 分配协调者 | Group Coordinator | Name Server(无状态) | 连接池自身 |
| 增量分配 | CooperativeStickyAssignor | 无内置增量协议 | N/A |
| 成员标识持久化 | group.instance.id | instance name | N/A |
| 分配策略扩展 | 客户端 SPI | 客户端 SPI | 固定策略 |
常见误解
“rebalance 就是负载均衡”——rebalance 是 partition 所有权的重新分配,不是按负载动态调整。即使某个 consumer 的 CPU 使用率已经打满,只要 partition 数和 consumer 数不变,就不会触发 rebalance。
“consumer 数量越多吞吐越高”——consumer 数量超过 partition 数量后,多余的 consumer 空转。要提高消费并行度,需要同时增加 partition 数量和 consumer 数量。
“cooperative rebalance 不会中断消费”——cooperative 协议仍然会中断需要转移的 partition 的消费。它的改进是:不需要转移的 partition 不受影响。
练习
-
创建一个 6 partition 的 topic,启动 4 个 consumer,用
kafka-consumer-groups.sh观察分配结果。然后关闭一个 consumer,再观察新的分配。记录每次 rebalance 耗时(从日志中读取 JoinGroup 到 SyncGroup 完成的时间差)。 -
配置
group.instance.id,启动 3 个 consumer,关闭其中一个并在session.timeout.ms内用相同的group.instance.id重启。验证 partition 分配没有变化。 -
写一个 Java consumer 程序,实现
ConsumerRebalanceListener,在onPartitionsRevoked和onPartitionsAssigned回调中打印日志。分别使用 RangeAssignor 和 CooperativeStickyAssignor,对比 rebalance 时哪些 partition 被 revoke、哪些被 assign。
系列导航
| 序号 | 主题 |
|---|---|
| 00 | 导读:为什么 Kafka 的核心是一根日志 |
| 01 | 架构总览:Broker、Controller 与元数据管理 |
| 02 | 日志存储:Segment、Index 与零拷贝 |
| 03 | Producer 内部机制:攒批、分区与 acks |
| 04 | 幂等 Producer 与序列号:消息不重不丢的第一层 |
| 05 | Consumer Group 协议:分配、重平衡与静态成员 |
| 06 | Offset 管理:提交、重置与消费语义 |
| 07 | 副本与 ISR:高可用的代价和折中 |
| 08 | Controller 与 KRaft:从 ZooKeeper 到内置共识 |
| 09 | Exactly-Once 与事务:跨 partition 的原子写入 |
| 10 | 日志压缩:把 topic 当 KV 表用 |
| 11 | Kafka Streams:在日志之上构建流处理 |
| 12 | Kafka Connect:标准化的数据管道 |
| 13 | Schema Registry 与数据治理:给消息加上契约 |
| 14 | 性能模型:吞吐、延迟与调优思路 |
| 15 | 安全体系:认证、授权与加密 |
| 16 | 生产运维:集群扩缩、监控指标与故障排查 |
| 17 | Kafka 与 RocketMQ:两种消息系统的设计选择 |
参考资料
- Apache Kafka 官方文档 — Consumer Configuration: https://kafka.apache.org/documentation/#consumerconfigs
- KIP-429: Kafka Consumer Incremental Rebalance Protocol: https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol
- KIP-345: Introduce static membership protocol to reduce consumer rebalances: https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances
- Confluent — Apache Kafka Consumer Group Protocol: https://docs.confluent.io/platform/current/clients/consumer.html
- Neha Narkhede, Gwen Shapira, Todd Palino. Kafka: The Definitive Guide. O’Reilly. Chapter 4: Kafka Consumers.
