Schema Registry 与数据治理:给消息加上契约
上一篇解决了数据管道的标准化。这一篇解决管道里的数据该长什么样——消息的 schema 治理。
消息格式的约定容易被忽视。producer 和 consumer 对消息格式的理解仅靠代码约定和口头沟通时,任何一方的字段变更都可能在运行时导致反序列化失败。Schema Registry 的目标是把这种隐式约定变成显式的、可验证的契约。
本文只抓一个问题:Schema Registry 如何通过版本化 schema 和兼容性检查,在不停机的情况下实现消息格式的安全演进。
没有 Schema 治理时的问题
1 | |
问题的根源是 producer 和 consumer 独立部署、独立升级。没有一个中心化的地方记录"这个 topic 的消息应该长什么样",也没有自动化的检查阻止不兼容的变更。
Schema Registry 架构
Schema Registry 是一个独立的服务进程,提供 REST API 来注册、查询和校验 schema。Confluent Schema Registry 是使用最广泛的参考实现。
1 | |
Schema Registry 自身的数据存储在 Kafka 的内部 topic _schemas 中。这意味着 Schema Registry 利用 Kafka 的复制机制实现高可用——schema 数据不会因为单个 Registry 实例宕机而丢失。多个 Registry 实例可以组成集群,通过 leader 选举确定哪个实例处理写入请求。
支持的序列化格式
Schema Registry 支持三种 schema 格式:
Avro 是 Kafka 生态中最常用的格式。Avro schema 用 JSON 描述,序列化后是紧凑的二进制。Avro 的一个关键特性是 schema 演进能力——读写双方可以使用不同版本的 schema,只要版本之间兼容。
1 | |
Protobuf 在 gRPC 生态中更常见。Schema Registry 支持 .proto 格式的 schema 注册和兼容性检查。
JSON Schema 用于 JSON 消息的结构验证,适合已经使用 JSON 格式且不想迁移到二进制序列化的场景。
三种格式中 Avro 在 Kafka 生态的支持最完善,也是 Confluent 官方推荐的默认格式。
Wire Format:消息里的 Schema ID
使用 Schema Registry 后,序列化的消息不再是裸的 Avro/Protobuf 字节。消息的前 5 个字节包含 schema 标识信息:
1 | |
byte 0 是 magic byte,固定为 0x00,标识这是 Schema Registry 序列化格式。bytes 1-4 是 4 字节大端序的 schema ID,指向 Schema Registry 中注册的具体 schema 版本。后续字节是用该 schema 序列化的实际数据。
序列化流程:
- Producer 的
KafkaAvroSerializer拿到待发送的对象。 - Serializer 向 Schema Registry 注册对象的 schema(如果尚未注册),获取 schema ID。
- Serializer 写入 magic byte + schema ID + Avro 序列化后的字节。
- 整个字节数组作为 Kafka 消息的 value 发送。
反序列化流程:
- Consumer 的
KafkaAvroDeserializer读取消息的前 5 个字节,提取 schema ID。 - Deserializer 用 schema ID 从 Schema Registry 获取 writer schema。
- 结合 reader schema(consumer 本地的 schema),用 Avro 的 schema resolution 规则反序列化后续字节。
Serializer 和 Deserializer 都会本地缓存 schema,避免每条消息都请求 Registry。
兼容性模式
Schema Registry 的核心治理能力是兼容性检查。注册新版本 schema 时,Registry 根据配置的兼容性模式决定是否允许。
1 | |
BACKWARD 兼容是默认模式,含义是:用新版本 schema 的 consumer 能正确读取用旧版本 schema 的 producer 写入的数据。这适合 consumer 先升级、producer 后升级的部署顺序。
FORWARD 兼容的含义相反:用旧版本 schema 的 consumer 能正确读取新版本 schema 的 producer 写入的数据。适合 producer 先升级的场景。
FULL 兼容同时满足两个方向,是最严格的模式,适合 producer 和 consumer 无法协调升级顺序的场景。
还有 BACKWARD_TRANSITIVE、FORWARD_TRANSITIVE、FULL_TRANSITIVE 三个变体,区别在于兼容性检查是只和上一个版本比较,还是和所有历史版本比较。
Subject 命名策略
Schema Registry 用 subject 来组织 schema。Subject 是一个逻辑命名空间,包含一个 schema 的所有版本。命名策略决定了 subject 名称如何从 topic 名和 record 名派生。
三种内置策略:
TopicNameStrategy(默认):subject = <topic>-key 或 <topic>-value。一个 topic 只能有一种消息格式。
RecordNameStrategy:subject = 记录的全限定名(如 com.example.User)。允许同一个 topic 包含多种消息类型。
TopicRecordNameStrategy:subject = <topic>-<record>。按 topic 和记录名组合,既区分 topic 又区分消息类型。
1 | |
选择哪种策略取决于 topic 的使用模式。单类型 topic(一个 topic 只传一种消息)用默认的 TopicNameStrategy。多类型 topic(事件溯源场景中一个 topic 承载多种事件类型)需要 RecordNameStrategy 或 TopicRecordNameStrategy。
Schema 演进工作流
一次典型的 schema 演进过程:
1 | |
兼容性检查在注册时自动执行。不兼容的 schema 变更被 Registry 拒绝,避免破坏性变更进入生产环境。
实验:Schema 注册与演进
以下实验使用 Confluent Schema Registry 和 Avro 格式。假设 Schema Registry 运行在 localhost:8081。
注册初始 schema:
1 | |
预期返回 {"id":1}。
查询已注册的 schema:
1 | |
演进 schema——添加可选字段:
1 | |
预期返回 {"id":2}。新增的 age 字段有默认值 null,满足 BACKWARD 兼容。
测试不兼容的变更——尝试删除无默认值的 name 字段:
1 | |
预期返回 409 错误,Schema Registry 拒绝这个不兼容的变更。
检查兼容性(不实际注册):
1 | |
预期返回 {"is_compatible":true}。
模式提炼
Schema Registry 代表的模式是"契约优先的消息传递"(contract-first messaging):
- 消息的格式不是隐式约定,而是显式注册在中心化的 registry 中。
- 格式变更不是自由的,而是受兼容性规则约束。
- producer 和 consumer 通过 schema ID 间接耦合,不需要共享代码库。
这个模式在其他技术领域有直接对应:
- gRPC 使用
.proto文件作为服务契约,protoc 编译器在构建时检查兼容性。 - REST API 使用 OpenAPI spec 定义接口契约。
- 数据库使用 DDL migration(Flyway/Liquibase)管理 schema 演进。
Kafka Schema Registry 和这些方案的区别在于它运行在消息传递路径上——serializer/deserializer 在每条消息的序列化和反序列化时自动与 Registry 交互,不需要开发者手动查询 schema。
工程迁移表
| 概念 | Schema Registry | Protobuf IDL 管理 | OpenAPI Spec | 数据库 DDL Migration |
|---|---|---|---|---|
| 契约存储 | 中心化 Registry 服务 | 源码仓库中的 .proto 文件 | 源码仓库中的 YAML/JSON | Migration 脚本 |
| 兼容性检查 | 运行时自动检查 | 编译时 protoc 检查 | 工具链检查(如 openapi-diff) | Migration 脚本的顺序执行 |
| 版本管理 | Registry 内置版本链 | Git 版本控制 | Git 版本控制 | 版本号 + 执行记录表 |
| 演进策略 | BACKWARD/FORWARD/FULL | 字段编号不复用 | 版本路径 (/v1, /v2) | ALTER TABLE |
| 运行时集成 | Serializer/Deserializer 自动查询 | 编译时代码生成 | 客户端生成(可选) | 应用启动时执行 |
| 多格式支持 | Avro / Protobuf / JSON Schema | 仅 Protobuf | 仅 JSON | SQL DDL |
常见误解
误解一:“Schema Registry 是 Kafka 的必需组件。”
Schema Registry 是 Kafka 生态的可选组件。没有 Schema Registry,Kafka 完全可以正常工作——消息以字节数组存储和传输,producer 和 consumer 自行约定序列化格式。Schema Registry 提供的是治理能力:格式注册、版本管理、兼容性检查。小规模团队或内部系统如果能通过代码 review 保证格式一致性,可以不使用 Schema Registry。
误解二:“Schema Registry 存储消息数据。”
Schema Registry 只存储 schema 定义(元数据),不存储消息数据。消息数据仍然存储在 Kafka broker 中。Schema Registry 存储的是"消息应该长什么样"的描述,消息本身只包含一个 4 字节的 schema ID 引用。
误解三:“修改任何字段都会破坏兼容性。”
兼容性取决于具体的兼容模式和变更类型。在 BACKWARD 模式下,添加有默认值的字段是兼容的。在 FORWARD 模式下,添加任何字段都是兼容的。在 FULL 模式下,添加和删除有默认值的字段都是兼容的。关键区分点是字段是否有默认值——有默认值的字段可以安全地添加或删除,因为旧版本的读写方可以用默认值填充缺失字段。
练习
-
搭建 Confluent Schema Registry(可使用 Docker Compose)。注册一个 Avro schema,用 REST API 查看 subject 列表和 schema 详情。然后尝试在 BACKWARD 模式下注册一个不兼容的 schema 变更,观察 409 响应。
-
用 Java 代码编写一个 producer,配置
KafkaAvroSerializer和schema.registry.url。发送几条消息后,用kafka-console-consumer.sh配合kafka-avro-console-consumer(Confluent 工具)消费消息,观察反序列化后的 JSON 输出。 -
切换兼容性模式为 FULL,重复练习 1 的 schema 演进操作。对比 BACKWARD 和 FULL 模式下哪些变更被允许、哪些被拒绝。使用 REST API:
PUT /config/users-value设置 subject 级别的兼容性模式。 -
消费 Schema Registry 的内部 topic
_schemas(需要StringDeserializer),观察 schema 注册和配置变更在底层是如何存储的。注意这个 topic 使用 log compaction,每个 subject-version 对应一个 key。
系列导航
| 序号 | 主题 |
|---|---|
| 00 | 导读:为什么 Kafka 的核心是一根日志 |
| 01 | 架构总览:Broker、Controller 与元数据管理 |
| 02 | 日志存储:Segment、Index 与零拷贝 |
| 03 | Producer 内部机制:攒批、分区与 acks |
| 04 | 幂等 Producer 与序列号:消息不重不丢的第一层 |
| 05 | Consumer Group 协议:分配、重平衡与静态成员 |
| 06 | Offset 管理:提交、重置与消费语义 |
| 07 | 副本与 ISR:高可用的代价和折中 |
| 08 | Controller 与 KRaft:从 ZooKeeper 到内置共识 |
| 09 | Exactly-Once 与事务:跨 partition 的原子写入 |
| 10 | 日志压缩:把 topic 当 KV 表用 |
| 11 | Kafka Streams:在日志之上构建流处理 |
| 12 | Kafka Connect:标准化的数据管道 |
| 13 | Schema Registry 与数据治理:给消息加上契约 |
| 14 | 性能模型:吞吐、延迟与调优思路 |
| 15 | 安全体系:认证、授权与加密 |
| 16 | 生产运维:集群扩缩、监控指标与故障排查 |
| 17 | Kafka 与 RocketMQ:两种消息系统的设计选择 |
参考资料
- Confluent Schema Registry 文档:https://docs.confluent.io/platform/current/schema-registry/
- Avro Specification - Schema Resolution:https://avro.apache.org/docs/current/specification/#schema-resolution
- Confluent - Subject Name Strategy:https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy
- Confluent - Schema Evolution and Compatibility:https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html
- Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly, 2017. Chapter 4: Encoding and Evolution.
