kafka实战
Posted 我和你并没有不同
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka实战相关的知识,希望对你有一定的参考价值。
一、下载地址:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
Java: http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
Java的安装http://czj4451.iteye.com/blog/2041159
二、启动kafka自带的zookeeper: ./bin/zookeeper-server-start.sh ./config/zookeeper.properties & (用&是为了能退出命令行)
启动Kafka:1)虚拟机启动需要把这个环境变量设小,用默认的话会有问题export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
2) ./bin/kafka-server-start.sh config/server.properties &
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
停止服务: ./bin/kafka-server-stop.sh ./bin/zookeeper-server-stop.sh
三、构造一个跨虚拟机的、只有一个kafka服务的环境
1)两个在一个局域网的Ubuntu虚拟机(一个叫master,一个叫slaver1)
2)一个虚拟机(master)上面起zookeeper、kafka服务、kafka-producer;
要修改下config/server.properties中的listeners(效果同之前的版本的host.name及port:注意绑定host.name,否则可能出现莫名其妙的错误如consumer找不到broker。这个host.name是Kafka的server的机器名字,会注册到Zookeeper中)
虚拟机ip是10.0.0.5,不改的话,listeners默认取值localhost
producer代码是
3)另一个虚拟机(slaver1)上起kafka-consumer
代码是
==============================
kafka shell基本命令:
http://www.cnblogs.com/xiaodf/p/6093261.html
http://orchome.com/35
-1)创建topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
0)查看所有topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
1)查看某个topic信息
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic haha
2)查看topic某分区偏移量
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic haha --time -1 --broker-list localhost:9092 --partitions 0
3)列出所有主题中的所有用户组:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
如果你使用老的高级消费者并存储分组元数据在zookeeper(即。offsets.storage=zookeeper)通过--zookeeper,而不是bootstrap-server:
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
4)查看消费进度
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group wind
如果你使用老的高级消费者并存储分组元数据在zookeeper(即。offsets.storage=zookeeper)通过--zookeeper,而不是bootstrap-server:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group console-consumer-25010 --zookeeper localhost:2181
topic:创建时topic名称
partition:分区编号
offset:表示该parition已经消费了多少条message
logSize:表示该partition已经写了多少条message
Lag:表示有多少条message没有被消费。
Owner:表示消费者
Created:该partition创建时间
=============注意事项======
1)如果需要在其他节点作为客户端使用指令连接kafka broker,则需要注意设置kafka broker 配置文件中 host.name 参数为监听的IP地址
以上是关于kafka实战的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一
Spark Streaming实时流处理项目实战Spark Streaming整合Kafka实战一