前十篇覆盖了 Kafka 作为消息基础设施的核心机制。这一篇开始看 Kafka 的生态扩展——第一个是内置的流处理库 Kafka Streams。

流处理容易被误解成"又一套需要独立部署的集群"。Kafka Streams 的核心定位不是集群,而是一个嵌入应用 JVM 的 Java 库。它把 Kafka topic 当作输入和输出,在应用进程内完成流式计算。

本文只抓一个问题:Kafka Streams 怎样在追加日志之上构建出有状态的流处理。

架构定位:库,不是集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────┐
│ Application JVM │
│ ┌───────────────────────────────────┐ │
│ │ Kafka Streams Library │ │
│ │ ┌─────────┐ ┌───────────────┐ │ │
│ │ │ Topology │ │ State Store │ │ │
│ │ │ (DAG) │ │ (RocksDB) │ │ │
│ │ └─────────┘ └───────────────┘ │ │
│ └───────────────────────────────────┘ │
│ ↑ consume ↓ produce │
└─────────┼──────────────────┼────────────┘
│ │
┌─────┴──────────────────┴─────┐
│ Kafka Cluster │
│ input topic → output topic │
└──────────────────────────────┘

Flink、Spark Streaming 需要独立的 JobManager/ResourceManager 或 Driver/Executor 集群。Kafka Streams 不需要。它作为依赖引入应用的 pom.xml,在 main() 方法里启动。扩缩容的方式是启动更多应用实例——Kafka Streams 利用 consumer group 协议完成 partition 分配和任务重平衡。

这意味着:

  • 不需要额外的资源调度器(YARN、Kubernetes Operator 仍可用,但不是必需的)。
  • 应用本身就是流处理集群的节点。
  • 部署方式和普通微服务一致。

Stream 与 Table 的对偶

Kafka Streams 的数据模型建立在 KStream 和 KTable 两个抽象之上。这两个抽象之间的关系是日志与表的对偶(log-table duality),来自第 10 篇日志压缩的同一思想。

1
2
3
4
5
6
7
KStream(事件流)                KTable(变更日志)
───────────────── ─────────────────
key=A, value=1 key=A → 1
key=B, value=2 key=B → 2
key=A, value=3 key=A → 3 (覆盖旧值)

结果:三条记录全部保留 结果:A=3, B=2(只保留最新值)

KStream 是纯粹的追加日志:每条记录都是独立事件,所有记录都保留。

KTable 是变更日志的物化视图:每个 key 只保留最新的 value。底层仍然是一个 Kafka topic(compact topic),KTable 读取这个 topic 的变更流,在本地维护最新快照。

对偶关系可以双向转换:

  • KStream → KTable:对 KStream 做 groupByKey().aggregate()groupByKey().reduce(),结果是一个 KTable。
  • KTable → KStream:调用 KTable.toStream(),把表的每次变更重新变成事件流。

拓扑:有向无环图

Kafka Streams 的计算逻辑组织成一个 Topology(拓扑),本质是一个有向无环图(DAG),由三种处理器节点组成:

1
2
3
Source Processor ──→ Stream Processor ──→ Sink Processor
(从 topic (filter / map / (写入 topic)
读取记录) join / aggregate)

Source Processor 从一个或多个 Kafka topic 消费记录。Stream Processor 执行具体的转换逻辑。Sink Processor 把处理结果写入输出 topic。

用 Streams DSL 构建拓扑时,每个操作(filter()map()groupByKey()join())都在 DAG 上添加一个处理器节点。Kafka Streams 把这个 DAG 编译成可执行的任务分配方案。

1
2
3
4
5
6
7
8
9
10
11
12
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream("input-topic");

KTable<String, Long> counts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("word-count-store"));

counts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));

Topology topology = builder.build();

这段代码生成的拓扑包含:一个 source processor 读 input-topic,一个 flatMapValues processor,一个 groupBy processor(触发 repartition),一个 count processor(带状态存储),一个 sink processor 写 output-topic

状态存储与容错

有状态操作(count、aggregate、join)需要在本地维护中间状态。Kafka Streams 默认使用 RocksDB 作为嵌入式状态存储。

1
2
3
4
5
6
7
8
9
10
11
┌─────────────────────────────┐
│ Stream Task │
│ ┌───────────────────────┐ │
│ │ State Store │ │
│ │ (RocksDB on disk) │ │ Kafka Cluster
│ └──────────┬────────────┘ │ ┌──────────────────┐
│ │ write-ahead │────→│ changelog topic │
│ │ │ │ (compact + delete)│
└─────────────┼───────────────┘ └──────────────────┘

local disk: /tmp/kafka-streams/<app-id>/

容错机制的核心是 changelog topic:

  1. 每次状态存储发生写入,变更记录同步追加到一个 Kafka topic(changelog topic)。
  2. 这个 changelog topic 开启日志压缩,保留每个 key 的最新值。
  3. 当应用实例崩溃后重启(或 partition 重新分配到另一个实例),新实例从 changelog topic 回放数据,重建本地状态。

changelog topic 的命名规则是 <application.id>-<state-store-name>-changelog。可以通过 kafka-topics.sh --list 观察到这些内部 topic。

状态恢复的代价和 changelog topic 的数据量成正比。Kafka Streams 提供 standby replicas(num.standby.replicas 配置)来缩短恢复时间:standby 副本在备用实例上持续复制 changelog,故障转移时直接切换,不需要从头回放。

窗口

流处理的时间维度通过窗口(window)表达。Kafka Streams 支持四种窗口类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
时间轴 →  |----|----|----|----|----|----|

Tumbling Window(翻转窗口,固定大小,不重叠):
[ ][ ][ ][ ]

Hopping Window(跳跃窗口,固定大小,可重叠):
[ ]
[ ]
[ ]

Sliding Window(滑动窗口,两条记录间距内聚合):
仅在 join 操作中使用

Session Window(会话窗口,按活跃间隔动态伸缩):
[ ][ ] [ ]
← gap →

窗口操作依赖时间语义。Kafka Streams 支持 event time(记录中的时间戳,默认从 Kafka 消息的 timestamp 字段提取)和 processing time(处理器墙上时钟)。默认使用 event time。

时间戳提取器(TimestampExtractor)决定每条记录的时间归属。内置实现 WallclockTimestampExtractor 使用处理时间,FailOnInvalidTimestamp 使用消息元数据中的时间戳。也可以自定义提取器,从消息 value 中解析业务时间。

Exactly-Once 语义

Kafka Streams 的 exactly-once 语义建立在第 09 篇介绍的 Kafka 事务之上。

配置 processing.guarantee=exactly_once_v2(自 Kafka 2.6 起推荐,替代旧的 exactly_once)后,Kafka Streams 会:

  1. 每个 StreamThread 初始化一个 transactional producer。
  2. 每次处理一批记录时,将输出记录的写入、consumer offset 的提交、state store changelog 的追加三个操作放在同一个事务中。
  3. 事务要么全部成功,要么全部回滚。

这确保了"读取-处理-写入"的原子性:不会出现输出已经写入但 offset 没有提交(导致重复处理),也不会出现 offset 已提交但输出丢失的情况。

exactly_once_v2 相比 exactly_once 的改进(KIP-447):将事务 producer 的粒度从每个 partition 提升到每个 StreamThread,减少了 transactional ID 的数量和 broker 端的事务协调开销。

Repartitioning:内部 topic 的产生

当拓扑中出现 key 变更操作(selectKey()map()groupBy()),Kafka Streams 需要保证相同 key 的记录发送到同一个 partition 以便正确聚合。这时 Kafka Streams 自动创建 repartition topic。

1
2
3
4
5
6
7
8
9
input-topic (key=原始key)

selectKey() → key 变了

repartition topic (key=新key,重新按 key hash 分区)

groupByKey().count()

output-topic

repartition topic 的命名规则是 <application.id>-<operation>-repartition。这些 topic 是自动创建和管理的,不需要手动干预,但在 topic 列表中可见。

repartition 带来额外的网络开销和延迟,因为数据需要经过一次完整的"写入 Kafka → 重新消费"流程。设计拓扑时应该尽量减少不必要的 key 变更。

实验:WordCount 拓扑

下面的实验用 Kafka Streams 构建一个 word count 拓扑,从 streams-input topic 读取文本,把每个单词的累计计数写入 streams-output topic。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.common.serialization.Serdes;
import java.util.*;

public class WordCountApp {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("streams-input");

KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("word-count-store"));

wordCounts.toStream().to("streams-output",
Produced.with(Serdes.String(), Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}

创建输入输出 topic 并发送测试数据:

1
2
3
4
5
6
7
8
9
10
11
12
# 创建 topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic streams-input --partitions 3 --replication-factor 1

kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic streams-output --partitions 3 --replication-factor 1

# 发送测试数据
echo "hello kafka streams" | kafka-console-producer.sh \
--bootstrap-server localhost:9092 --topic streams-input
echo "hello kafka" | kafka-console-producer.sh \
--bootstrap-server localhost:9092 --topic streams-input

启动应用后,消费输出 topic 观察结果:

1
2
3
4
5
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-output \
--from-beginning \
--property print.key=true \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

预期输出:

1
2
3
4
5
hello    1
kafka 1
streams 1
hello 2
kafka 2

每次 hello 出现,计数递增。KTable 的 changelog 性质体现在输出中:每次更新都产生一条新记录,key 相同,value 是最新计数。

观察 Kafka Streams 自动创建的内部 topic:

1
kafka-topics.sh --bootstrap-server localhost:9092 --list | grep wordcount-app

预期看到:

1
2
wordcount-app-word-count-store-changelog
wordcount-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition

changelog topic 存储 state store 的变更日志。repartition topic 是 groupBy() 操作触发的重分区中间 topic。

模式提炼

Kafka Streams 代表的模式是"流处理即日志变换":

  • 输入是日志(Kafka topic)。
  • 输出也是日志(另一个 Kafka topic)。
  • 中间状态通过 changelog topic 持久化为日志。
  • 容错通过日志回放实现。

整个流处理过程没有脱离 Kafka 的追加日志抽象。状态存储看似是本地 RocksDB 数据库,但它的持久化保证来自 changelog topic——一个 compact topic。窗口聚合看似需要时间维度的特殊处理,但底层仍然是带 key 的追加写入。

这个模式的优势是部署简单(不需要额外集群),代价是计算能力受限于 Kafka 的 partition 并行度和单条记录的处理模型。对于需要复杂事件处理、迭代计算、或者大规模窗口聚合的场景,专用流处理引擎(Flink)更合适。

工程迁移表

概念 Kafka Streams Flink DataStream Spark Structured Streaming RocketMQ Streams
部署方式 嵌入应用 JVM 独立集群 (JobManager + TaskManager) 独立集群 (Driver + Executor) 嵌入应用 JVM
状态管理 RocksDB + changelog topic RocksDB + checkpoint (分布式存储) 内存 / RocksDB + checkpoint 内存状态
并行度 Kafka partition 数 task slot 数 executor core 数 Kafka partition 数
Exactly-once Kafka 事务 checkpoint barrier checkpoint + WAL 不原生支持
窗口类型 tumbling/hopping/sliding/session 全部 + 自定义 tumbling/sliding tumbling/hopping
延迟水位线 有限支持 完整 watermark 机制 watermark 支持
适用场景 轻量级流处理、微服务事件驱动 大规模复杂流处理 批流一体 轻量级流处理

常见误解

误解一:“Kafka Streams 需要部署一个独立集群。”

Kafka Streams 是 Java 库,运行在应用进程内。不需要 YARN、Mesos 或专用的流处理集群。扩容的方式是启动更多应用实例,和普通微服务扩容一致。Kafka Streams 通过 consumer group 协议在实例间分配 partition 和任务。

误解二:“KTable 把所有数据存在 Kafka 里。”

KTable 的数据分两部分:本地状态存储(RocksDB)持有当前快照,changelog topic 持有变更日志用于容错。读取操作从本地 RocksDB 查询,不经过 Kafka broker。changelog topic 是容错机制,不是主要的读路径。

误解三:“Kafka Streams 的 exactly-once 能覆盖外部系统。”

Kafka Streams 的 exactly-once 保证范围限于 Kafka 内部:从 Kafka topic 读取、处理、写入 Kafka topic 这条链路。如果处理逻辑涉及写入外部数据库、调用 HTTP 接口等副作用操作,exactly-once 不覆盖这些外部系统。外部系统的一致性需要应用层自行处理(幂等写入、事务性外部存储等)。

练习

  1. 在 WordCount 实验基础上,添加一个 5 秒的 tumbling window,观察输出中的窗口时间范围。提示:使用 windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5))),输出的 key 变成 Windowed<String>

  2. 启动两个 WordCount 实例(相同 application.id),观察 Kafka Streams 如何在两个实例间重新分配 partition 和 state store。查看两个实例的日志中 StreamThread 的 partition 分配信息。

  3. 在 WordCount 运行过程中,用 kafka-topics.sh --describe 查看 changelog topic 的配置,确认 cleanup.policy=compact。然后强制重启应用实例,观察状态恢复的日志输出和耗时。

  4. 配置 processing.guarantee=exactly_once_v2,重复实验。用 kafka-console-consumer.sh 观察输出 topic,确认在应用重启期间不会产生重复计数。对比不开启 exactly-once 时的行为。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料

  1. Apache Kafka 官方文档 - Kafka Streams:https://kafka.apache.org/documentation/streams/
  2. KIP-447: Producer scalability for exactly once semantics:https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
  3. Jay Kreps. The Log: What every software engineer should know about real-time data’s unifying abstraction:https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
  4. Confluent - Kafka Streams Developer Guide:https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html
  5. Neha Narkhede, Gwen Shapira, Todd Palino. Kafka: The Definitive Guide. O’Reilly.