zookeeper+kafka,使用Java实现消息对接读取
Posted dhsunny
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper+kafka,使用Java实现消息对接读取相关的知识,希望对你有一定的参考价值。
一、从官网上下载zookeeper和kafka(本地使用的版本是zookeeper-3.3.6,kafka_2.11-1.0.0):
二、配置zookeeper和kafka并启动,基本的zkCli命令和kafka创建删除topic命令。
2.1配置zookeeper,主要配置有两个,一个是端口2181,另外一个是数据存储路径。
2.2启动zookeeper
2.3使用zookeeper的客户端,查看zookeeper结点的值
常用的命令就4个,分别是ls,get,set,delete。
[zk: 127.0.0.1:2181(CONNECTED) 45] help
ZooKeeper -server host:port cmd args
set path data [version]
ls path [watch]
delete path [version]
get path [watch]
[zk: 127.0.0.1:2181(CONNECTED) 46]
2.4配置kafka,主要配置有两个,一个是zookeeper的ip:port,另外一个kafka自己的broker端口通用值为9092.
2.5启动kafka
2.6使用kafka的基本命令,其实也就4个,分别是创建topic,查看topic,删除topic。
例如需要创建topic:
kafka-topics.bat --create
--zookeeper localhost:2181
--replication-factor 1 --partitions 1
--topic dhpeitopic
三、使用Java来操作Kafka。
3.1创建topic,使用【2.6】中创建topic命令的方法创建一个topic,命名为dhpeitopic。
3.1发送数据KafkaProductor。
主要也就是两步,第一步准备连接kafka的参数,其实主要是配置个broker的ip:port;第二步向指定的topic发送数据,一般情况下我们发送都是JSON数据。
3.2接收数据(持续不断的来接收数据)
主要分为五步:
第一步,准备连接kafka的参数,其实主要是zookeeper的ip:port和topic的名称。
第二步,根据连接参数获取的kafka的connector,就像根据数据库参数获取到jdbc连接一样。
第三步,根据kafka连接connector来获取的kafkaStream(个人感觉就是持续不断的流)。
第四步,获取到kafkaStream。
第五步,根据获取到的kafkaStream来源源不断的读取数据。
3.3发送数据控制台打印日志
3.4接收数据控制台打印的日志
总结,在web应用中zookeeper比较常用,作为分布式系统的发布服务和注册服务来使用。Kafka应用也比较多。
zookeeper主要是分布式系统的协调系统,通过zookeeper的结点来协调分布式系统中的调用,当然zookeeper也可以存储配置文件,在结点(path)上面存放数据。
kafka主要作为高吞吐量的分布式流处理通道,并且提供磁盘数据为消息持久化,支持集群。
以上是关于zookeeper+kafka,使用Java实现消息对接读取的主要内容,如果未能解决你的问题,请参考以下文章