上一篇解决了数据管道的标准化。这一篇解决管道里的数据该长什么样——消息的 schema 治理。

消息格式的约定容易被忽视。producer 和 consumer 对消息格式的理解仅靠代码约定和口头沟通时,任何一方的字段变更都可能在运行时导致反序列化失败。Schema Registry 的目标是把这种隐式约定变成显式的、可验证的契约。

本文只抓一个问题:Schema Registry 如何通过版本化 schema 和兼容性检查,在不停机的情况下实现消息格式的安全演进。

没有 Schema 治理时的问题

1
2
3
4
5
6
7
8
9
10
Producer v1                    Consumer v1
{name: "alice"} → 解析 {name} ✓

Producer v2 (加字段) Consumer v1 (未升级)
{name: "alice", → 解析 {name, age} ?
age: 30} age 字段未知,可能报错

Producer v3 (删字段) Consumer v2 (依赖 age)
{name: "alice", → 解析 {name, email}
email: "a@b.com"} age 字段消失,NPE

问题的根源是 producer 和 consumer 独立部署、独立升级。没有一个中心化的地方记录"这个 topic 的消息应该长什么样",也没有自动化的检查阻止不兼容的变更。

Schema Registry 架构

Schema Registry 是一个独立的服务进程,提供 REST API 来注册、查询和校验 schema。Confluent Schema Registry 是使用最广泛的参考实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌──────────────┐     REST API      ┌──────────────────┐
│ Producer │←─────────────────→│ Schema Registry │
│ (Serializer)│ register/get │ (HTTP service) │
└──────┬───────┘ └────────┬─────────┘
│ produce │ store
↓ ↓
┌──────────────┐ ┌──────────────────┐
│ Kafka Broker │ │ _schemas topic │
│ │ │ (Kafka internal) │
└──────┬───────┘ └──────────────────┘
│ consume

┌──────────────┐ REST API ┌──────────────────┐
│ Consumer │←─────────────────→│ Schema Registry │
│(Deserializer)│ get schema │ │
└──────────────┘ └──────────────────┘

Schema Registry 自身的数据存储在 Kafka 的内部 topic _schemas 中。这意味着 Schema Registry 利用 Kafka 的复制机制实现高可用——schema 数据不会因为单个 Registry 实例宕机而丢失。多个 Registry 实例可以组成集群,通过 leader 选举确定哪个实例处理写入请求。

支持的序列化格式

Schema Registry 支持三种 schema 格式:

Avro 是 Kafka 生态中最常用的格式。Avro schema 用 JSON 描述,序列化后是紧凑的二进制。Avro 的一个关键特性是 schema 演进能力——读写双方可以使用不同版本的 schema,只要版本之间兼容。

1
2
3
4
5
6
7
8
9
{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": ["null", "int"], "default": null}
]
}

Protobuf 在 gRPC 生态中更常见。Schema Registry 支持 .proto 格式的 schema 注册和兼容性检查。

JSON Schema 用于 JSON 消息的结构验证,适合已经使用 JSON 格式且不想迁移到二进制序列化的场景。

三种格式中 Avro 在 Kafka 生态的支持最完善,也是 Confluent 官方推荐的默认格式。

Wire Format:消息里的 Schema ID

使用 Schema Registry 后,序列化的消息不再是裸的 Avro/Protobuf 字节。消息的前 5 个字节包含 schema 标识信息:

1
2
3
4
5
┌────────┬──────────────┬─────────────────────┐
│ byte 0 │ bytes 1-4 │ bytes 5+ │
│ 0x00 │ Schema ID │ Serialized payload │
│(magic) │ (big-endian) │ (Avro/Protobuf/JSON)│
└────────┴──────────────┴─────────────────────┘

byte 0 是 magic byte,固定为 0x00,标识这是 Schema Registry 序列化格式。bytes 1-4 是 4 字节大端序的 schema ID,指向 Schema Registry 中注册的具体 schema 版本。后续字节是用该 schema 序列化的实际数据。

序列化流程:

  1. Producer 的 KafkaAvroSerializer 拿到待发送的对象。
  2. Serializer 向 Schema Registry 注册对象的 schema(如果尚未注册),获取 schema ID。
  3. Serializer 写入 magic byte + schema ID + Avro 序列化后的字节。
  4. 整个字节数组作为 Kafka 消息的 value 发送。

反序列化流程:

  1. Consumer 的 KafkaAvroDeserializer 读取消息的前 5 个字节,提取 schema ID。
  2. Deserializer 用 schema ID 从 Schema Registry 获取 writer schema。
  3. 结合 reader schema(consumer 本地的 schema),用 Avro 的 schema resolution 规则反序列化后续字节。

Serializer 和 Deserializer 都会本地缓存 schema,避免每条消息都请求 Registry。

兼容性模式

Schema Registry 的核心治理能力是兼容性检查。注册新版本 schema 时,Registry 根据配置的兼容性模式决定是否允许。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
BACKWARD(默认):新 schema 能读旧数据
─────────────────────────────────────
允许:删除有默认值的字段、添加有默认值的字段
禁止:删除无默认值的字段、修改字段类型

FORWARD:旧 schema 能读新数据
─────────────────────────────────────
允许:添加字段(无论是否有默认值)、删除有默认值的字段
禁止:删除无默认值的字段

FULL:同时满足 BACKWARD 和 FORWARD
─────────────────────────────────────
允许:添加/删除有默认值的字段
禁止:添加/删除无默认值的字段

NONE:不检查兼容性
─────────────────────────────────────
允许所有变更(不推荐生产使用)

BACKWARD 兼容是默认模式,含义是:用新版本 schema 的 consumer 能正确读取用旧版本 schema 的 producer 写入的数据。这适合 consumer 先升级、producer 后升级的部署顺序。

FORWARD 兼容的含义相反:用旧版本 schema 的 consumer 能正确读取新版本 schema 的 producer 写入的数据。适合 producer 先升级的场景。

FULL 兼容同时满足两个方向,是最严格的模式,适合 producer 和 consumer 无法协调升级顺序的场景。

还有 BACKWARD_TRANSITIVEFORWARD_TRANSITIVEFULL_TRANSITIVE 三个变体,区别在于兼容性检查是只和上一个版本比较,还是和所有历史版本比较。

Subject 命名策略

Schema Registry 用 subject 来组织 schema。Subject 是一个逻辑命名空间,包含一个 schema 的所有版本。命名策略决定了 subject 名称如何从 topic 名和 record 名派生。

三种内置策略:

TopicNameStrategy(默认):subject = <topic>-key<topic>-value。一个 topic 只能有一种消息格式。

RecordNameStrategy:subject = 记录的全限定名(如 com.example.User)。允许同一个 topic 包含多种消息类型。

TopicRecordNameStrategy:subject = <topic>-<record>。按 topic 和记录名组合,既区分 topic 又区分消息类型。

1
2
3
TopicNameStrategy:        orders-value
RecordNameStrategy: com.example.Order
TopicRecordNameStrategy: orders-com.example.Order

选择哪种策略取决于 topic 的使用模式。单类型 topic(一个 topic 只传一种消息)用默认的 TopicNameStrategy。多类型 topic(事件溯源场景中一个 topic 承载多种事件类型)需要 RecordNameStrategy 或 TopicRecordNameStrategy。

Schema 演进工作流

一次典型的 schema 演进过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Step 1: 注册初始 schema (v1)
POST /subjects/orders-value/versions
Body: {"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"}
Response: {"id": 1}

Step 2: Producer 使用 schema v1 发送消息
消息 = [0x00][00 00 00 01][avro bytes]

Step 3: 演进 schema — 添加可选字段 (v2)
POST /subjects/orders-value/versions
Body: {"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"currency\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}
Response: {"id": 2} ← 兼容性检查通过

Step 4: 尝试不兼容的变更 — 删除无默认值的字段
POST /subjects/orders-value/versions
Body: {"schema": ...删除了 amount 字段...}
Response: 409 Conflict ← 兼容性检查失败

兼容性检查在注册时自动执行。不兼容的 schema 变更被 Registry 拒绝,避免破坏性变更进入生产环境。

实验:Schema 注册与演进

以下实验使用 Confluent Schema Registry 和 Avro 格式。假设 Schema Registry 运行在 localhost:8081

注册初始 schema:

1
2
3
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}' \
http://localhost:8081/subjects/users-value/versions

预期返回 {"id":1}

查询已注册的 schema:

1
2
3
4
5
# 查看所有版本
curl http://localhost:8081/subjects/users-value/versions

# 查看最新版本的 schema
curl http://localhost:8081/subjects/users-value/versions/latest

演进 schema——添加可选字段:

1
2
3
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}]}"}' \
http://localhost:8081/subjects/users-value/versions

预期返回 {"id":2}。新增的 age 字段有默认值 null,满足 BACKWARD 兼容。

测试不兼容的变更——尝试删除无默认值的 name 字段:

1
2
3
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null}]}"}' \
http://localhost:8081/subjects/users-value/versions

预期返回 409 错误,Schema Registry 拒绝这个不兼容的变更。

检查兼容性(不实际注册):

1
2
3
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.example\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"age\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"default\":null}]}"}' \
http://localhost:8081/compatibility/subjects/users-value/versions/latest

预期返回 {"is_compatible":true}

模式提炼

Schema Registry 代表的模式是"契约优先的消息传递"(contract-first messaging):

  • 消息的格式不是隐式约定,而是显式注册在中心化的 registry 中。
  • 格式变更不是自由的,而是受兼容性规则约束。
  • producer 和 consumer 通过 schema ID 间接耦合,不需要共享代码库。

这个模式在其他技术领域有直接对应:

  • gRPC 使用 .proto 文件作为服务契约,protoc 编译器在构建时检查兼容性。
  • REST API 使用 OpenAPI spec 定义接口契约。
  • 数据库使用 DDL migration(Flyway/Liquibase)管理 schema 演进。

Kafka Schema Registry 和这些方案的区别在于它运行在消息传递路径上——serializer/deserializer 在每条消息的序列化和反序列化时自动与 Registry 交互,不需要开发者手动查询 schema。

工程迁移表

概念 Schema Registry Protobuf IDL 管理 OpenAPI Spec 数据库 DDL Migration
契约存储 中心化 Registry 服务 源码仓库中的 .proto 文件 源码仓库中的 YAML/JSON Migration 脚本
兼容性检查 运行时自动检查 编译时 protoc 检查 工具链检查(如 openapi-diff) Migration 脚本的顺序执行
版本管理 Registry 内置版本链 Git 版本控制 Git 版本控制 版本号 + 执行记录表
演进策略 BACKWARD/FORWARD/FULL 字段编号不复用 版本路径 (/v1, /v2) ALTER TABLE
运行时集成 Serializer/Deserializer 自动查询 编译时代码生成 客户端生成(可选) 应用启动时执行
多格式支持 Avro / Protobuf / JSON Schema 仅 Protobuf 仅 JSON SQL DDL

常见误解

误解一:“Schema Registry 是 Kafka 的必需组件。”

Schema Registry 是 Kafka 生态的可选组件。没有 Schema Registry,Kafka 完全可以正常工作——消息以字节数组存储和传输,producer 和 consumer 自行约定序列化格式。Schema Registry 提供的是治理能力:格式注册、版本管理、兼容性检查。小规模团队或内部系统如果能通过代码 review 保证格式一致性,可以不使用 Schema Registry。

误解二:“Schema Registry 存储消息数据。”

Schema Registry 只存储 schema 定义(元数据),不存储消息数据。消息数据仍然存储在 Kafka broker 中。Schema Registry 存储的是"消息应该长什么样"的描述,消息本身只包含一个 4 字节的 schema ID 引用。

误解三:“修改任何字段都会破坏兼容性。”

兼容性取决于具体的兼容模式和变更类型。在 BACKWARD 模式下,添加有默认值的字段是兼容的。在 FORWARD 模式下,添加任何字段都是兼容的。在 FULL 模式下,添加和删除有默认值的字段都是兼容的。关键区分点是字段是否有默认值——有默认值的字段可以安全地添加或删除,因为旧版本的读写方可以用默认值填充缺失字段。

练习

  1. 搭建 Confluent Schema Registry(可使用 Docker Compose)。注册一个 Avro schema,用 REST API 查看 subject 列表和 schema 详情。然后尝试在 BACKWARD 模式下注册一个不兼容的 schema 变更,观察 409 响应。

  2. 用 Java 代码编写一个 producer,配置 KafkaAvroSerializerschema.registry.url。发送几条消息后,用 kafka-console-consumer.sh 配合 kafka-avro-console-consumer(Confluent 工具)消费消息,观察反序列化后的 JSON 输出。

  3. 切换兼容性模式为 FULL,重复练习 1 的 schema 演进操作。对比 BACKWARD 和 FULL 模式下哪些变更被允许、哪些被拒绝。使用 REST API:PUT /config/users-value 设置 subject 级别的兼容性模式。

  4. 消费 Schema Registry 的内部 topic _schemas(需要 StringDeserializer),观察 schema 注册和配置变更在底层是如何存储的。注意这个 topic 使用 log compaction,每个 subject-version 对应一个 key。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料

  1. Confluent Schema Registry 文档:https://docs.confluent.io/platform/current/schema-registry/
  2. Avro Specification - Schema Resolution:https://avro.apache.org/docs/current/specification/#schema-resolution
  3. Confluent - Subject Name Strategy:https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#subject-name-strategy
  4. Confluent - Schema Evolution and Compatibility:https://docs.confluent.io/platform/current/schema-registry/fundamentals/schema-evolution.html
  5. Martin Kleppmann. Designing Data-Intensive Applications. O’Reilly, 2017. Chapter 4: Encoding and Evolution.