上一篇分析了性能模型。这一篇进入生产环境的另一个关键维度:安全。

Kafka 早期版本没有内置安全机制——客户端连上 broker 就能读写任意 topic。自 0.9 版本起,Kafka 逐步引入了认证、授权和加密三层安全能力。这三层各自解决一个问题:认证确认身份,授权控制权限,加密保护传输通道。

本文只抓一个问题:Kafka 的安全体系如何分层工作,每一层的配置和机制是什么。

三层安全模型

1
2
3
4
5
6
7
8
9
10
11
Client                          Broker
| |
|--- TLS handshake ------------>| Layer 3: Encryption
|<-- certificate exchange ------| (通道加密)
| |
|--- SASL authentication ----->| Layer 1: Authentication
|<-- auth result ---------------| (身份认证)
| |
|--- produce/fetch request ---->| Layer 2: Authorization
| ACL check: allow/deny | (权限控制)
|<-- response ------------------|

三层可以独立启用。只要加密不要认证(SSL),只要认证不要加密(SASL_PLAINTEXT),或者三层全开(SASL_SSL),都是合法的组合。Kafka 通过 listener 的 security.protocol 配置来指定每个端口使用哪种组合。

认证机制

Kafka 支持四种主要的认证方式,通过 SASL(Simple Authentication and Security Layer)框架统一接入。

SASL/PLAIN:用户名 + 明文密码。配置最简单,适合开发和测试环境。密码以明文传输,生产环境必须搭配 TLS 使用(即 SASL_SSL 协议)。

SASL/SCRAM(Salted Challenge Response Authentication Mechanism):密码不在网络上明文传输,使用挑战-应答机制。支持 SCRAM-SHA-256 和 SCRAM-SHA-512 两种强度。用户凭证存储在 ZooKeeper 或 KRaft 的元数据中,支持动态增删用户,不需要重启 broker。

SASL/GSSAPI(Kerberos):企业级认证协议,依赖外部 KDC(Key Distribution Center)。适合已有 Kerberos 基础设施的组织(如 Hadoop 集群、Active Directory 环境)。配置复杂度最高。

mTLS(双向 TLS):客户端和 broker 互相验证对方的 X.509 证书。不走 SASL 框架,而是通过 TLS 层直接完成认证。客户端的证书 DN(Distinguished Name)作为 principal 用于后续的授权判断。适合服务间通信,不适合面向终端用户的场景。

SASL/SCRAM 配置实战

SASL/SCRAM 是生产环境中最常用的认证方式之一,因为它安全性足够且不依赖外部 KDC。以下是配置步骤。

第一步,创建 SCRAM 用户凭证。在 ZooKeeper 模式下:

1
2
3
4
5
6
7
8
9
kafka-configs.sh --zookeeper localhost:2181 \
--alter --add-config \
'SCRAM-SHA-256=[iterations=8192,password=producer-secret]' \
--entity-type users --entity-name producer-user

kafka-configs.sh --zookeeper localhost:2181 \
--alter --add-config \
'SCRAM-SHA-256=[iterations=8192,password=consumer-secret]' \
--entity-type users --entity-name consumer-user

在 KRaft 模式下,使用 --bootstrap-server 替代 --zookeeper:

1
2
3
4
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --add-config \
'SCRAM-SHA-256=[iterations=8192,password=producer-secret]' \
--entity-type users --entity-name producer-user

第二步,配置 broker。在 server.properties 中添加:

1
2
3
4
5
6
7
8
# listener 配置
listeners=SASL_PLAINTEXT://0.0.0.0:9092
advertised.listeners=SASL_PLAINTEXT://broker1:9092
security.inter.broker.protocol=SASL_PLAINTEXT

# SASL 配置
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
sasl.enabled.mechanisms=SCRAM-SHA-256

同时需要在 broker 的 JAAS 配置文件中配置 inter-broker 通信使用的凭证:

1
2
3
4
5
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};

第三步,配置 producer 客户端。在 producer.properties 中:

1
2
3
4
5
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="producer-user" \
password="producer-secret";

第四步,配置 consumer 客户端,方式相同,替换用户名和密码即可。

授权:ACL

Kafka 的授权基于 ACL(Access Control List)。每条 ACL 规则描述了一个 principal(用户)对一个 resource(资源)执行某个 operation(操作)的许可或拒绝。

资源类型包括:

资源类型 示例 说明
Topic my-topic 某个具体 topic
Group my-consumer-group consumer group ID
Cluster kafka-cluster 集群级操作
TransactionalId my-tx-id 事务 ID
DelegationToken token-id 委托令牌

操作类型包括 Read、Write、Create、Delete、Alter、Describe、ClusterAction、IdempotentWrite 等。

ACL 管理使用 kafka-acls.sh 工具:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 允许 producer-user 向 topic my-topic 写入
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:producer-user \
--operation Write --operation Describe \
--topic my-topic

# 允许 consumer-user 从 topic my-topic 读取,并使用 consumer group my-group
kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:consumer-user \
--operation Read --operation Describe \
--topic my-topic

kafka-acls.sh --bootstrap-server localhost:9092 \
--add --allow-principal User:consumer-user \
--operation Read \
--group my-group

# 查看所有 ACL
kafka-acls.sh --bootstrap-server localhost:9092 --list

Kafka 的 ACL 默认行为是"没有显式允许就拒绝"。可以通过 allow.everyone.if.no.acl.found=true 改变这个默认值,但生产环境不推荐。

broker 端需要启用 authorizer:

1
2
3
4
5
# ZooKeeper 模式
authorizer.class.name=kafka.security.authorizer.AclAuthorizer

# KRaft 模式
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

super.users 配置指定超级用户,超级用户绕过所有 ACL 检查:

1
super.users=User:admin

Per-Listener 安全配置

Kafka 支持在不同 listener 上配置不同的安全协议。一个 broker 可以同时暴露多个端口,每个端口使用不同的安全级别:

1
2
listeners=PLAINTEXT://0.0.0.0:9092,SASL_SSL://0.0.0.0:9093,SSL://0.0.0.0:9094
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SASL_SSL:SASL_SSL,SSL:SSL

典型的多端口配置场景:

1
2
3
Port 9092 (PLAINTEXT)   → 内网监控/运维工具(受信任网络)
Port 9093 (SASL_SSL) → 应用程序客户端(需要认证 + 加密)
Port 9094 (SSL) → 跨数据中心 broker 间通信(mTLS)

inter.broker.listener.name 指定 broker 之间通信使用哪个 listener:

1
inter.broker.listener.name=SSL

加密:TLS 配置

TLS 加密需要配置 keystore(存放 broker 自己的证书和私钥)和 truststore(存放信任的 CA 证书)。

生成证书的典型流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 为 broker 生成密钥对和自签名证书
keytool -keystore broker.keystore.jks -alias broker1 \
-keyalg RSA -genkey -storepass changeit \
-dname "CN=broker1,OU=kafka,O=company"

# 导出证书
keytool -keystore broker.keystore.jks -alias broker1 \
-certreq -file broker1.csr -storepass changeit

# 用 CA 签名(假设已有 CA)
openssl x509 -req -in broker1.csr -CA ca-cert \
-CAkey ca-key -CAcreateserial \
-out broker1-signed.pem -days 365

# 导入 CA 证书和签名后的证书到 keystore
keytool -keystore broker.keystore.jks -alias CARoot \
-importcert -file ca-cert -storepass changeit
keytool -keystore broker.keystore.jks -alias broker1 \
-importcert -file broker1-signed.pem -storepass changeit

# 创建 truststore 并导入 CA 证书
keytool -keystore broker.truststore.jks -alias CARoot \
-importcert -file ca-cert -storepass changeit

broker 端 TLS 配置:

1
2
3
4
5
6
ssl.keystore.location=/path/to/broker.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
ssl.truststore.location=/path/to/broker.truststore.jks
ssl.truststore.password=changeit
ssl.client.auth=required # mTLS: 要求客户端提供证书

Delegation Token

KIP-48 引入了 delegation token 机制,用于解决分布式计算框架(Spark、Flink)的认证问题。

这些框架的 driver 进程拥有 Kerberos 凭证,但 executor 进程运行在集群的各个节点上,让每个 executor 都持有 Kerberos keytab 不现实。Delegation token 的工作方式是:driver 用自己的凭证向 broker 申请一个短期 token,然后把这个 token 分发给所有 executor,executor 用 token 进行认证。

1
2
3
4
5
# 创建 delegation token
kafka-delegation-tokens.sh --bootstrap-server localhost:9093 \
--create --max-life-time-period -1 \
--command-config client.properties \
--renewer-principal User:renewer-user

token 有有效期,过期后需要续约或重新创建。

实验:SASL/SCRAM 认证验证

以下实验在一个启用了 SASL/SCRAM 的单 broker 环境中验证认证和授权。

第一步,启动配置了 SASL_PLAINTEXT 的 broker。

第二步,用已授权的 producer 发送消息:

1
2
3
4
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic secure-topic \
--producer.config producer-scram.properties

producer-scram.properties 内容:

1
2
3
4
5
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="producer-user" \
password="producer-secret";

第三步,用未授权的用户尝试发送:

1
2
3
4
kafka-console-producer.sh \
--bootstrap-server localhost:9092 \
--topic secure-topic \
--producer.config unauthorized-user.properties

预期结果:收到 TopicAuthorizationException,消息发送失败。

第四步,用已授权的 consumer 消费:

1
2
3
4
5
6
kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic secure-topic \
--from-beginning \
--consumer.config consumer-scram.properties \
--group my-group

如果 consumer-user 没有被授权 Read my-group,同样会收到 GroupAuthorizationException。

模式提炼

Kafka 的安全模型是分层安全(defense in depth)的典型实现。这个模式在各种系统中反复出现:

数据库安全:用户名/密码认证 → GRANT/REVOKE 权限控制 → SSL/TLS 传输加密。MySQL 的 mysql.user 表对应 Kafka 的 SCRAM 用户存储,GRANT 语句对应 kafka-acls.sh

Kubernetes RBAC:ServiceAccount(认证) → Role/ClusterRole + RoleBinding(授权) → TLS(加密)。Kubernetes 的 Role 按 namespace 隔离资源,Kafka 的 ACL 按 topic/group 隔离资源。

AWS IAM:IAM User/Role(认证) → Policy(授权) → HTTPS(加密)。IAM Policy 的 Effect/Action/Resource 结构与 Kafka ACL 的 Allow-Deny/Operation/Resource 结构同构。

共同点是三个层次各自独立可配置,且层与层之间没有依赖——可以只开加密不开认证,也可以只开认证不开授权。

工程迁移表

概念 Kafka RocketMQ 数据库 Kubernetes
认证方式 SASL/SCRAM, mTLS, Kerberos AK/SK, mTLS 用户名/密码, LDAP ServiceAccount, OIDC
授权模型 ACL (principal + resource + operation) ACL (topic + group + IP) GRANT/REVOKE RBAC (Role + RoleBinding)
资源粒度 topic, group, cluster, txn-id topic, group database, table, column namespace, pod, service
加密 TLS (client-broker, broker-broker) TLS (client-broker) TLS (client-server) mTLS (pod-to-pod via Istio)
短期凭证 Delegation Token (KIP-48) STS Token 临时密码 Projected Service Account Token
超级用户 super.users 配置 无内置概念 root / DBA 角色 cluster-admin ClusterRole

常见误解

PLAINTEXT 在生产环境没问题:PLAINTEXT 协议意味着所有数据(包括消息内容和认证凭证)以明文在网络上传输。即使在内网环境中,网络嗅探、ARP 欺骗等攻击手段仍然可能暴露敏感数据。生产环境至少应该启用 TLS 加密。

SSL/TLS 会让吞吐量减半:TLS 确实增加了 CPU 开销(加解密运算)和网络延迟(握手),但在现代硬件上,启用 TLS 的吞吐量下降通常在 10-20% 范围内,而不是 50%。使用 TLS 1.3(减少握手往返)和硬件 AES 加速指令(AES-NI)可以进一步降低开销。

ACL 只能按 topic 控制:Kafka ACL 的资源类型包括 Topic、Group、Cluster、TransactionalId 和 DelegationToken 五种。可以精确控制到某个用户对某个 consumer group 的 Read 权限,或某个用户使用某个 transactional.id 发起事务的权限。

练习

  1. 在一个 KRaft 模式的单 broker 集群上配置 SASL/SCRAM-SHA-256 认证。创建两个用户 producer-user 和 consumer-user,验证未认证的客户端无法连接。

  2. 为 producer-user 配置 ACL,只允许向 topic orders 写入。尝试让 producer-user 向 topic payments 写入,观察错误信息。

  3. 在同一个 broker 上配置两个 listener:9092 端口使用 PLAINTEXT(仅限内网运维),9093 端口使用 SASL_SSL(面向应用客户端)。验证两个端口都能正常工作。

  4. 测量启用 TLS 前后 producer 的吞吐量差异。使用 kafka-producer-perf-test 发送 100 万条消息,对比 PLAINTEXT 和 SSL 协议下的 records/sec。

系列导航

序号 主题
00 导读:为什么 Kafka 的核心是一根日志
01 架构总览:Broker、Controller 与元数据管理
02 日志存储:Segment、Index 与零拷贝
03 Producer 内部机制:攒批、分区与 acks
04 幂等 Producer 与序列号:消息不重不丢的第一层
05 Consumer Group 协议:分配、重平衡与静态成员
06 Offset 管理:提交、重置与消费语义
07 副本与 ISR:高可用的代价和折中
08 Controller 与 KRaft:从 ZooKeeper 到内置共识
09 Exactly-Once 与事务:跨 partition 的原子写入
10 日志压缩:把 topic 当 KV 表用
11 Kafka Streams:在日志之上构建流处理
12 Kafka Connect:标准化的数据管道
13 Schema Registry 与数据治理:给消息加上契约
14 性能模型:吞吐、延迟与调优思路
15 安全体系:认证、授权与加密
16 生产运维:集群扩缩、监控指标与故障排查
17 Kafka 与 RocketMQ:两种消息系统的设计选择

参考资料