Java通过zk连接kafka,程序未报错,但是取不到数据。将程序在另一台主机

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java通过zk连接kafka,程序未报错,但是取不到数据。将程序在另一台主机相关的知识,希望对你有一定的参考价值。

Java通过zk连接kafka,程序未报错,但是取不到数据。将程序在另一台主机上运行能取到数据。2181端口路由没问题。两台主机有个明显的差别,22端口前者不通。跟22端口有关系吗?或者其他什么原因?

参考技术A publicstaticvoidconsumer()Propertiesprops=newProperties();props.put("zk.connect","hadoop-2:2181");props.put("zk.connectiontimeout.ms","1000000");props.put("groupid","fans_group");//CreatetheconnectiontotheclusterConsumerConfigconsumerConfig=newConsumerConfig(props);ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig);Mapmap=newHashMap();map.put("fans",1);//create4partitionsofthestreamfortopic“test”,toallow4threadstoconsumeMap>>topicMessageStreams=consumerConnector.createMessageStreams(map);List>streams=topicMessageStreams.get("fans");//createlistof4threadstoconsumefromeachofthepartitionsExecutorServiceexecutor=Executors.newFixedThreadPool(1);longstartTime=System.currentTimeMillis();//consumethemessagesinthethreadsfor(finalKafkaStreamstream:streams)executor.submit(newRunnable()publicvoidrun()ConsumerIteratorit=stream.iterator();while(it.hasNext())log.debug(byteBufferToString(it.next().message().payload())););log.debug("usetime="+(System.currentTimeMillis()-startTime));追问

没有

参考技术B 配置上hosts,通过zk连接kafka要保证zk端口通,并且kafka也要通;同时zk中配置的kafka的主机名要与hosts的一致。

kafka内置的zookeeper

kafka 很多说不需要安装zk的是因为他们都使用了kafka自带的zk

至于kafka为什么使用zk,你首先要知道zk的作用, 作为去中心化的集群模式。

需要要消费者知道现在那些生产者(对于消费者而言,kafka就是生产者)是可用的。

如果没了zk消费者如何知道呢?如果每次消费者在消费之前都去尝试连接生产者测试下是否连接成功,效率呢?

所以kafka需要zk,在kafka的设计中就依赖了zk了。

--- 共有 1 条评论 ---

翟志军: 需要要消费者知道现在那些生产者(对于消费者而言,kafka就是生产者)是可用的。 我觉得这句说到点了。 1年前
 

(卡夫卡)生产者                 (zk)消费者

     kafka                          zookeeper

kafka需要zk,在kafaka的设计中就是依赖了zk了

以上是关于Java通过zk连接kafka,程序未报错,但是取不到数据。将程序在另一台主机的主要内容,如果未能解决你的问题,请参考以下文章

uniappcli启动不成功未报错

Vagrant中的Kafka Cluster(ZK,BR,BR,BR)无法建立连接

hbase连接java时,zookeeper总是连接不上,但是确实已经启动,每次连接,都报错

kafka内置的zookeeper

实战Kafka ACL机制

zookeeper连不上