Common Kafka Operations
Posted by lqbyz
1. Kafka Monitoring
1. Production Kafka monitoring
Monitoring for the MSK prod-kafka-msk-tls-01 cluster:
1. kafka_exporter is installed on singapore-prod-riskmonitor-logstash-09 and listens on port 9309
root@singapore-prod-riskmonitor-logstash-09,10.31.4.141:kafka_exporter # nohup /opt/kafka_exporter/kafka_exporter \
  --kafka.server=b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 \
  --kafka.server=b-2.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 \
  --kafka.server=b-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 \
  --tls.enabled \
  --tls.ca-file=server.cer.pem \
  --tls.insecure-skip-tls-verify \
  --web.listen-address=:9309 \
  --tls.cert-file=client.cer.pem \
  --tls.key-file=client.key.pem &
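Running the exporter under nohup means it will not come back after a host reboot; a systemd unit is a sturdier way to keep it running. A minimal sketch, reusing the binary, certificate files and broker list above; the unit name, user and file layout are assumptions, not part of the original setup.

# /etc/systemd/system/kafka-exporter-msk-tls-01.service  (hypothetical unit name)
[Unit]
Description=kafka_exporter for prod-kafka-msk-tls-01
After=network-online.target

[Service]
WorkingDirectory=/opt/kafka_exporter
ExecStart=/opt/kafka_exporter/kafka_exporter \
  --kafka.server=b-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 \
  --kafka.server=b-2.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 \
  --kafka.server=b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 \
  --tls.enabled --tls.ca-file=server.cer.pem --tls.cert-file=client.cer.pem --tls.key-file=client.key.pem \
  --tls.insecure-skip-tls-verify --web.listen-address=:9309
Restart=on-failure

[Install]
WantedBy=multi-user.target

It would then be enabled with: systemctl daemon-reload && systemctl enable --now kafka-exporter-msk-tls-01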
2. prod-flash-kafka-tls monitoring, exporter port 9310
nohup /opt/kafka_exporter/kafka_exporter --kafka.server=b-2.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --kafka.server=b-3.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --kafka.server=b-1.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --web.listen-address=:9310 --tls.cert-file=client.cer.pem --tls.key-file=client.key.pem --tls.enabled --tls.ca-file=server.cer.pem --tls.insecure-skip-tls-verify &
3. prod-msk-kafka-01 monitoring, default exporter port 9308
nohup /opt/kafka_exporter/kafka_exporter --kafka.server=b-1.prod-msk-kafka-01.himvak.c4.kafka.ap-southeast-1.amazonaws.com:9092 --log.level=info &
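These exporters only expose metrics over HTTP; Prometheus still has to scrape them. A minimal scrape-config sketch is below, assuming all three exporters run on singapore-prod-riskmonitor-logstash-09 with the ports shown above (9308, 9309, 9310); the job names and scrape interval are made-up examples, not part of the original setup.

# prometheus.yml fragment -- job names are hypothetical; host and ports come from the exporter commands above
scrape_configs:
  - job_name: kafka-msk-tls-01        # assumed job name
    scrape_interval: 30s              # assumed interval
    static_configs:
      - targets: ['singapore-prod-riskmonitor-logstash-09:9309']
  - job_name: prod-flash-kafka-tls    # assumed job name
    static_configs:
      - targets: ['singapore-prod-riskmonitor-logstash-09:9310']
  - job_name: prod-msk-kafka-01       # assumed job name
    static_configs:
      - targets: ['singapore-prod-riskmonitor-logstash-09:9308']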
2. Common Kafka query commands
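This section is empty in the original notes. As a reference, the everyday queries against a cluster with a plaintext listener look roughly like the sketch below; the broker address is reused from the prod-msk-kafka-01 monitoring example above, and <topic-name> / <group-name> are placeholders.

# List topics (plaintext listener, no client.properties needed)
./kafka_2.13-2.6.2/bin/kafka-topics.sh --bootstrap-server b-1.prod-msk-kafka-01.himvak.c4.kafka.ap-southeast-1.amazonaws.com:9092 --list
# Describe a topic
./kafka_2.13-2.6.2/bin/kafka-topics.sh --bootstrap-server b-1.prod-msk-kafka-01.himvak.c4.kafka.ap-southeast-1.amazonaws.com:9092 --describe --topic <topic-name>
# List consumer groups and check lag
./kafka_2.13-2.6.2/bin/kafka-consumer-groups.sh --bootstrap-server b-1.prod-msk-kafka-01.himvak.c4.kafka.ap-southeast-1.amazonaws.com:9092 --list
./kafka_2.13-2.6.2/bin/kafka-consumer-groups.sh --bootstrap-server b-1.prod-msk-kafka-01.himvak.c4.kafka.ap-southeast-1.amazonaws.com:9092 --describe --group <group-name>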
3. Kafka queries in a TLS environment
1. Create a client.properties file:
ssl.keystore.location = /opt/nfs/msk-tls-cert/kafka.client.keystore.jks
ssl.keystore.password = 684281
ssl.keystore.type = jks
security.protocol = SSL
I. Topic operations
1. List topics (either ZooKeeper or a broker can be used for the query)
Query via broker:
./kafka_2.13-2.6.2/bin/kafka-topics.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --command-config client.properties --list
Query via ZooKeeper:
./kafka_2.13-2.6.2/bin/kafka-topics.sh --list --zookeeper z-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:2181
2. View topic partition details
View via broker:
./kafka_2.13-2.6.2/bin/kafka-topics.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --describe --command-config client.properties
View via ZooKeeper:
./kafka_2.13-2.6.2/bin/kafka-topics.sh --zookeeper z-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:2181 --describe
3. Create a topic
./kafka_2.13-2.6.2/bin/kafka-topics.sh --zookeeper z-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:2181 --create --topic test20210702-02 --partitions 1 --replication-factor 3
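The same topic can also be created through the TLS listener by pointing at a broker instead of ZooKeeper; a sketch reusing the broker address and client.properties from the queries above:

./kafka_2.13-2.6.2/bin/kafka-topics.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --command-config client.properties --create --topic test20210702-02 --partitions 1 --replication-factor 3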
4. Delete a topic
root@singapore-prod-riskmonitor-logstash-09,10.31.4.141:kevin.li # ./kafka_2.13-2.6.2/bin/kafka-topics.sh --zookeeper z-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:2181 --delete --topic test20210702-02
II. Consumer group operations
1. List consumer groups
./kafka_2.13-2.6.2/bin/kafka-consumer-groups.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --list --command-config client.properties
2. View consumer group lag
./kafka_2.13-2.6.2/bin/kafka-consumer-groups.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --describe --command-config client.properties --group test-20210630
3. Delete a consumer group
./kafka_2.13-2.6.2/bin/kafka-consumer-groups.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --delete --group test-20210630 --command-config client.properties
4. Reset a consumer group's offsets
./kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server b-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:9092 --group notice_platform_consume --execute --reset-offsets --topic notice_platform_topic --to-latest
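--to-latest is only one of several reset strategies supported by kafka-consumer-groups.sh. Sketches of other common variants are below, reusing the same group and topic as above; the datetime and shift values are placeholders. Replacing --execute with --dry-run previews the resulting offsets without applying them, and the group has to be inactive (no running consumers) for the reset to succeed.

# Jump back to the earliest retained offset
./kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server b-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:9092 --group notice_platform_consume --execute --reset-offsets --topic notice_platform_topic --to-earliest
# Reset to a specific point in time
./kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server b-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:9092 --group notice_platform_consume --execute --reset-offsets --topic notice_platform_topic --to-datetime 2021-07-01T00:00:00.000
# Shift every partition back 100 messages
./kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh --bootstrap-server b-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:9092 --group notice_platform_consume --execute --reset-offsets --topic notice_platform_topic --shift-by -100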
5. Moving partitions after adding brokers
After a Kafka cluster is expanded, no data moves onto the new brokers by itself; they sit idle and only take part in work when new topics are created, unless existing partitions are migrated onto them.
So some topic partitions need to be moved to the new brokers.
kafka-reassign-partitions.sh is the tool Kafka ships for reassigning partitions and replicas across brokers.
A reassignment takes three steps:
1. Generate an assignment plan (generate)
2. Execute the assignment (execute)
3. Verify the assignment status (verify)
The detailed steps are as follows:
1.1 Edit topic-to-move.json (the empty second entry below is just a placeholder for additional topic names):
"topics":
["topic":"Flashybit--Swap-BTCUSDT-Trades",
"topic":""],
"version": 1
1.2 Run the command that generates the assignment plan:
/home/kevin/kafka_2.13-2.6.0/bin/kafka-reassign-partitions.sh --zookeeper z-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:2181 --topics-to-move-json-file topic-to-move.json --broker-list "4,5,6" --generate
Warning: --zookeeper is deprecated, and will be removed in a future version of Kafka.
Current partition replica assignment
"version":1,"partitions":["topic":"Flashybit--Swap-BTCUSDT-Trades","partition":0,"replicas":[2,1,3],"log_dirs":["any","any","any"]]
"version":1,"partitions":["topic":"event_request","partition":0,"replicas":[6,5],"topic":"event_request","partition":1,"replicas":[7,6]]
1.3 Save the proposed assignment printed at the bottom of the output as topic-reassignment.json:
cat topic-reassignment.json
"version":1,"partitions":["topic":"Flashybit--Swap-BTCUSDT-Trades","partition":0,"replicas":[5,6,4],"log_dirs":["any","any","any"]]
2. Run the reassignment (kafka-reassign-partitions.sh --zookeeper $ZK_CONNECT --reassignment-json-file topic-reassignment.json --execute):
/home/kevin/kafka_2.13-2.6.0/bin/kafka-reassign-partitions.sh --zookeeper z-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:2181 --reassignment-json-file topic-reassignment.json --execute
Warning: --zookeeper is deprecated, and will be removed in a future version of Kafka.
Current partition replica assignment
"version":1,"partitions":["topic":"Flashybit--Swap-BTCUSDT-Trades","partition":0,"replicas":[2,1,3],"log_dirs":["any","any","any"]]
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for Flashybit--Swap-BTCUSDT-Trades-0
3. Check the assignment status:
/home/kevin/kafka_2.13-2.6.0/bin/kafka-reassign-partitions.sh --zookeeper z-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:2181 --reassignment-json-file topic-reassignment.json --verify
Warning: --zookeeper is deprecated, and will be removed in a future version of Kafka.
Warning: because you are using the deprecated --zookeeper option, the results may be incomplete. Use --bootstrap-server instead for more accurate results.
Status of partition reassignment:
Reassignment of partition Flashybit--Swap-BTCUSDT-Trades-0 is complete.
Clearing broker-level throttles on brokers 5,1,6,2,3,4
Clearing topic-level throttles on topic Flashybit--Swap-BTCUSDT-Trades
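For large topics the data movement can saturate the network. kafka-reassign-partitions.sh accepts a --throttle option (bytes/sec) on --execute, and the verify step then clears the throttle, which is exactly what the "Clearing ... throttles" lines above show. A sketch; the 50 MB/s value is an arbitrary example, not from the original run:

/home/kevin/kafka_2.13-2.6.0/bin/kafka-reassign-partitions.sh --zookeeper z-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:2181 --reassignment-json-file topic-reassignment.json --execute --throttle 50000000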
4. Aborting a migration
Once the reassignment script has been started, the migration cannot be stopped from the tool itself. To force-stop it, delete the reassignment node in ZooKeeper:
$ zookeeper-client -server 10.1.1.50:2181/kafka
[zk] delete /admin/reassign_partitions
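On Kafka 2.6 and later the same thing can be attempted without touching ZooKeeper: kafka-reassign-partitions.sh has a --cancel option that cancels the active reassignments listed in the JSON file. A sketch, reusing the plaintext listener of the same cluster (whether this fits a given situation depends on the Kafka version actually deployed):

/home/kevin/kafka_2.13-2.6.0/bin/kafka-reassign-partitions.sh --bootstrap-server b-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:9092 --reassignment-json-file topic-reassignment.json --cancel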
6. TLS encryption and authentication
Work done on test-ops-s3 (10.132.8.175) against the TLS-authenticated cluster.
# Use the JVM's default CA bundle as the client truststore
cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.302.b08-0.amzn2.0.1.x86_64/jre/lib/security/cacerts kafka.client.truststore.jks
# Generate a client keystore with a new private key
keytool -genkey -keystore kafka.client.keystore.jks -validity 3650 -storepass 123456 -keypass 123456 -dname "CN=SH" -alias AA -storetype pkcs12 -keyalg RSA
# Create a certificate signing request (CSR) from the keystore
keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias AA -storepass 123456 -keypass 123456
# Look up the cluster's bootstrap brokers
aws kafka get-bootstrap-brokers --cluster-arn arn:aws:kafka:ap-southeast-1:072461712851:cluster/prod-kafka-msk-tls-02/2b01a391-3600-4b4e-8e2b-9a00324c9418-4
# Have the private CA sign the CSR
aws acm-pca issue-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=3605,Type="DAYS"
# Fetch the signed certificate using the certificate ARN returned above
aws acm-pca get-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e --certificate-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e/certificate/dfbd7d47d49f6421f5f6463e2d1af37d
# Save the certificate text to signed-certificate-from-acm, then fix the line breaks in vim with :%s/\\n/\r/g
vim signed-certificate-from-acm
# Import the signed certificate back into the keystore
keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias AA -storepass 123456 -keypass 123456
# Test with a console producer, using the client.properties described in step 8 below
/home/kevin/kafka_2.13-2.6.0/bin/kafka-console-producer.sh --bootstrap-server b-2.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094 --topic Test-Topic --producer.config client.properties
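To confirm messages actually arrive, a matching console consumer can be run in a second terminal against the same broker; a sketch reusing the same client.properties file:

/home/kevin/kafka_2.13-2.6.0/bin/kafka-console-consumer.sh --bootstrap-server b-2.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094 --topic Test-Topic --consumer.config client.properties --from-beginning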
Generate PEM files
keytool -importkeystore -srckeystore kafka.client.keystore.jks -srcstorepass 518519 -srckeypass 518519 -srcalias AA -destalias AA -destkeystore identity.p12 -deststoretype PKCS12 -deststorepass 518519 -destkeypass 518519
openssl pkcs12 -in identity.p12 -nodes -nocerts -out client.key.pem
openssl pkcs12 -in identity.p12 -nokeys -out client.cer.pem
openssl s_client -showcerts -connect b-2.test-eks-kafka.l2fs43.c3.kafka.ap-southeast-1.amazonaws.com:9094 </dev/null 2>/dev/null | openssl x509 -outform PEM > server.cer.pem
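Optionally, the three PEM files can be sanity-checked with openssl before they are handed to kafka_exporter or Logstash; these are read-only checks:

openssl x509 -in client.cer.pem -noout -subject -issuer -dates   # inspect the client certificate
openssl rsa -in client.key.pem -noout -check                     # validate the private key
openssl x509 -in server.cer.pem -noout -subject -issuer -dates   # inspect the broker/CA certificate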
6. Use the CA's ARN to sign the request
aws acm-pca issue-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=3605,Type="DAYS"
Replace Certificate-ARN below with the ARN returned by the previous step, then fetch the signed certificate as JSON with the following command.
aws acm-pca get-certificate --certificate-authority-arn < Private-CA-ARN > --certificate-arn < Certificate-ARN >
The output needs to be assembled into a signed-certificate-from-acm file.
In vim, the substitution %s/\\n/\r/g turns every literal \n in the output into a real newline; the two certificate blocks then need to be swapped so that the CA chain part comes last.
7. Import the CA-signed certificate back into the kafka.client.keystore.jks keystore
keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias AA -storepass 123456 -keypass 123456
8. Create a client.properties file for the Kafka command-line tools:
security.protocol=SSL
ssl.truststore.location=/root/ca-net/kafka.client.truststore.jks
ssl.keystore.location=/root/ca-net/kafka.client.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456
9. Test with Kafka
Note that creating the topic (via ZooKeeper below) does not require TLS authentication; TLS only encrypts the data in transit.
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar -xvzf kafka_2.12-2.8.0.tgz
./kafka-topics.sh --create --zookeeper z-1.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:2182 --replication-factor 3 --partitions 1 --topic Test-Topic
Test:
Open one terminal on this machine as the producer:
./kafka-console-producer.sh --broker-list b-2.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --topic Test-Topic --producer.config client.properties
Open another terminal on this machine as the consumer:
./kafka-console-consumer.sh --bootstrap-server b-2.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --topic Test-Topic --consumer.config client.properties
If the messages show up on the consumer side, the TLS setup works.
10. Generating the various PEM certificates
10.1 Generate identity.p12:
keytool -importkeystore -srckeystore kafka.client.keystore.jks -srcstorepass 654321 -srckeypass 654321 -srcalias AA -destalias AA -destkeystore identity.p12 -deststoretype PKCS12 -deststorepass 654321 -destkeypass 654321
10.2 Generate client.key.pem from identity.p12 (enter the import password when prompted):
openssl pkcs12 -in identity.p12 -nodes -nocerts -out client.key.pem
Enter Import Password:
MAC verified OK
10.3 Generate client.cer.pem from identity.p12 (enter the import password when prompted):
openssl pkcs12 -in identity.p12 -nokeys -out client.cer.pem
Enter Import Password:
MAC verified OK
10.4 Fetch the broker's certificate and save it as server.cer.pem:
openssl s_client -showcerts -connect b-2.testnet-kafka-msk.bnccwm.c4.kafka.ap-southeast-1.amazonaws.com:9094 </dev/null 2>/dev/null|openssl x509 -outform PEM > server.cer.pem
11. Encrypt client.key.pem, client.cer.pem and server.cer.pem with encrypt.py
Edit encrypt.py and set filename = "client.key.pem" (and then the other two file names in turn) to produce the three encrypted files:
python3 encrypt.py
ll *.encrypted
-rw-r--r-- 1 root root 5184 Aug 27 03:33 client.cer.pem.encrypted
-rw-r--r-- 1 root root 3732 Aug 27 03:33 client.key.pem.encrypted
-rw-r--r-- 1 root root 4224 Aug 27 03:32 server.cer.pem.encrypted
7. Update the Logstash Kafka addresses
sed -i "s/b-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-2.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094/b-1.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-2.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-3.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094/g" *.conf
sed -i s#/opt/nfs/msk-tls-cert#/opt/nfs/msk-tls-cert/msk-tls02#g *.conf
sed -i "s/684281/123456/g" *.conf
sed -i "s/b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-2.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094/b-1.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-2.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-3.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094/g" *.conf
sed -i s#/opt/logstash/msk_cert#/opt/nfs/msk-tls-cert/msk-tls02#g *.conf
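After the substitutions it is worth checking that no old broker addresses or certificate paths remain before reloading Logstash. A quick sketch; the old cluster ID (68wgke) and the cert path come from the sed patterns above, while the systemd service name is an assumption:

grep -n "68wgke" *.conf                                  # should print nothing once all broker addresses are replaced
grep -n "msk-tls-cert" *.conf | grep -v "msk-tls02"      # should print nothing once all cert paths are replaced
systemctl restart logstash                               # assuming Logstash runs as a systemd service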