kafka常见的操作

Posted lqbyz

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka常见的操作相关的知识,希望对你有一定的参考价值。

1、kafka监控

1、生产环境kafka监控
MSK prod-kafka-msk-tls-01集群监控,
1、kafka_exporter安装地址为:singapore-prod-riskmonitor-logstash-09,端口9309

root@singapore-prod-riskmonitor-logstash-09,10.31.4.141:kafka_exporter # nohup  /opt/kafka_exporter/kafka_exporter  --kafka.server=b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --kafka.server=b-2.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --kafka.server=b-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --tls.enabled --tls.ca-file=server.cer.pem --tls.insecure-skip-tls-verify --web.listen-address=:9309 --tls.cert-file=client.cer.pem --tls.key-file=client.key.pem &

#nohup  /opt/kafka_exporter/kafka_exporter  \\
--kafka.server=b-3.prod-kafka1.amazonaws.com:9094 \\
--kafka.server=b-2.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 \\
--kafka.server=b-1.prod-kafk.ap-southeast-1.amazonaws.com:9094 \\
--tls.enabled \\
--tls.ca-file=server.cer.pem \\
--tls.insecure-skip-tls-verify \\
--web.listen-address=:9309 \\
--tls.cert-file=client.cer.pem \\
--tls.key-file=client.key.pem

2、prod-flash-kafka-tls监控 ,监控端口9310
 nohup  /opt/kafka_exporter/kafka_exporter  --kafka.server=b-2.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --kafka.server=b-3.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --kafka.server=b-1.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --web.listen-address=:9310 --tls.cert-file=client.cer.pem --tls.key-file=client.key.pem --tls.enabled --tls.ca-file=server.cer.pem --tls.insecure-skip-tls-verify &

3、prod-msk-kafka-01 监控,监控端口默认9308
 nohup /opt/kafka_exporter/kafka_exporter --kafka.server=b-1.prod-msk-kafka-01.himvak.c4.kafka.ap-southeast-1.amazonaws.com:9092 --log.level=info &

2、kafka常用的查询命令

3、TLS环境下查看kafka相关的查询

1、创建client.properties 文件
ssl.keystore.location = /opt/nfs/msk-tls-cert/kafka.client.keystore.jks
ssl.keystore.password = 684281
ssl.keystore.type = jks
security.protocol = SSL

一、查看topic相关的操作。
1、查看topic的列表(使用zookeeper和broker都可以用来查询)
  使用broker进行查询
./kafka_2.13-2.6.2/bin/kafka-topics.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --command-config client.properties  --list 
 使用zookeeper进行查询
./kafka_2.13-2.6.2/bin/kafka-topics.sh --list --zookeeper z-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:2181

2、查看topic分区的相关的信息
使用broker进行查看
./kafka_2.13-2.6.2/bin/kafka-topics.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --describe --command-config client.properties
使用zookeeper进行查看
./kafka_2.13-2.6.2/bin/kafka-topics.sh  --zookeeper z-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:2181 --describe 

3、创建topic
./kafka_2.13-2.6.2/bin/kafka-topics.sh  --zookeeper z-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:2181 --create --topic test20210702-02 --partitions 1 --replication-factor 3
4、删除topic
root@singapore-prod-riskmonitor-logstash-09,10.31.4.141:kevin.li # ./kafka_2.13-2.6.2/bin/kafka-topics.sh  --zookeeper z-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:2181 --delete --topic test20210702-02

二、消费组的相关操作
1、查看消费组信息的列表
./kafka_2.13-2.6.2/bin/kafka-consumer-groups.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --list --command-config client.properties 

2、查看消费组lag信息
./kafka_2.13-2.6.2/bin/kafka-consumer-groups.sh --bootstrap-server b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --describe --command-config client.properties --group test-20210630

3、删除消费组
./kafka_2.13-2.6.2/bin/kafka-consumer-groups.sh --bootstrap-server  b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094 --delete --group test-20210630 --command-config client.properties

4、重置消费组的offset点

./kafka_2.13-2.6.0/bin/kafka-consumer-groups.sh  --bootstrap-server b-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:9092 --group notice_platform_consume --execute  --reset-offsets --topic notice_platform_topic --to-latest

5、kakfa扩展broker移动分区

kafka集群扩容后,新的broker上面不会数据进入这些节点,也就是说,这些节点是空闲的;它只有在创建新的topic时才会参与工作。除非将已有的partition迁移到新的服务器上面;
所以需要将一些topic的分区迁移到新的broker上。
kafka-reassign-partitions.sh是kafka提供的用来重新分配partition和replica到broker上的工具
简单实现重新分配需要三步:
1、生成分配计划(generate)
2、执行分配(execute)
3、检查分配的状态(verify)

具体操作步骤如下:
1.1、编辑topic-to-move.json
"topics":
    ["topic":"Flashybit--Swap-BTCUSDT-Trades",
     "topic":""],
    "version": 1

1.2、执行分配计划生成的脚本
 /home/kevin/kafka_2.13-2.6.0/bin/kafka-reassign-partitions.sh --zookeeper z-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:2181 --topics-to-move-json-file topic-to-move.json --broker-list "4,5,6" --generate

Warning: --zookeeper is deprecated, and will be removed in a future version of Kafka.
Current partition replica assignment
"version":1,"partitions":["topic":"Flashybit--Swap-BTCUSDT-Trades","partition":0,"replicas":[2,1,3],"log_dirs":["any","any","any"]]

"version":1,"partitions":["topic":"event_request","partition":0,"replicas":[6,5],"topic":"event_request","partition":1,"replicas":[7,6]]
1.3、把最下面生成的脚本保留为topic-reassignment文件
cat topic-reassignment.json 
"version":1,"partitions":["topic":"Flashybit--Swap-BTCUSDT-Trades","partition":0,"replicas":[5,6,4],"log_dirs":["any","any","any"]]

2、执行分配命令(kafka-reassign-partitions.sh --zookeeper $ZK_CONNECT --reassignment-json-file topic-reassignment.json --execute
)
/home/kevin/kafka_2.13-2.6.0/bin/kafka-reassign-partitions.sh --zookeeper z-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:2181 --reassignment-json-file topic-reassignment.json --execute

Warning: --zookeeper is deprecated, and will be removed in a future version of Kafka.
Current partition replica assignment

"version":1,"partitions":["topic":"Flashybit--Swap-BTCUSDT-Trades","partition":0,"replicas":[2,1,3],"log_dirs":["any","any","any"]]

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignment for Flashybit--Swap-BTCUSDT-Trades-0

3、查看分配状态
/home/kevin/kafka_2.13-2.6.0/bin/kafka-reassign-partitions.sh --zookeeper z-1.prod-public-kafka.jhoc3y.c4.kafka.ap-southeast-1.amazonaws.com:2181 --reassignment-json-file topic-reassignment.json --verify
Warning: --zookeeper is deprecated, and will be removed in a future version of Kafka.
Warning: because you are using the deprecated --zookeeper option, the results may be incomplete.  Use --bootstrap-server instead for more accurate results.
Status of partition reassignment:
Reassignment of partition Flashybit--Swap-BTCUSDT-Trades-0 is complete.
Clearing broker-level throttles on brokers 5,1,6,2,3,4
Clearing topic-level throttles on topic Flashybit--Swap-BTCUSDT-Trades

4、中断迁移任务
一旦启动reassign 脚本,则无法停止迁移任务。如果需要强制停止,可以通过zookeeper 进行修改。
$ zookeeper-client -server 10.1.1.50:2181/kafka
[zk] delete /admin/reassign_partitions

6、tls加密认证

test-ops-s3,10.132.8.175  加密认证集群
1131  cp /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.302.b08-0.amzn2.0.1.x86_64/jre/lib/security/cacerts  kafka.client.truststore.jks
 1132  ll
 1133  keytool -genkey -keystore kafka.client.keystore.jks -validity 3650 -storepass 123456 -keypass 123456 -dname "CN=SH" -alias AA -storetype pkcs12 -keyalg RSA
 1134  ll
 1135  keytool -keystore kafka.client.keystore.jks -certreq -file client-cert-sign-request -alias AA -storepass 123456 -keypass 123456
 1136  ll
 1137  vim client-cert-sign-request 
 1138  aws kafka get-bootstrap-brokers --cluster-arn arn:aws:kafka:ap-southeast-1:072461712851:cluster/prod-kafka-msk-tls-02/2b01a391-3600-4b4e-8e2b-9a00324c9418-4
 1139  vi Certificate-ARN
 1140  aws acm-pca get-certificate --certificate-authority-arn signed-certificate-from-acm --certificate-arn Certificate-ARN
 1141  aws acm-pca get-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e  --certificate-arn Certificate-ARN
 1142  cat Certificate-ARN 
 1143  aws acm-pca issue-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=3605,Type="DAYS"
 1144  aws acm-pca issue-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=3505,Type="DAYS"
 1145  vi Certificate-ARN 
 1146  aws acm-pca get-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e  --certificate-arn Certificate-ARN
 1147  ll
 1148  aws acm-pca issue-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=3505,Type="DAYS"
 1149  aws acm-pca get-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e  --certificate-arn Certificate-ARN
 1150  ll
 1151  vim Certificate-ARN 
 1152  aws acm-pca get-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e  --certificate-arn Certificate-ARN
 1153  vim Certificate-ARN 
 1154  aws acm-pca get-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e  --certificate-arn Certificate-ARN
 1155  vim Certificate-ARN 
 1156  aws acm-pca get-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e  --certificate-arn Certificate-ARN
 1157  cat Certificate-ARN 
 1158  aws acm-pca get-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e  --certificate-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e/certificate/dfbd7d47d49f6421f5f6463e2d1af37d
 1159  vi signed-certificate-from-acm
 1160  %s/\\\\n/\\r/g  signed-certificate-from-acm 
 1161  ll
 1162  vim signed-certificate-from-acm 
 1163  keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias AA -storepass 123456 -keypass 123456
 1164  ll
 1165  pwd
 1167  vi client.properties
 1168  /home/kevin/kafka_2.13-2.6.0/bin/kafka-console-producer.sh --bootstrap-server b-2.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094 --topic Test-Topic --producer.config client.properties 

生成pem文件
  keytool -importkeystore -srckeystore kafka.client.keystore.jks  -srcstorepass 518519 -srckeypass 518519 -srcalias AA -destalias AA -destkeystore identity.p12 -deststoretype PKCS12 -deststorepass 518519 -destkeypass 518519

  593  openssl pkcs12 -in identity.p12 -nodes -nocerts -out client.key.pem
  594  openssl pkcs12 -in identity.p12 -nokeys -out client.cer.pem
  595  ll
  596  openssl s_client -showcerts -connect b-2.test-eks-kafka.l2fs43.c3.kafka.ap-southeast-1.amazonaws.com:9094  </dev/null 2>/dev/null|openssl x509 -outform PEM > server.cer.pem

6.拿CA的arn来为请求签名
aws acm-pca issue-certificate --certificate-authority-arn arn:aws:acm-pca:ap-southeast-1:072461712851:certificate-authority/22efe35f-5167-4819-abfc-72b6f1d2de4e --csr fileb://client-cert-sign-request --signing-algorithm "SHA256WITHRSA" --validity Value=3605,Type="DAYS"

把上一部的结果替换下面的Certificate-ARN,使用一下命令获得一个json文件。
aws acm-pca get-certificate --certificate-authority-arn < Private-CA-ARN > --certificate-arn < Certificate-ARN >

输出的结果需要拼接成一个signed-certificate-from-acm

拼接的命令 %s/\\\\n/\\r/g 把其中的\\n全部替换成新一行,并且,两段位置需要互换,就是clam在后。

7.把这个已经被CA签名的文件导回kafka.client.keystore.jks 私钥
keytool -keystore kafka.client.keystore.jks -import -file signed-certificate-from-acm -alias AA -storepass 123456 -keypass 123456

8.创建一个client.properties文件用来配合kafka使用。

security.protocol=SSL
ssl.truststore.location=/root/ca-net/kafka.client.truststore.jks
ssl.keystore.location=/root/ca-net/kafka.client.keystore.jks
ssl.keystore.password=123456
ssl.key.password=123456

9.使用kafka测试
其创建topic是不需要tls认证的,tls只加密传输的数据。
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar -xvzf kafka_2.12-2.8.0.tgz
./kafka-topics.sh --create --zookeeper z-1.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com 2182 --replication-factor 3 --partitions 1 --topic Test-Topic
测试:
本机一个窗口当生产者
./kafka-console-producer.sh --broker-list b-2.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --topic Test-Topic --producer.config client.properties
本机再开一个窗口当消费者
./kafka-console-consumer.sh --bootstrap-server b-2.prod-flash-kafka.2tpo15.c4.kafka.ap-southeast-1.amazonaws.com:9094 --topic Test-Topic --consumer.config client.properties

如果能成功看到消息,就说明验证成功了。

10、生成各种pem证书
10.1、生成identity.p12
keytool -importkeystore -srckeystore kafka.client.keystore.jks  -srcstorepass 654321 -srckeypass 654321 -srcalias AA -destalias AA -destkeystore identity.p12 -deststoretype PKCS12 -deststorepass 654321 -destkeypass 654321
10.2、通过identity.p12生成client.key.pem证书文件,中间输入
openssl pkcs12 -in identity.p12 -nodes -nocerts -out client.key.pem
Enter Import Password:
MAC verified OK
10.3、通过identity.p12生成client.cer.pem证书文件,中间输入
openssl pkcs12 -in identity.p12 -nokeys -out client.cer.pem
Enter Import Password:
MAC verified OK
10.4、通过identity.p12生成server.cer.pem
openssl s_client -showcerts -connect b-2.testnet-kafka-msk.bnccwm.c4.kafka.ap-southeast-1.amazonaws.com:9094   </dev/null 2>/dev/null|openssl x509 -outform PEM > server.cer.pem

11、通过encrypt.py对client.key.pem、client.cer.pem、server.cer.pem对证书进行加密
修改encrypt.py修改对应的filename = "client.key.pem" 文件名,生成三个文件
python3 encrypt.py
ll *.encrypted
-rw-r--r-- 1 root root 5184 Aug 27 03:33 client.cer.pem.encrypted
-rw-r--r-- 1 root root 3732 Aug 27 03:33 client.key.pem.encrypted
-rw-r--r-- 1 root root 4224 Aug 27 03:32 server.cer.pem.encrypted

7、更改logstash地址

sed -i "s/b-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-2.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094/b-1.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-2.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-3.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094/g"  *.conf
sed  -i s#/opt/nfs/msk-tls-cert#/opt/nfs/msk-tls-cert/msk-tls02#g *.conf
sed -i "s/684281/123456/g" *.conf

sed -i "s/b-3.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-1.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-2.prod-kafka-msk-tl.68wgke.c4.kafka.ap-southeast-1.amazonaws.com:9094/b-1.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-2.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094,b-3.prod-kafka-msk-tl.xbxwhk.c4.kafka.ap-southeast-1.amazonaws.com:9094/g"  *.conf

sed  -i s#/opt/logstash/msk_cert#/opt/nfs/msk-tls-cert/msk-tls02#g *.conf

以上是关于kafka常见的操作的主要内容,如果未能解决你的问题,请参考以下文章

Alfred常见使用

MySQL系列:kafka停止命令

视图或片段库为常见数据类型组成 UI

配置 kafka 同步刷盘

canal+Kafka实现mysql与redis数据同步

消息中间件Kafka - PHP操作使用Kafka