kafka相关证书配置

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka相关证书配置相关的知识,希望对你有一定的参考价值。

参考技术A 参考文档:

配置参考文档: https://www.orchome.com/171

https://blog.csdn.net/difffate/article/details/53570344

https://discuss.elastic.co/t/tls-for-filebeat-kafka-output/58756

https://discuss.elastic.co/t/tls-for-filebeat-kafka-output/58756/5

文档配置:

https://blog.csdn.net/qq_41926119/article/details/104510481

kafka官方文档:

https://www.ibm.com/support/knowledgecenter/en/SSZU2E_2.3.0/managing_cluster/ssl_elastic_stack_enable.html

1.  生成证书:

2.基于jks生成pem证书:

3. 查看pem证书:

多个pem证书合并:

4. 查看jks证书:

keytool -list -v -keystore  server.truststore.jks

5.生成filebeat证书: (需要把ca.pem 加入到 kafka的truststore.jks中,完成服务端授信)

通过ES生成自签的CA证书:./bin/elasticsearch-certutil ca

生成颁发的证书:./bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12 --name filebeat  --pem --out filebeat2.zip

keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

6.  kafka的最终配置

listeners=PLAINTEXT://172.28.15.231:9092,SSL://172.28.15.231:9093

ssl.endpoint.identification.algorithm=

ssl.keystore.location=/data/xxx/kafka_ca_cert/kafka.server.keystore.jks

ssl.keystore.password=xxxx

ssl.truststore.location=/data/xxx/kafka_ca_cert/kafka.server.truststore.jks

ssl.keystore.type=JKS

ssl.truststore.type=JKS

ssl.secure.random.implementation=SHA1PRNG

7. 客户端配置:client-ssl.properties

security.protocol=SSL

ssl.truststore.location=/data/xxx/kafka_ca_cert/kafka.client.truststore.jks

ssl.truststore.password=xxxx

ssl.keystore.type=JKS

ssl.truststore.type=JKS

ssl.endpoint.identification.algorithm=

8. kafka 客户端生产:

kafka-console-producer.sh --topic test-log  --broker-list xxxx:9093 --producer.config client-ssl.properties

kafka-console-producer.sh --topic test-log  --broker-list xxxx:9093 --producer.config client-ssl.properties

9. kafka 客户端消费:

kafka-console-consumer.sh --topic test-log --bootstrap-server xxxx:9093 --consumer.config client-ssl.properties

最终filebeat配置:

filebeat 的一些配置的官方文档:

https://www.elastic.co/guide/en/beats/filebeat/master/kafka-output.html

https://www.elastic.co/guide/en/beats/filebeat/master/configuration-ssl.html

https://blog.csdn.net/qq_41926119/article/details/104510481

EFK接入kafka消息队列:

https://www.ucloud.cn/yun/34441.html

需要注意的是,filebeat配置的是pem证书,kafka和logstash的kafka-input插件用的是jks证书~~~因此,证书生成工具最好需要能够同时生成这两种证书。

常见错误集锦:

https://discuss.elastic.co/t/tls-for-filebeat-kafka-output/58756/11

其他配置:

Filebeat与Logstash配置SSL加密通信:

http://www.manongjc.com/detail/14-jogxwlmgmlwauxs.html

https://www.cnblogs.com/sanduzxcvbnm/p/12055038.html

https://ngx.hk/2017/01/12/%E4%B8%BAfilebeat%E9%85%8D%E7%BD%AE%E6%95%B0%E5%AD%97%E8%AF%81%E4%B9%A6%E4%BB%A5%E4%BD%BF%E7%94%A8ssl%E5%8A%A0%E5%AF%86.html

filebeat常用配置项: https://www.elastic.co/guide/en/beats/filebeat/7.x/kafka-output.html

SSL配置项: https://www.elastic.co/guide/en/beats/filebeat/7.16/configuration-ssl.html#client-verification-mode

Kafka配置SSL认证

目录

Kafka配置SSL认证

使用kafka自带的消费者生产者测试一下


我是一只小小小小鸟~嗷!嗷!


Kafka配置SSL认证

准备工作:生成SSL相关证书

注:详见https://blog.csdn.net/justry_deng/article/details/88383081

注:其实对于本节内容来说,有SSL的服务端证书就够了。

第一步:修改kafka安装目录下config目录下的server.properties文件

############################# Server Basics #############################
# SSL认证配置
# 如果配置了SSL认证,那么原来的port和advertised.listeners可以注释掉了
listeners=SSL://kafka-single:9095
advertised.listeners=SSL://kafka-single:9095
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=ds1994
ssl.key.password=ds1994
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=ds1994
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS 
ssl.truststore.type=JKS 
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=HTTPS
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SSL
broker.id=0
############################# Socket Server Settings #############################
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/usr/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings  #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

注:SSL配置最好写在配置文件的最上面,否者可能导致Kafka配置SSL失败。

注:如果设置的内部broker的通讯协议PLAINTEXT,那么监听PLAINTEXT的时候就需要作相应的配置
       listeners=PLAINTEXT://host.name:port,SSL://host.name:port。

第二步(可选):如果配置SSL之前,存在Kafka数据,那么建议重新换一个位置来
                        存放数据;如果确保之前的数据已经没什么用了,也可以直接删除
                        之前的数据

注:之所以是这个文件夹,是因为是我们在安装kafka的时候指定的数据目录,可
        详见https://blog.csdn.net/justry_deng/article/details/88381595

第三步:重启kafka

# 后台启动zookeeper
/var/local/kafka/bin/zookeeper-server-start.sh /var/local/kafka/config/zookeeper.properties > /usr/data/zookeeper.log 2>&1 &
# 前台启动kafak
/var/local/kafka/bin/kafka-server-start.sh /var/local/kafka/config/server.properties

注:启动kafka时最好使用进程独占一个shell的方式前台启动,这样能非常直观的查看启动kafka是否成功。

第四步:使用Linux自带的openssl测试一下,验证我们配置的ssl有效

openssl s_client -debug -connect kafka-single:9095 -tls1

注:kafka-single为/etc/hosts文件中配置的host名。

弹出以下内容,说明成功了:

                          ……

                          ……

到此为止,就说明初步成功了!


使用kafka自带的消费者生产者测试一下

准备工作:先创建一个主题

# 创建主题
/var/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topicOne
# 查看所有主题
/var/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

执行效果如图:

消费者

  • 首先创建一个SSL下的消费者配置文件c.properties,编辑其内容为:
security.protocol=SSL
group.id=test-group
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=ds1994
ssl.keystore.password=ds1994
ssl.keystore.location=/usr/ca/server/server.keystore.jks
  • 启动消费者:
/var/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-single:9095 --topic topicOne --from-beginning --consumer.config /var/local/kafka/config/c.properties

生产者

首先创建一个SSL下的消费者配置文件p.properties,编辑其内容为:

bootstrap.servers=kafka-single:9095
security.protocol=SSL
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=ds1994   
ssl.keystore.password=ds1994
ssl.keystore.location=/usr/ca/server/server.keystore.jks

启动生产者:

/var/local/kafka/bin/kafka-console-producer.sh --broker-list kafka-single:9095 --topic topicOne --producer.config /var/local/kafka/config/p.properties

结果观察

在生产者里面发送消息:

在消费者里面消费到了消息:

由此可见完全成功了!

 

声明:本文为学习笔记,学习自51CTO,《Kafka消息中间件》,讲师李兴华

^_^ 如有不当之处,欢迎指正

^_^ 学习视频:
              《Kafka消息中间件》,讲师李兴华

^_^ 参考链接:
              http://kafka.apache.org/documentation/

^_^ 本文已经被收录进《程序员成长笔记(四)》,笔者JustryDeng

以上是关于kafka相关证书配置的主要内容,如果未能解决你的问题,请参考以下文章

Kafka配置SSL认证

iOS 打包上传AppStore相关-相关证书配置

如何配置tomcat证书

exchange2016 4节点完整安装之证书配置

nginx配置https证书

服务器如何配置ssl证书