kafka部署SASL_PLAINTEXT和TLS并存如何部署,给我详细步骤
时间: 2025-08-11 12:10:19 浏览: 14
<think>我们面对的问题是在Kafka中同时配置SASL_PLAINTEXT和TLS安全协议。SASL_PLAINTEXT提供基于用户名密码的认证,而TLS(Transport Layer Security)则提供加密传输。但注意,SASL_PLAINTEXT本身是不加密的,因此通常我们会使用SASL_SSL(即SASL over SSL/TLS)来同时实现认证和加密。然而,用户要求同时部署SASL_PLAINTEXT和TLS,这里可能有两种理解:
1. 同时支持两种协议:一部分客户端使用SASL_PLAINTEXT(不加密但认证),另一部分使用TLS(加密但不一定认证,通常TLS也可以做客户端认证,但这里可能指仅加密)。
2. 同时使用SASL认证和TLS加密,即SASL_SSL。
从用户描述“同时部署SASL_PLAINTEXT和TLS的安全认证机制”来看,更可能是希望同时支持两种协议。但是,在Kafka中,一个监听器可以配置多种安全协议,但通常一个监听器只能使用一种安全协议。我们可以通过配置多个监听器来实现同时支持SASL_PLAINTEXT和SSL(TLS)协议。
然而,需要注意的是,SASL_PLAINTEXT是不加密的,而SSL是加密的。如果我们要同时提供,那么就需要两个监听器,一个使用SASL_PLAINTEXT,另一个使用SSL(或者SASL_SSL,即同时使用SASL和SSL)。
但是,用户要求同时部署SASL_PLAINTEXT和TLS,并没有要求SASL_SSL。因此,我们可以这样配置:
- 监听器1:使用SASL_PLAINTEXT,提供基于SASL的认证,但传输不加密。
- 监听器2:使用SSL,提供TLS加密传输,但可能不需要SASL认证(或者使用TLS客户端认证,即双向认证)。
不过,更常见的场景是:
- 监听器1:SASL_PLAINTEXT(仅用于内网或测试,不加密)
- 监听器2:SASL_SSL(既认证又加密)
但用户没有明确要求SSL监听器是否要认证,所以这里我们按照两种独立的协议来配置。
下面,我们将提供详细的步骤来配置Kafka,使其同时支持SASL_PLAINTEXT(使用SCRAM机制)和SSL(TLS加密)两种安全协议。我们将分为以下几个部分:
1. 生成SSL证书(用于TLS加密)
2. 配置Kafka的server.properties,设置两个监听器:一个使用SASL_PLAINTEXT,另一个使用SSL。
3. 配置SASL(SCRAM)用户认证。
4. 配置ZooKeeper的SASL认证(如果使用ZooKeeper)。
5. 启动和测试。
注意:本示例假设使用Kafka 3.6.0版本,使用Docker环境部署(方便演示),但配置同样适用于非Docker环境。
### 步骤1:生成SSL证书
首先,我们需要为Kafka broker生成SSL证书。可以使用keytool(Java自带)来生成。
创建keystore(用于broker的私钥和证书):
```bash
keytool -keystore kafka.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
```
按照提示输入信息,注意在“您的名字与姓氏是什么?”时,输入broker的主机名(如果是本地测试,可以用localhost,但在生产环境必须用主机名或IP,且客户端需要能够解析)。
生成自签名CA(用于签名证书,这里自签名用于测试):
```bash
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
```
然后,将CA证书导入到broker的truststore(信任库):
```bash
keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert
```
同时,将CA证书导入到客户端的truststore(以便客户端信任broker)。
为broker证书签名:
首先,导出keystore中的证书请求:
```bash
keytool -keystore kafka.keystore.jks -alias localhost -certreq -file cert-file
```
然后用CA签名:
```bash
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
```
最后,将CA证书和签名后的证书导入keystore:
```bash
keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.keystore.jks -alias localhost -import -file cert-signed
```
### 步骤2:配置Kafka broker
我们需要配置server.properties文件,设置两个监听器:一个使用SASL_PLAINTEXT,另一个使用SSL。
假设我们有两个监听器:
- SASL_PLAINTEXT://:9092
- SSL://:9093
同时,我们需要配置advertised.listeners,使得客户端能够正确连接到broker。这里假设broker的IP为192.168.1.100(请替换为实际IP)。
配置如下:
```properties
# server.properties
listeners=SASL_PLAINTEXT://:9092,SSL://:9093
advertised.listeners=SASL_PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093
# 安全协议映射(告诉Kafka每个监听器使用的安全协议)
listener.security.protocol.map=SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL
# 设置SASL认证的监听器
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# 配置SSL
ssl.keystore.location=/path/to/kafka.keystore.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
ssl.truststore.location=/path/to/kafka.truststore.jks
ssl.truststore.password=truststore_password
ssl.client.auth=required # 要求客户端提供证书(双向认证)或者可以设置为none或requested(根据需求)
# 注意:如果不需要客户端证书认证,可以设置为none,但这样SSL只提供加密,不提供客户端认证。
# 配置SASL JAAS
# 创建JAAS配置文件,比如kafka_server_jaas.conf
# 然后在启动Kafka时指定JVM参数:-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf
```
### 步骤3:配置SASL(SCRAM)用户
创建JAAS配置文件(kafka_server_jaas.conf):
```properties
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret";
};
```
这里我们定义了两个用户:admin和alice,以及他们的密码。
然后,在server.properties中指定JAAS配置(也可以通过环境变量KAFKA_OPTS):
```bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"
```
此外,我们还需要创建这些用户。Kafka提供了kafka-configs.sh脚本来管理用户。
首先,启动ZooKeeper(如果使用)和Kafka broker。然后,创建SCRAM用户:
```bash
# 创建admin用户
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password=admin-secret]' --entity-type users --entity-name admin
# 创建alice用户
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password=alice-secret]' --entity-type users --entity-name alice
```
### 步骤4:配置ZooKeeper的SASL认证(如果使用)
如果Kafka集群使用ZooKeeper,并且需要SASL认证,那么还需要配置ZooKeeper的JAAS。这里不再展开,可参考之前的引用[^2]。
### 步骤5:启动和测试
启动ZooKeeper和Kafka broker。
测试SASL_PLAINTEXT监听器(使用SCRAM认证):
```bash
# 创建客户端配置文件(consumer.properties或producer.properties)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";
# 然后使用kafka-console-producer.sh和kafka-console-consumer.sh测试
bin/kafka-console-producer.sh --broker-list 192.168.1.100:9092 --topic test --producer.config client.properties
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic test --consumer.config client.properties
```
测试SSL监听器(使用TLS加密和客户端证书认证):
首先,我们需要为客户端生成证书并签名(类似于broker的步骤),然后配置客户端属性文件:
```properties
security.protocol=SSL
ssl.truststore.location=/path/to/client.truststore.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=/path/to/client.keystore.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
```
然后使用该配置文件进行生产消费测试。
### 注意事项
1. 如果不需要SSL监听器的客户端证书认证,可以在server.properties中设置`ssl.client.auth=none`。
2. 在生产环境中,建议使用SASL_SSL(即一个监听器同时提供SASL认证和TLS加密)而不是分开两个监听器,因为分开的话SASL_PLAINTEXT是不安全的。
3. 如果使用Docker部署,需要将证书文件和JAAS配置文件挂载到容器内,并设置相应的环境变量(如KAFKA_OPTS)和配置。
### 参考配置(Docker Compose示例)
如果使用Docker,可以编写docker-compose.yml文件,配置Kafka服务。这里以bitnami/kafka镜像为例(版本3.6.0):
```yaml
version: '3'
services:
zookeeper:
image: bitnami/zookeeper:3.8
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ports:
- "2181:2181"
kafka:
image: bitnami/kafka:3.6
ports:
- "9092:9092"
- "9093:9093"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://192.168.1.100:9092,SSL://192.168.1.100:9093
- KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,SSL://:9093
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256,SCRAM-SHA-512
- KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-512
- KAFKA_OPTS=-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_server_jaas.conf
- KAFKA_CFG_SSL_KEYSTORE_LOCATION=/opt/bitnami/kafka/certs/kafka.keystore.jks
- KAFKA_CFG_SSL_KEYSTORE_PASSWORD=keystore_password
- KAFKA_CFG_SSL_KEY_PASSWORD=key_password
- KAFKA_CFG_SSL_TRUSTSTORE_LOCATION=/opt/bitnami/kafka/certs/kafka.truststore.jks
- KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=truststore_password
- KAFKA_CFG_SSL_CLIENT_AUTH=required
volumes:
- ./kafka_server_jaas.conf:/opt/bitnami/kafka/config/kafka_server_jaas.conf
- ./certs:/opt/bitnami/kafka/certs
```
以上配置中,需要提前将生成的keystore、truststore以及JAAS配置文件放在相应的目录下。
### 总结
通过以上步骤,我们实现了在Kafka中同时部署SASL_PLAINTEXT(提供SASL认证但不加密)和SSL(提供TLS加密和客户端证书认证)两种安全协议。用户可以根据需要选择使用哪种协议连接Kafka。
§§相关问题§§
1. 如何为Kafka配置SASL_SSL(同时使用SASL认证和TLS加密)?
2. Kafka中SCRAM认证机制的工作原理是什么?
3. 如何配置Kafka客户端使用双向SSL认证?</think>### Kafka同时配置SASL_PLAINTEXT和TLS的详细步骤
#### 1. **环境准备**
- Kafka版本:3.6.0+
- Java 8+
- 操作系统:Linux/WSL/Mac
- 证书工具:`keytool`(Java自带)
- 配置文件:`server.properties`
#### 2. **生成TLS证书**
```bash
# 生成CA证书
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
# 创建Broker密钥库
keytool -keystore kafka.keystore.jks -alias kafka -validity 365 -genkey
keytool -keystore kafka.keystore.jks -alias kafka -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial
keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.keystore.jks -alias kafka -import -file cert-signed
# 创建信任库(所有Broker和客户端共用)
keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert
```
#### 3. **配置Kafka Broker**
修改`server.properties`:
```properties
# 监听器配置
listeners=SASL_PLAINTEXT://:9092,SSL://:9093
advertised.listeners=SASL_PLAINTEXT://your-host:9092,SSL://your-host:9093
# SASL配置
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# SSL配置
ssl.keystore.location=/path/to/kafka.keystore.jks
ssl.keystore.password=your_keystore_pass
ssl.key.password=your_key_pass
ssl.truststore.location=/path/to/kafka.truststore.jks
ssl.truststore.password=your_truststore_pass
ssl.client.auth=required # 启用双向认证
```
#### 4. **配置SASL JAAS文件**
创建`kafka_server_jaas.conf`:
```properties
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="kafka_admin"
password="kafka_password"
user_kafka_admin="kafka_password"
user_client1="client1_pass";
};
```
启动时加载JAAS:
```bash
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf"
```
#### 5. **创建SCRAM用户**
```bash
# 创建SASL用户
bin/kafka-configs.sh --bootstrap-server localhost:9093 --command-config ssl-config.properties \
--alter --add-config 'SCRAM-SHA-512=[password=client1_pass]' \
--entity-type users --entity-name client1
```
#### 6. **配置ZooKeeper SASL**
创建`zookeeper_server_jaas.conf`:
```properties
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_super="super_pass";
};
```
在`zoo.cfg`中添加:
```properties
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
```
#### 7. **客户端配置示例**
**SASL_PLAINTEXT客户端配置**:
```properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="client1" password="client1_pass";
```
**TLS客户端配置**:
```properties
security.protocol=SSL
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=truststore_pass
ssl.keystore.location=client.keystore.jks
ssl.keystore.password=keystore_pass
ssl.key.password=key_pass
```
#### 8. **Docker部署示例
```yaml
version: '3'
services:
kafka:
image: bitnami/kafka:3.6.0
environment:
- KAFKA_CFG_LISTENERS=SASL_PLAINTEXT://:9092,SSL://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=SASL_PLAINTEXT://kafka:9092,SSL://kafka:9093
- KAFKA_CFG_SASL_ENABLED_MECHANISMS=SCRAM-SHA-512
- KAFKA_CFG_SSL_TRUSTSTORE_PASSWORD=kafka2024
- KAFKA_OPTS=-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka.jaas
volumes:
- ./kafka.jaas:/opt/bitnami/kafka/config/kafka.jaas
- ./certs:/opt/bitnami/kafka/certs
```
#### 9. **验证配置**
```bash
# 测试SASL_PLAINTEXT
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic test --producer.config sasl-config.properties
# 测试TLS
kafka-console-consumer.sh --bootstrap-server localhost:9093 \
--topic test --consumer.config ssl-config.properties
```
> **关键注意事项**:
> 1. SASL_PLAINTEXT仅用于认证**不加密数据**,适用于安全内网
> 2. TLS提供加密但需证书管理开销
> 3. 生产环境推荐使用SASL_SSL(认证+加密一体化)
> 4. ZooKeeper 3.6+需同步配置SASL[^2]
阅读全文
相关推荐




















