前面的文章中,日志的保留策略一直是基于时间或大小的删除。这一篇介绍另一种保留策略:日志压缩。

日志压缩容易被理解成"压缩存储空间"。更准确的说法是:日志压缩是一种按 key 去重的保留策略,对每个 key 只保留最新的 value,把一个 append-only 的日志变成一个可以反映最新状态的 KV 快照。

本文只抓一个问题:日志压缩的语义、执行机制和适用场景。

两种保留策略

Kafka 的日志保留由 topic 级别的 cleanup.policy 参数控制,有三种取值:

1
2
3
cleanup.policy=delete     基于时间或大小删除整个 segment
cleanup.policy=compact 按 key 去重,保留每个 key 的最新 value
cleanup.policy=compact,delete 先压缩去重,再按时间/大小删除

delete 策略是前面几篇文章一直在讨论的默认行为:segment 文件超过 retention.ms 或 retention.bytes 后被整体删除,不关心消息的 key 是什么。

compact 策略的语义完全不同:它不按时间或大小删除,而是保证对于日志中出现过的每个 key,至少保留该 key 的最新一条记录。旧版本的记录在压缩过程中被清除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
压缩前:
offset key value
0 K1 V1
1 K2 V1
2 K1 V2 ← K1 的更新
3 K3 V1
4 K2 V2 ← K2 的更新
5 K1 V3 ← K1 的再次更新
6 K3 null ← K3 的删除标记 (tombstone)

压缩后:
offset key value
4 K2 V2 ← K2 的最新值
5 K1 V3 ← K1 的最新值
6 K3 null ← tombstone (暂时保留)

压缩后,offset 0、1、2、3 的记录被清除,因为同一个 key 有更新的版本。注意 offset 编号不变——压缩不会重新编号,只是跳过被清除的 offset。Consumer 从 offset 0 开始消费时,会直接跳到 offset 4。

压缩的执行机制

日志压缩由 broker 内部的 Log Cleaner 线程执行。Cleaner 线程的数量由 log.cleaner.threads 参数控制(默认 1)。

每个 partition 的日志在压缩视角下被分为两部分:

1
2
3
4
5
     tail (已压缩)          |  head (未压缩, dirty)
|
[K1:V2][K2:V1][K3:V1] | [K1:V3][K2:V2][K3:null][K4:V1]
|
cleaner checkpoint

tail 是已经经过压缩的部分,每个 key 只出现一次。head 是尚未压缩的部分(dirty segment),可能包含重复 key。cleaner checkpoint 是两部分的分界点,持久化在 cleaner-offset-checkpoint 文件中。

Cleaner 线程的工作流程:

  1. 选择压缩目标。Cleaner 从所有 partition 中选择 dirty ratio 最高的 partition 进行压缩。Dirty ratio = dirty 部分大小 / 总日志大小。只有 dirty ratio 超过 min.cleanable.dirty.ratio(默认 0.5)时才会触发压缩。
  2. 构建 offset map。Cleaner 扫描 dirty 部分,在内存中构建一个 key → latest offset 的映射表。这个映射表的内存大小由 log.cleaner.dedupe.buffer.size 控制(默认 128MB)。
  3. 拷贝存活记录。Cleaner 遍历 tail 部分的每条记录,用 offset map 判断该记录是否是对应 key 的最新版本。如果不是,跳过;如果是,拷贝到新的 segment 文件中。
  4. 交换文件。新 segment 替换旧 segment,更新 cleaner checkpoint。

这个过程不会阻塞读写。Producer 继续向 head 追加写入,consumer 继续读取。Cleaner 操作的是已经关闭(不再追加)的 segment 文件。

Tombstone:删除语义

在 compact 策略下,删除一个 key 的方式是写入一条 value 为 null 的记录,这条记录被称为 tombstone(墓碑标记)。

Tombstone 不能被立刻清除。原因是:如果一个 consumer 在压缩之前断线,恢复后从旧 offset 重新消费,它需要看到 tombstone 才知道这个 key 已被删除。如果 tombstone 被过早清除,consumer 会认为该 key 的值仍然是上一次压缩后保留的旧值。

delete.retention.ms(默认 24 小时)控制 tombstone 的保留时间。Tombstone 在写入超过 delete.retention.ms 之后才会在下一次压缩中被真正清除。

压缩时机控制

两个参数控制压缩的时机:

min.compaction.lag.ms:消息写入后的最短等待时间,在此时间内的消息不会被压缩。用于保证 consumer 有足够的时间窗口读取到消息的中间状态。默认值为 0,即不做额外等待。

max.compaction.lag.ms:消息写入后的最长等待时间。如果一条消息超过这个时间还未被压缩,Cleaner 会强制触发压缩,即使 dirty ratio 未达到阈值。默认值为 Long.MAX_VALUE(实质上不启用)。自 Kafka 2.3 起引入(KIP-354),用于满足 GDPR 等合规场景下的数据删除时效要求。

__consumer_offsets:压缩的典型应用

Kafka 内部的 __consumer_offsets topic 就是一个 compact 策略的 topic。这个 topic 存储所有 consumer group 的 offset 提交记录。

每条 offset 提交记录的 key 是 (groupId, topic, partition) 三元组,value 是 offset 值和元数据。当同一个 consumer group 反复提交同一个 partition 的 offset 时,__consumer_offsets 中会积累大量旧记录。日志压缩保证只保留每个 (group, topic, partition) 组合的最新 offset,避免 topic 无限增长。

这也是为什么 consumer group 删除后需要写入 tombstone:group coordinator 为被删除的 group 的每个 (group, topic, partition) 写入 value=null 的记录,压缩后这些 key 被清除,consumer group 彻底从 __consumer_offsets 中消失。

其他使用场景

Changelog topic(Kafka Streams 的状态存储)。Kafka Streams 应用的每个 state store 都有一个对应的 changelog topic,用于持久化状态变更。State store 的每次 put 操作都会向 changelog topic 写入一条 (key, value) 记录,delete 操作写入 tombstone。日志压缩保证 changelog topic 是 state store 的完整快照——新实例或故障恢复的实例可以从 changelog topic 的头部消费,重建完整的 state store。

CDC 快照。Change Data Capture 场景中,数据库的每一行变更被发送到 Kafka,key 是主键,value 是行数据。日志压缩使 topic 成为数据库表的镜像:任意时刻从头消费,得到的是每一行的最新状态。

配置分发。把系统配置的每个 key 写入 compact topic,consumer 启动时从头消费获得完整配置,运行时实时接收配置变更。

实验:观察日志压缩

创建一个 compact 策略的 topic,验证压缩行为:

1
2
3
4
5
6
7
8
# 创建 compacted topic,设置较短的 segment 和压缩参数以便快速观察
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic compact-test \
--partitions 1 --replication-factor 1 \
--config cleanup.policy=compact \
--config segment.ms=60000 \
--config min.cleanable.dirty.ratio=0.1 \
--config delete.retention.ms=60000

写入多条同 key 的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 用 : 作为 key separator
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic compact-test \
--property parse.key=true \
--property key.separator=:

# 输入以下内容
user1:{"name":"Alice","age":25}
user2:{"name":"Bob","age":30}
user1:{"name":"Alice","age":26}
user3:{"name":"Charlie","age":35}
user2:{"name":"Bob","age":31}
user1:{"name":"Alice","age":27}

等待 segment 滚动和 cleaner 执行(可能需要 1-2 分钟),然后从头消费:

1
2
3
4
5
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic compact-test \
--from-beginning \
--property print.key=true \
--property key.separator=:

预期输出中,user1 只出现一次且值为 age=27,user2 只出现一次且值为 age=31,user3 出现一次且值为 age=35。

验证 tombstone 行为:

1
2
3
4
5
6
7
# 发送 tombstone(空 value)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic compact-test \
--property parse.key=true \
--property key.separator=:

user3:

注意 user3: 后面没有值,这会被序列化为 null value(tombstone)。等待压缩后再次从头消费,user3 会消失(如果 tombstone 保留期已过)或仍然以空值出现(如果在 delete.retention.ms 内)。

实验结果映射

这个实验展示了日志压缩的几个特性:

  1. 压缩不是即时的。消息写入后不会立刻触发压缩。Cleaner 线程按 dirty ratio 和时间间隔周期性执行。在 active segment(当前正在写入的 segment)关闭之前,其中的消息不会被压缩。
  2. Offset 不连续。压缩后的日志中 offset 存在间隔。Consumer 调用 poll() 时,broker 返回的是下一个存在的 offset 的消息,跳过被清除的 offset。
  3. Tombstone 有保留期。Tombstone 不会在第一次压缩时就被清除。Consumer 需要足够的时间窗口来看到删除标记。

模式提炼

日志压缩是一种基于 key 的日志结构化合并(log-structured merge with key-based retention)。

这种模式在存储系统中反复出现。LSM-tree(Log-Structured Merge Tree)的 compaction 与 Kafka 的 log compaction 有相似的结构:数据先写入内存(对应 Kafka 的 active segment),定期刷到不可变文件(对应 Kafka 的 closed segment),后台合并线程清理过期版本(对应 Kafka 的 cleaner)。

区别在于目的。LSM-tree 的 compaction 是为了合并多层 SSTable 以加速点查询和范围查询。Kafka 的 log compaction 不是为了加速查询(Kafka 本身不支持按 key 点查),而是为了保证从日志头部消费时能获得每个 key 的最新状态快照。

Git 的 object packing(git gc)是另一个类似模式:Git 定期把松散的 object 文件合并成 packfile,清理已经被新 commit 覆盖的旧 object。与 Kafka log compaction 的共性是"按引用关系保留最新版本"。

工程迁移表

概念 Kafka Log Compaction RocksDB Compaction 数据库 MVCC Cleanup Git gc
触发条件 dirty ratio 超过阈值 level 大小超过阈值 事务可见性窗口外 松散对象超过阈值
保留语义 每个 key 保留最新 value 每个 key 保留可见版本 每个行保留当前版本 每个对象保留可达版本
删除标记 tombstone (null value) tombstone marker 删除标记 + undo log 引用不可达即回收
删除延迟 delete.retention.ms 无(snapshot 释放后即清) purge_thread 周期执行 gc.pruneExpire
空间回收时机 cleaner 下一轮执行 compaction 合并后 purge 执行后 repack 执行后
对读取的影响 不阻塞,consumer 跳过空洞 可能短暂增加读放大 不阻塞正常查询 不影响 git 命令

常见误解

压缩是即时发生的。压缩由后台 Cleaner 线程执行,受 dirty ratio、segment 滚动和 cleaner 调度周期影响。从消息写入到被压缩,延迟可能是分钟级甚至小时级。Active segment 中的消息在 segment 关闭之前不会被压缩。如果需要严格的压缩时效,可以通过 max.compaction.lag.ms 设置上限。

压缩后的 topic 没有 offset 间隔。压缩后的日志中 offset 存在间隔。被清除的旧版本记录的 offset 空出来,不会被重新分配。Consumer 的 poll() 调用和 seek() 操作都能正确处理 offset 间隔——broker 返回的是下一个存在的 offset。

压缩会删除数据。从语义上看,compact 策略不删除任何"当前状态"。它删除的是同一个 key 的旧版本,保留最新版本。Tombstone(value=null)是显式的删除意图,表达的是"这个 key 不再有值",而不是"这条记录被清理了"。与 delete 策略的本质区别是:delete 按时间/大小盲删,不关心 key;compact 按 key 保留最新,不关心时间。

练习

  1. 创建一个 cleanup.policy=compact,delete 的 topic,写入若干条消息后分别观察压缩和过期删除的行为。思考:compact+delete 模式下,什么条件触发 compact,什么条件触发 delete?
  2. 查看本地 Kafka 集群的 __consumer_offsets topic 配置(kafka-topics.sh --describe),确认其 cleanup.policy 和 segment.bytes。创建一个 consumer group 消费消息后删除该 group,观察 __consumer_offsets 中的 tombstone。
  3. 设计一个场景:用 compact topic 实现分布式配置中心的数据存储层。每个配置项的 key 是配置名,value 是配置值(JSON)。考虑:新节点加入时如何获取完整配置?配置项删除后如何通知已有节点?
  4. 对比 Kafka log compaction 和 RocksDB compaction 的资源消耗模型:Kafka cleaner 的瓶颈是什么(CPU、内存、磁盘 I/O)?RocksDB 的 compaction 瓶颈又是什么?

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料