日志压缩:把 topic 当 KV 表用
前面的文章中,日志的保留策略一直是基于时间或大小的删除。这一篇介绍另一种保留策略:日志压缩。
日志压缩容易被理解成"压缩存储空间"。更准确的说法是:日志压缩是一种按 key 去重的保留策略,对每个 key 只保留最新的 value,把一个 append-only 的日志变成一个可以反映最新状态的 KV 快照。
本文只抓一个问题:日志压缩的语义、执行机制和适用场景。
两种保留策略
Kafka 的日志保留由 topic 级别的 cleanup.policy 参数控制,有三种取值:
1 | |
delete 策略是前面几篇文章一直在讨论的默认行为:segment 文件超过 retention.ms 或 retention.bytes 后被整体删除,不关心消息的 key 是什么。
compact 策略的语义完全不同:它不按时间或大小删除,而是保证对于日志中出现过的每个 key,至少保留该 key 的最新一条记录。旧版本的记录在压缩过程中被清除。
1 | |
压缩后,offset 0、1、2、3 的记录被清除,因为同一个 key 有更新的版本。注意 offset 编号不变——压缩不会重新编号,只是跳过被清除的 offset。Consumer 从 offset 0 开始消费时,会直接跳到 offset 4。
压缩的执行机制
日志压缩由 broker 内部的 Log Cleaner 线程执行。Cleaner 线程的数量由 log.cleaner.threads 参数控制(默认 1)。
每个 partition 的日志在压缩视角下被分为两部分:
1 | |
tail 是已经经过压缩的部分,每个 key 只出现一次。head 是尚未压缩的部分(dirty segment),可能包含重复 key。cleaner checkpoint 是两部分的分界点,持久化在 cleaner-offset-checkpoint 文件中。
Cleaner 线程的工作流程:
- 选择压缩目标。Cleaner 从所有 partition 中选择 dirty ratio 最高的 partition 进行压缩。Dirty ratio = dirty 部分大小 / 总日志大小。只有 dirty ratio 超过 min.cleanable.dirty.ratio(默认 0.5)时才会触发压缩。
- 构建 offset map。Cleaner 扫描 dirty 部分,在内存中构建一个 key → latest offset 的映射表。这个映射表的内存大小由 log.cleaner.dedupe.buffer.size 控制(默认 128MB)。
- 拷贝存活记录。Cleaner 遍历 tail 部分的每条记录,用 offset map 判断该记录是否是对应 key 的最新版本。如果不是,跳过;如果是,拷贝到新的 segment 文件中。
- 交换文件。新 segment 替换旧 segment,更新 cleaner checkpoint。
这个过程不会阻塞读写。Producer 继续向 head 追加写入,consumer 继续读取。Cleaner 操作的是已经关闭(不再追加)的 segment 文件。
Tombstone:删除语义
在 compact 策略下,删除一个 key 的方式是写入一条 value 为 null 的记录,这条记录被称为 tombstone(墓碑标记)。
Tombstone 不能被立刻清除。原因是:如果一个 consumer 在压缩之前断线,恢复后从旧 offset 重新消费,它需要看到 tombstone 才知道这个 key 已被删除。如果 tombstone 被过早清除,consumer 会认为该 key 的值仍然是上一次压缩后保留的旧值。
delete.retention.ms(默认 24 小时)控制 tombstone 的保留时间。Tombstone 在写入超过 delete.retention.ms 之后才会在下一次压缩中被真正清除。
压缩时机控制
两个参数控制压缩的时机:
min.compaction.lag.ms:消息写入后的最短等待时间,在此时间内的消息不会被压缩。用于保证 consumer 有足够的时间窗口读取到消息的中间状态。默认值为 0,即不做额外等待。
max.compaction.lag.ms:消息写入后的最长等待时间。如果一条消息超过这个时间还未被压缩,Cleaner 会强制触发压缩,即使 dirty ratio 未达到阈值。默认值为 Long.MAX_VALUE(实质上不启用)。自 Kafka 2.3 起引入(KIP-354),用于满足 GDPR 等合规场景下的数据删除时效要求。
__consumer_offsets:压缩的典型应用
Kafka 内部的 __consumer_offsets topic 就是一个 compact 策略的 topic。这个 topic 存储所有 consumer group 的 offset 提交记录。
每条 offset 提交记录的 key 是 (groupId, topic, partition) 三元组,value 是 offset 值和元数据。当同一个 consumer group 反复提交同一个 partition 的 offset 时,__consumer_offsets 中会积累大量旧记录。日志压缩保证只保留每个 (group, topic, partition) 组合的最新 offset,避免 topic 无限增长。
这也是为什么 consumer group 删除后需要写入 tombstone:group coordinator 为被删除的 group 的每个 (group, topic, partition) 写入 value=null 的记录,压缩后这些 key 被清除,consumer group 彻底从 __consumer_offsets 中消失。
其他使用场景
Changelog topic(Kafka Streams 的状态存储)。Kafka Streams 应用的每个 state store 都有一个对应的 changelog topic,用于持久化状态变更。State store 的每次 put 操作都会向 changelog topic 写入一条 (key, value) 记录,delete 操作写入 tombstone。日志压缩保证 changelog topic 是 state store 的完整快照——新实例或故障恢复的实例可以从 changelog topic 的头部消费,重建完整的 state store。
CDC 快照。Change Data Capture 场景中,数据库的每一行变更被发送到 Kafka,key 是主键,value 是行数据。日志压缩使 topic 成为数据库表的镜像:任意时刻从头消费,得到的是每一行的最新状态。
配置分发。把系统配置的每个 key 写入 compact topic,consumer 启动时从头消费获得完整配置,运行时实时接收配置变更。
实验:观察日志压缩
创建一个 compact 策略的 topic,验证压缩行为:
1 | |
写入多条同 key 的消息:
1 | |
等待 segment 滚动和 cleaner 执行(可能需要 1-2 分钟),然后从头消费:
1 | |
预期输出中,user1 只出现一次且值为 age=27,user2 只出现一次且值为 age=31,user3 出现一次且值为 age=35。
验证 tombstone 行为:
1 | |
注意 user3: 后面没有值,这会被序列化为 null value(tombstone)。等待压缩后再次从头消费,user3 会消失(如果 tombstone 保留期已过)或仍然以空值出现(如果在 delete.retention.ms 内)。
实验结果映射
这个实验展示了日志压缩的几个特性:
- 压缩不是即时的。消息写入后不会立刻触发压缩。Cleaner 线程按 dirty ratio 和时间间隔周期性执行。在 active segment(当前正在写入的 segment)关闭之前,其中的消息不会被压缩。
- Offset 不连续。压缩后的日志中 offset 存在间隔。Consumer 调用 poll() 时,broker 返回的是下一个存在的 offset 的消息,跳过被清除的 offset。
- Tombstone 有保留期。Tombstone 不会在第一次压缩时就被清除。Consumer 需要足够的时间窗口来看到删除标记。
模式提炼
日志压缩是一种基于 key 的日志结构化合并(log-structured merge with key-based retention)。
这种模式在存储系统中反复出现。LSM-tree(Log-Structured Merge Tree)的 compaction 与 Kafka 的 log compaction 有相似的结构:数据先写入内存(对应 Kafka 的 active segment),定期刷到不可变文件(对应 Kafka 的 closed segment),后台合并线程清理过期版本(对应 Kafka 的 cleaner)。
区别在于目的。LSM-tree 的 compaction 是为了合并多层 SSTable 以加速点查询和范围查询。Kafka 的 log compaction 不是为了加速查询(Kafka 本身不支持按 key 点查),而是为了保证从日志头部消费时能获得每个 key 的最新状态快照。
Git 的 object packing(git gc)是另一个类似模式:Git 定期把松散的 object 文件合并成 packfile,清理已经被新 commit 覆盖的旧 object。与 Kafka log compaction 的共性是"按引用关系保留最新版本"。
工程迁移表
| 概念 | Kafka Log Compaction | RocksDB Compaction | 数据库 MVCC Cleanup | Git gc |
|---|---|---|---|---|
| 触发条件 | dirty ratio 超过阈值 | level 大小超过阈值 | 事务可见性窗口外 | 松散对象超过阈值 |
| 保留语义 | 每个 key 保留最新 value | 每个 key 保留可见版本 | 每个行保留当前版本 | 每个对象保留可达版本 |
| 删除标记 | tombstone (null value) | tombstone marker | 删除标记 + undo log | 引用不可达即回收 |
| 删除延迟 | delete.retention.ms | 无(snapshot 释放后即清) | purge_thread 周期执行 | gc.pruneExpire |
| 空间回收时机 | cleaner 下一轮执行 | compaction 合并后 | purge 执行后 | repack 执行后 |
| 对读取的影响 | 不阻塞,consumer 跳过空洞 | 可能短暂增加读放大 | 不阻塞正常查询 | 不影响 git 命令 |
常见误解
压缩是即时发生的。压缩由后台 Cleaner 线程执行,受 dirty ratio、segment 滚动和 cleaner 调度周期影响。从消息写入到被压缩,延迟可能是分钟级甚至小时级。Active segment 中的消息在 segment 关闭之前不会被压缩。如果需要严格的压缩时效,可以通过 max.compaction.lag.ms 设置上限。
压缩后的 topic 没有 offset 间隔。压缩后的日志中 offset 存在间隔。被清除的旧版本记录的 offset 空出来,不会被重新分配。Consumer 的 poll() 调用和 seek() 操作都能正确处理 offset 间隔——broker 返回的是下一个存在的 offset。
压缩会删除数据。从语义上看,compact 策略不删除任何"当前状态"。它删除的是同一个 key 的旧版本,保留最新版本。Tombstone(value=null)是显式的删除意图,表达的是"这个 key 不再有值",而不是"这条记录被清理了"。与 delete 策略的本质区别是:delete 按时间/大小盲删,不关心 key;compact 按 key 保留最新,不关心时间。
练习
- 创建一个 cleanup.policy=compact,delete 的 topic,写入若干条消息后分别观察压缩和过期删除的行为。思考:compact+delete 模式下,什么条件触发 compact,什么条件触发 delete?
- 查看本地 Kafka 集群的 __consumer_offsets topic 配置(kafka-topics.sh --describe),确认其 cleanup.policy 和 segment.bytes。创建一个 consumer group 消费消息后删除该 group,观察 __consumer_offsets 中的 tombstone。
- 设计一个场景:用 compact topic 实现分布式配置中心的数据存储层。每个配置项的 key 是配置名,value 是配置值(JSON)。考虑:新节点加入时如何获取完整配置?配置项删除后如何通知已有节点?
- 对比 Kafka log compaction 和 RocksDB compaction 的资源消耗模型:Kafka cleaner 的瓶颈是什么(CPU、内存、磁盘 I/O)?RocksDB 的 compaction 瓶颈又是什么?
系列导航
参考资料
- Apache Kafka 官方文档 - Log Compaction:https://kafka.apache.org/documentation/#compaction
- KIP-354 - Add max.compaction.lag.ms to topic level configurations:https://cwiki.apache.org/confluence/display/KAFKA/KIP-354%3A+Add+a+Maximum+Log+Compaction+Lag
- Apache Kafka 官方文档 - __consumer_offsets:https://kafka.apache.org/documentation/#impl_offsettracking
- Neha Narkhede, Gwen Shapira, Todd Palino. Kafka: The Definitive Guide. O’Reilly. Chapter 5: Internals - Log Compaction.
- Jay Kreps. The Log: What every software engineer should know about real-time data’s unifying abstraction. https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
