CDH6.3.2之Kafka配置和命令
Posted 小基基o_O
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了CDH6.3.2之Kafka配置和命令相关的知识,希望对你有一定的参考价值。
文章目录
版本查看
在页面查看:点击具有Kafka服务的主机,然后点击组件
在服务器查看
如图2.11
是Scala版本,2.2.1
是Kafka版本
如果不知道CDH装哪,就把/opt/cloudera
换成/
find /opt/cloudera -name \\*kafka_\\* | grep '\\.jar'
命令
创建主题
kafka-topics --create \\
--bootstrap-server hadoop105:9092 \\
--replication-factor 副本数 \\
--partitions 分区数 \\
--topic 主题名称
注:
若使用--bootstrap-server hadoop105:9092
创建,则消费者偏移量保存在Kafka中
若使用--zookeeper hadoop105:2181/kafka
创建,则消费者偏移量保存在ZooKeeper中
查看有哪些主题
kafka-topics --list --bootstrap-server hadoop105:9092
或
kafka-topics --list --zookeeper hadoop105:2181/kafka
查看某个主题的描述
kafka-topics --describe \\
--zookeeper hadoop105:2181/kafka \\
--topic 主题名称
生产和消费
生产
kafka-console-producer \\
--broker-list Kafka地址:9092 \\
--topic 主题名称
消费(--from-beginning
会把主题以往所有数据都读取出来)
kafka-console-consumer \\
--bootstrap-server Kafka地址:9092 \\
--topic 主题名称 \\
--from-beginning \\
--group 消费者组名称
压测
1、创建单分区单副本主题
kafka-topics --create \\
--replication-factor 1 \\
--partitions 1 \\
--bootstrap-server hadoop105:9092 \\
--topic StressTesting
2、写测试
生产者参数 | 描述 | 备注 |
---|---|---|
--throughput | 最大消息吞吐量限制,-1 不限制 | messages/sec |
--print-metrics | 打印指标 | 默认不打印 |
--num-records | 生产多少条消息 | |
--record-size | 每条消息的大小 | bytes |
--producer-props | 生产者相关配置,如服务器地址 | 优先级高于--producer.config |
--producer.config | 生产者配置文件路径 |
kafka-producer-perf-test \\
--throughput -1 \\
--print-metrics \\
--num-records 1000000 \\
--record-size 1024 \\
--producer-props bootstrap.servers=hadoop105:9092 \\
--print-metrics \\
--topic StressTesting
某写测试结果
records sent | records/sec | MB/sec | avg latency | max latency |
---|---|---|---|---|
1000000 | 24396.789383 | 23.82 | 1232.27 ms | 2096.00 ms |
3、读测试
消费者参数 | 描述 |
---|---|
--messages | 消费的消息数量 |
kafka-consumer-perf-test \\
--print-metrics \\
--messages 1000000 \\
--broker-list hadoop105:9092 \\
--topic StressTesting
某读测试结果
data.consumed.in.MB | MB.sec | data.consumed.in.nMsg | nMsg.sec | rebalance.time.ms | fetch.time.ms | fetch.MB.sec | fetch.nMsg.sec |
---|---|---|---|---|---|---|---|
976.5625 | 56.8695 | 1000000 | 58234.3350 | 3087 | 14085 | 69.3335 | 70997.5151 |
配置
最大Java堆大小
broker_max_heap_size
建议给4~6G每个Broker
日志数据存储路径
log.dirs
默认副本数
default.replication.factor
建议>1
日志保留策略
log.retention
log.retention.ms
:日志保留时长,设-1时无限制,建议3天
log.retention.bytes
:每个主题分区保留在日志中的数据量,设-1时无限制
log.retention.check.interval.ms
:如图示,每5分钟,日志清理程序 会根据日志保留策略 来清理符合删除条件的日志
接受消息的大小
可接收的单个消息的最大值(
message.max.bytes
)
replica.fetch.max.bytes
应大于message.max.bytes
按需调大
其它
配置 | 说明 | 备注 |
---|---|---|
delete.topic.enable | 是否允许删除主题 | CDH的Kafka默认启用,不用改 |
broker.id | broker 编号,唯一的,每台机都不同 | 通常不用改 |
num.network.threads | 处理网络请求的线程数 | |
num.io.threads | 处理网络请求的线程数 | |
num.partitions | 每个主题的默认分区数 | 默认1不用改 |
log.segment.bytes | 日志片段最大字节数 | 必须大于message.max.bytes |
以上是关于CDH6.3.2之Kafka配置和命令的主要内容,如果未能解决你的问题,请参考以下文章
FlinkFlink CDH6.3.2 下的yarn per job模式 savepoint和checkpoint,卡住,没有保存成功文件
是否可以配置KafkaConsumer(apache.kafka.kafka-clients)以通过代理与Kafka经纪人一起使用?