kafka相关证书配置
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka相关证书配置相关的知识,希望对你有一定的参考价值。
参考技术A 参考文档:配置参考文档: https://www.orchome.com/171
https://blog.csdn.net/difffate/article/details/53570344
https://discuss.elastic.co/t/tls-for-filebeat-kafka-output/58756
https://discuss.elastic.co/t/tls-for-filebeat-kafka-output/58756/5
文档配置:
https://blog.csdn.net/qq_41926119/article/details/104510481
kafka官方文档:
https://www.ibm.com/support/knowledgecenter/en/SSZU2E_2.3.0/managing_cluster/ssl_elastic_stack_enable.html
1. 生成证书:
2.基于jks生成pem证书:
3. 查看pem证书:
多个pem证书合并:
4. 查看jks证书:
keytool -list -v -keystore server.truststore.jks
5.生成filebeat证书: (需要把ca.pem 加入到 kafka的truststore.jks中,完成服务端授信)
通过ES生成自签的CA证书:./bin/elasticsearch-certutil ca
生成颁发的证书:./bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12 --name filebeat --pem --out filebeat2.zip
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
6. kafka的最终配置
listeners=PLAINTEXT://172.28.15.231:9092,SSL://172.28.15.231:9093
ssl.endpoint.identification.algorithm=
ssl.keystore.location=/data/xxx/kafka_ca_cert/kafka.server.keystore.jks
ssl.keystore.password=xxxx
ssl.truststore.location=/data/xxx/kafka_ca_cert/kafka.server.truststore.jks
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.secure.random.implementation=SHA1PRNG
7. 客户端配置:client-ssl.properties
security.protocol=SSL
ssl.truststore.location=/data/xxx/kafka_ca_cert/kafka.client.truststore.jks
ssl.truststore.password=xxxx
ssl.keystore.type=JKS
ssl.truststore.type=JKS
ssl.endpoint.identification.algorithm=
8. kafka 客户端生产:
kafka-console-producer.sh --topic test-log --broker-list xxxx:9093 --producer.config client-ssl.properties
kafka-console-producer.sh --topic test-log --broker-list xxxx:9093 --producer.config client-ssl.properties
9. kafka 客户端消费:
kafka-console-consumer.sh --topic test-log --bootstrap-server xxxx:9093 --consumer.config client-ssl.properties
最终filebeat配置:
filebeat 的一些配置的官方文档:
https://www.elastic.co/guide/en/beats/filebeat/master/kafka-output.html
https://www.elastic.co/guide/en/beats/filebeat/master/configuration-ssl.html
https://blog.csdn.net/qq_41926119/article/details/104510481
EFK接入kafka消息队列:
https://www.ucloud.cn/yun/34441.html
需要注意的是,filebeat配置的是pem证书,kafka和logstash的kafka-input插件用的是jks证书~~~因此,证书生成工具最好需要能够同时生成这两种证书。
常见错误集锦:
https://discuss.elastic.co/t/tls-for-filebeat-kafka-output/58756/11
其他配置:
Filebeat与Logstash配置SSL加密通信:
http://www.manongjc.com/detail/14-jogxwlmgmlwauxs.html
https://www.cnblogs.com/sanduzxcvbnm/p/12055038.html
https://ngx.hk/2017/01/12/%E4%B8%BAfilebeat%E9%85%8D%E7%BD%AE%E6%95%B0%E5%AD%97%E8%AF%81%E4%B9%A6%E4%BB%A5%E4%BD%BF%E7%94%A8ssl%E5%8A%A0%E5%AF%86.html
filebeat常用配置项: https://www.elastic.co/guide/en/beats/filebeat/7.x/kafka-output.html
SSL配置项: https://www.elastic.co/guide/en/beats/filebeat/7.16/configuration-ssl.html#client-verification-mode
Kafka配置SSL认证
目录
我是一只小小小小鸟~嗷!嗷!
Kafka配置SSL认证
准备工作:生成SSL相关证书
注:详见https://blog.csdn.net/justry_deng/article/details/88383081。
注:其实对于本节内容来说,有SSL的服务端证书就够了。
第一步:修改kafka安装目录下config目录下的server.properties文件
############################# Server Basics #############################
# SSL认证配置
# 如果配置了SSL认证,那么原来的port和advertised.listeners可以注释掉了
listeners=SSL://kafka-single:9095
advertised.listeners=SSL://kafka-single:9095
ssl.keystore.location=/usr/ca/server/server.keystore.jks
ssl.keystore.password=ds1994
ssl.key.password=ds1994
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=ds1994
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=HTTPS
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SSL
broker.id=0
############################# Socket Server Settings #############################
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/usr/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
注:SSL配置最好写在配置文件的最上面,否者可能导致Kafka配置SSL失败。
注:如果设置的内部broker的通讯协议PLAINTEXT,那么监听PLAINTEXT的时候就需要作相应的配置
listeners=PLAINTEXT://host.name:port,SSL://host.name:port。
第二步(可选):如果配置SSL之前,存在Kafka数据,那么建议重新换一个位置来
存放数据;如果确保之前的数据已经没什么用了,也可以直接删除
之前的数据
注:之所以是这个文件夹,是因为是我们在安装kafka的时候指定的数据目录,可
详见https://blog.csdn.net/justry_deng/article/details/88381595。
第三步:重启kafka
# 后台启动zookeeper
/var/local/kafka/bin/zookeeper-server-start.sh /var/local/kafka/config/zookeeper.properties > /usr/data/zookeeper.log 2>&1 &
# 前台启动kafak
/var/local/kafka/bin/kafka-server-start.sh /var/local/kafka/config/server.properties
注:启动kafka时最好使用进程独占一个shell的方式前台启动,这样能非常直观的查看启动kafka是否成功。
第四步:使用Linux自带的openssl测试一下,验证我们配置的ssl有效
openssl s_client -debug -connect kafka-single:9095 -tls1
注:kafka-single为/etc/hosts文件中配置的host名。
弹出以下内容,说明成功了:
……
……
到此为止,就说明初步成功了!
使用kafka自带的消费者生产者测试一下
准备工作:先创建一个主题
# 创建主题
/var/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topicOne
# 查看所有主题
/var/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
执行效果如图:
消费者:
- 首先创建一个SSL下的消费者配置文件c.properties,编辑其内容为:
security.protocol=SSL
group.id=test-group
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=ds1994
ssl.keystore.password=ds1994
ssl.keystore.location=/usr/ca/server/server.keystore.jks
- 启动消费者:
/var/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-single:9095 --topic topicOne --from-beginning --consumer.config /var/local/kafka/config/c.properties
生产者:
首先创建一个SSL下的消费者配置文件p.properties,编辑其内容为:
bootstrap.servers=kafka-single:9095
security.protocol=SSL
ssl.truststore.location=/usr/ca/trust/server.truststore.jks
ssl.truststore.password=ds1994
ssl.keystore.password=ds1994
ssl.keystore.location=/usr/ca/server/server.keystore.jks
启动生产者:
/var/local/kafka/bin/kafka-console-producer.sh --broker-list kafka-single:9095 --topic topicOne --producer.config /var/local/kafka/config/p.properties
结果观察:
在生产者里面发送消息:
在消费者里面消费到了消息:
由此可见完全成功了!
声明:本文为学习笔记,学习自51CTO,《Kafka消息中间件》,讲师李兴华。
^_^ 如有不当之处,欢迎指正
^_^ 学习视频:
《Kafka消息中间件》,讲师李兴华
^_^ 参考链接:
http://kafka.apache.org/documentation/
^_^ 本文已经被收录进《程序员成长笔记(四)》,笔者JustryDeng
以上是关于kafka相关证书配置的主要内容,如果未能解决你的问题,请参考以下文章