zookeeper+kafka,使用Java实现消息对接读取

Posted dhsunny

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper+kafka,使用Java实现消息对接读取相关的知识,希望对你有一定的参考价值。

一、从官网上下载zookeeperkafka(本地使用的版本是zookeeper-3.3.6kafka_2.11-1.0.0):

 技术分享图片

 

 

二、配置zookeeperkafka并启动,基本的zkCli命令和kafka创建删除topic命令。

2.1配置zookeeper,主要配置有两个,一个是端口2181,另外一个是数据存储路径。

 技术分享图片

 

2.2启动zookeeper

 技术分享图片

 

2.3使用zookeeper的客户端,查看zookeeper结点的值

 技术分享图片

常用的命令就4个,分别是lsgetsetdelete

[zk: 127.0.0.1:2181(CONNECTED) 45] help

ZooKeeper -server host:port cmd args

        set path data [version]

        ls path [watch]

        delete path [version]

        get path [watch]

[zk: 127.0.0.1:2181(CONNECTED) 46]

2.4配置kafka,主要配置有两个,一个是zookeeperip:port,另外一个kafka自己的broker端口通用值为9092.

 技术分享图片

2.5启动kafka

 技术分享图片

2.6使用kafka的基本命令,其实也就4个,分别是创建topic,查看topic,删除topic

例如需要创建topic

kafka-topics.bat --create

--zookeeper  localhost:2181

--replication-factor 1 --partitions 1

--topic dhpeitopic

 技术分享图片

 

三、使用Java来操作Kafka

3.1创建topic,使用【2.6】中创建topic命令的方法创建一个topic,命名为dhpeitopic

3.1发送数据KafkaProductor

主要也就是两步,第一步准备连接kafka的参数,其实主要是配置个brokerip:port;第二步向指定的topic发送数据,一般情况下我们发送都是JSON数据。

 技术分享图片

 

3.2接收数据(持续不断的来接收数据)

主要分为五步:

第一步,准备连接kafka的参数,其实主要是zookeeperip:porttopic的名称。

第二步,根据连接参数获取的kafkaconnector,就像根据数据库参数获取到jdbc连接一样。

第三步,根据kafka连接connector来获取的kafkaStream(个人感觉就是持续不断的流)

第四步,获取到kafkaStream

第五步,根据获取到的kafkaStream来源源不断的读取数据。

 技术分享图片

 

3.3发送数据控制台打印日志

 技术分享图片

 

3.4接收数据控制台打印的日志

 技术分享图片

 

总结,在web应用中zookeeper比较常用,作为分布式系统的发布服务和注册服务来使用。Kafka应用也比较多。

zookeeper主要是分布式系统的协调系统,通过zookeeper的结点来协调分布式系统中的调用,当然zookeeper也可以存储配置文件,在结点(path)上面存放数据。

kafka主要作为高吞吐量的分布式流处理通道,并且提供磁盘数据为消息持久化,支持集群。

以上是关于zookeeper+kafka,使用Java实现消息对接读取的主要内容,如果未能解决你的问题,请参考以下文章

kafka部署遇到的问题

Kafka和Storm环境下如何实现多租户?

测开之路七十三:用kafka实现消息队列之环境搭建

Java面试题(Kafka篇+zookeeper 篇)

Java面试题(Kafka篇+zookeeper 篇)

java 嵌入Kafka + Zookeeper用于测试目的。使用Apache Kafka 0.8进行测试