上一篇看了 Kafka 内置的流处理库。这一篇转向数据集成:怎么把外部系统的数据可靠地搬进搬出 Kafka。

数据集成容易退化成"给每个数据源写一个 producer,给每个数据目标写一个 consumer"。当数据源和目标各有几十个时,组合爆炸。Kafka Connect 的目标是把这个问题标准化:用统一的框架和插件机制,把数据管道的公共逻辑(offset 追踪、序列化、容错、分布式任务分配)收到框架层,让开发者只关注"怎么从特定系统读数据"和"怎么往特定系统写数据"。

本文只抓一个问题:Kafka Connect 如何用标准化的 connector/task 模型实现可靠的数据管道。

架构模型

1
2
3
4
5
6
7
8
9
10
11
12
    Kafka Connect Cluster
┌────────────────────────────┐
│ Worker 1 Worker 2 │
│ ┌────────┐ ┌────────┐ │
│ │Task A-0│ │Task A-1│ │ ← Source Connector A (2 tasks)
│ │Task B-0│ │Task B-1│ │ ← Sink Connector B (2 tasks)
│ └────────┘ └────────┘ │
└──────────┬─────────────────┘

┌──────┴──────┐
│ Kafka Cluster │
└─────────────┘

Kafka Connect 由三层对象组成:

Worker 是运行 connector 和 task 的 JVM 进程。Worker 分两种部署模式:standalone(单进程,用于开发和测试)和 distributed(多进程组成集群,用于生产)。

Connector 是用户配置的逻辑实体,定义"从哪里读"或"往哪里写"。一个 connector 实例不直接搬运数据,它负责管理一组 task 并决定如何把工作拆分。

Task 是实际执行数据搬运的工作单元。Source task 从外部系统读数据、写入 Kafka。Sink task 从 Kafka 读数据、写入外部系统。一个 connector 可以拆出多个 task 并行工作。

Source 与 Sink

1
2
3
4
5
6
7
8
9
10
11
12
13
14
External System          Kafka Connect           Kafka Cluster
┌─────────┐ ┌──────────────────────┐ ┌────────────┐
│ MySQL │─────→│ Source Connector │─────→│ topic-a │
│ Files │ │ (read → Kafka) │ │ │
└─────────┘ └──────────────────────┘ └────────────┘

┌──────────────────────┐ ┌────────────┐
│ Sink Connector │←─────│ topic-b │
│ (Kafka → write) │ │ │
└──────────┬───────────┘ └────────────┘

┌─────────────────────┐
│ Elasticsearch / S3 │
└─────────────────────┘

Source Connector 的职责是轮询外部系统获取新数据,封装成 SourceRecord 列表返回给 Connect 框架。框架负责把 SourceRecord 序列化后发送到 Kafka topic。

Sink Connector 的职责是接收 Connect 框架从 Kafka topic 消费到的 SinkRecord 列表,把数据写入外部系统。

这种分层设计的核心价值:connector 开发者不需要关心 Kafka producer/consumer 的配置、offset 管理、序列化协议、错误重试——这些全部由 Connect 框架处理。

Converter 与 SMT

数据在 Connect 框架内经过两层处理:

1
2
External Data → Connector → Connect Data (Struct/Schema) 
→ SMT Pipeline → Converter → Kafka (bytes)

Converter 负责序列化和反序列化。常用的 converter:

  • JsonConverter:序列化为 JSON,可配置是否在消息中嵌入 schema。
  • AvroConverter:配合 Schema Registry 使用 Avro 序列化(下一篇详述)。
  • StringConverter:原样保留字符串,适合纯文本场景。
  • ByteArrayConverter:原样保留字节数组。

SMT(Single Message Transform)是轻量级的消息变换管道,在 converter 之前(source)或之后(sink)执行。每个 SMT 对单条消息做一次变换,多个 SMT 可以链式组合。

1
2
3
4
5
6
7
{
"transforms": "addTimestamp,maskField",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "ingest_time",
"transforms.maskField.type": "org.apache.kafka.connect.transforms.MaskField$Value",
"transforms.maskField.fields": "password"
}

SMT 的设计原则是轻量:适合字段重命名、添加时间戳、路由到不同 topic 等简单变换。复杂的数据清洗和转换应该交给 Kafka Streams 或独立的 ETL 流程。

Offset 追踪

Source connector 需要记住"读到哪了",否则重启后会重复读取数据。Connect 框架为 source connector 提供了内置的 offset 存储机制。

1
2
3
4
5
Source Task → 返回 SourceRecord(含 source partition + source offset)

Connect 框架 → 写入 Kafka topic: __connect_offsets

重启时 → 从 __connect_offsets 恢复上次读取位置

每个 SourceRecord 包含两部分位置信息:

  • source partition:标识数据来自哪个分区(比如数据库的哪张表、文件系统的哪个文件)。
  • source offset:标识在这个分区内读到了哪个位置(比如数据库表的自增 ID、文件的字节偏移量)。

Connect 框架把这些 offset 存储在内部 topic __connect_offsets 中。standalone 模式下存储在本地文件。

Sink connector 的 offset 追踪更简单:它使用标准的 Kafka consumer offset 机制(__consumer_offsets topic),和普通 consumer 一样。

分布式模式与任务重平衡

生产环境通常使用 distributed 模式。多个 worker 组成一个 Connect 集群,通过三个内部 Kafka topic 协调状态:

内部 topic 用途
connect-configs 存储 connector 和 task 的配置
connect-offsets 存储 source connector 的 offset
connect-status 存储 connector 和 task 的运行状态

当 worker 加入或离开集群时,Connect 触发任务重平衡(rebalance)。重平衡的机制和 consumer group 类似——基于 group 协议在 worker 之间重新分配 task。

1
2
3
4
5
6
7
8
9
10
初始状态:
Worker-1: [task-A-0, task-B-0]
Worker-2: [task-A-1, task-B-1]

Worker-2 宕机后重平衡:
Worker-1: [task-A-0, task-A-1, task-B-0, task-B-1]

Worker-2 恢复后重平衡:
Worker-1: [task-A-0, task-B-0]
Worker-2: [task-A-1, task-B-1]

自 Kafka 2.3 起,Connect 支持 incremental cooperative rebalancing(KIP-415),减少了重平衡期间的全局停顿——只有受影响的 task 需要暂停和迁移,其他 task 继续运行。

Exactly-Once Source Connector

自 Kafka 3.3 起(KIP-618),Connect 框架支持 exactly-once source connector。开启方式:

1
2
# worker 配置
exactly.once.source.support=enabled

实现原理:Connect 框架使用 Kafka 事务,将 source record 的写入和 source offset 的提交放在同一个事务中。如果 task 在写入中途失败,事务回滚,offset 不更新,重启后从上次位置重新读取,不会产生重复数据。

这个机制和 Kafka Streams 的 exactly-once 思路一致:把多个操作捆绑在一个 Kafka 事务中。

常用 Connector

Kafka Connect 生态中有大量社区和商业 connector。几个常见的:

Connector 类型 用途
JDBC Source/Sink Source + Sink 关系型数据库的全量/增量同步
Debezium Source 基于数据库 CDC(Change Data Capture)的变更捕获
FileStreamSource/Sink Source + Sink 从文件读取/写入,仅用于测试
S3 Sink Sink 将 Kafka 数据写入 S3(Parquet、Avro、JSON)
Elasticsearch Sink Sink 将 Kafka 数据索引到 Elasticsearch
HDFS Sink Sink 将 Kafka 数据写入 HDFS

JDBC Source Connector 支持两种增量模式:基于自增列(incrementing)和基于时间戳列(timestamp),也可以两者组合。Debezium 通过读取数据库的 binlog/WAL 实现 CDC,比轮询更实时,对数据库的压力更小。

实验:FileStream Connector

用 Kafka 自带的 FileStreamSourceConnector 和 FileStreamSinkConnector 做一个端到端实验。

准备输入文件:

1
2
echo "line one" > /tmp/connect-input.txt
echo "line two" >> /tmp/connect-input.txt

使用 standalone 模式启动 Connect,需要两个配置文件。

worker 配置(connect-standalone.properties):

1
2
3
4
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.file.filename=/tmp/connect.offsets

source connector 配置(connect-file-source.properties):

1
2
3
4
5
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/connect-input.txt
topic=connect-test

sink connector 配置(connect-file-sink.properties):

1
2
3
4
5
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/connect-output.txt
topics=connect-test

启动 Connect:

1
2
3
4
connect-standalone.sh \
config/connect-standalone.properties \
config/connect-file-source.properties \
config/connect-file-sink.properties

验证输出:

1
cat /tmp/connect-output.txt

预期输出:

1
2
line one
line two

继续追加数据到输入文件:

1
echo "line three" >> /tmp/connect-input.txt

等待几秒后再次检查输出文件,line three 会自动出现。FileStreamSourceConnector 持续监控文件变化,新行自动进入 Kafka topic,再由 sink connector 写入输出文件。

检查 offset 存储:

1
cat /tmp/connect.offsets

可以看到 source connector 记录了文件读取的字节偏移量。重启 Connect 后,source connector 从上次的偏移量继续读取,不会重复处理已有数据。

模式提炼

Kafka Connect 代表的模式是"标准化数据管道框架":

  • 把数据集成的公共逻辑(offset 管理、序列化、分布式协调、容错)抽取到框架层。
  • 用 connector/task 插件接口隔离数据源和数据目标的实现细节。
  • 用内部 Kafka topic 存储配置和状态,利用 Kafka 自身的持久化和复制能力实现框架层的高可用。

这个模式在其他生态中有对应物。Flink CDC 用 Flink 的 checkpoint 机制管理 connector 状态。Debezium 既可以作为 Kafka Connect 的插件运行,也可以独立部署。Spring Integration 提供类似的 source/sink 抽象但不包含分布式协调。

和 Kafka Streams 类似,Connect 的核心能力依赖 Kafka 自身:配置存储、offset 存储、状态同步全部通过 Kafka topic 实现。这简化了 Connect 集群的运维(不需要额外的外部存储),代价是 Connect 集群和 Kafka 集群紧密耦合。

工程迁移表

概念 Kafka Connect Flink CDC Debezium (standalone) 自定义 Producer/Consumer 数据库原生复制
集成方式 标准化框架 + 插件 Flink source function 嵌入式引擎 手写代码 数据库内置
Offset 管理 框架自动(Kafka topic) Flink checkpoint 框架自动 自行实现 数据库内置
分布式协调 Connect group 协议 Flink JobManager 不支持 自行实现 数据库内置
Exactly-once 支持(KIP-618) checkpoint barrier 依赖下游 自行实现 取决于数据库
变换能力 SMT(轻量) Flink SQL / DataStream SMT 无限制 受限
CDC 支持 通过 Debezium 插件 内置 CDC connector 核心功能 手动解析 binlog 原生
运维复杂度 中等 取决于实现

常见误解

误解一:“Kafka Connect 是另一个消息代理。”

Kafka Connect 不是消息代理。它是一个数据集成框架,运行在 Kafka 之上,把数据从外部系统搬入 Kafka(source)或从 Kafka 搬到外部系统(sink)。消息的存储和传递仍然由 Kafka broker 负责。

误解二:“standalone 模式适合生产环境。”

standalone 模式只运行一个 worker 进程,没有高可用。worker 宕机后所有 task 停止,需要手动重启。生产环境应该使用 distributed 模式,多个 worker 组成集群,task 可以在 worker 之间自动迁移。standalone 模式适合本地开发和功能验证。

误解三:“Connect 可以做复杂的数据转换。”

SMT 的设计目标是轻量级的单消息变换:字段重命名、添加/删除字段、路由到不同 topic。涉及多消息聚合、窗口计算、流式 join 等复杂逻辑时,应该在 Kafka topic 上接 Kafka Streams 或 Flink 处理,而不是在 SMT 中实现。强行在 SMT 中做复杂逻辑会导致 connector 难以维护和调试。

练习

  1. 用 FileStreamSourceConnector 和 FileStreamSinkConnector 搭建实验。在 source connector 运行期间,删除并重建输入文件,观察 offset 存储的行为和 connector 的错误日志。

  2. 切换到 distributed 模式运行 Connect。启动两个 worker,通过 REST API 提交一个 JDBC Source Connector 配置(tasks.max=2),观察两个 task 分别分配到哪个 worker。然后停掉一个 worker,观察 task 重平衡。

  3. 为 FileStreamSourceConnector 配置一个 SMT,给每条消息添加一个 ingest_timestamp 字段。用 kafka-console-consumer.sh 消费输出 topic,确认新字段存在。

  4. 查看 distributed 模式下的三个内部 topic(connect-configsconnect-offsetsconnect-status),理解各自存储的内容结构。用 kafka-console-consumer.sh --from-beginning 读取 connect-configs topic,看到 connector 的配置 JSON。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料

  1. Apache Kafka 官方文档 - Kafka Connect:https://kafka.apache.org/documentation/#connect
  2. KIP-618: Exactly-Once Support for Source Connectors:https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors
  3. KIP-415: Incremental Cooperative Rebalancing in Connect:https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Connect
  4. Confluent - Kafka Connect Developer Guide:https://docs.confluent.io/platform/current/connect/devguide.html
  5. Debezium Documentation:https://debezium.io/documentation/