kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!相关的知识,希望对你有一定的参考价值。
1533 [Thread-0] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:1,host:node3,port:6667] failed
java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.9.2-0.8.2.2.jar:na]
[kafka_2.9.2-0.8.2.2.jar:na]
at kafka.utils.Utils$.swallowError(Utils.scala:45) [kafka_2.9.2-0.8.2.2.jar:na]
集群: bin/kafka-console-producer.sh --broker-list node2:6667 --topic test,可以正常产生数据,也能正常消费》
Java中代码:
public class DataProducerInsert
// TODO Auto-generated method stub
private static Producer<Integer,String> producer;
Properties props=new Properties();
public DataProducerInsert()
//定义连接的broker list
props.put("metadata.broker.list", "node1:6667,node3:6667,node2:6667");
//定义序列化类 Java中对象传输之前要序列化
props.put("serializer.class", "kafka.serializer.StringEncoder");
//props.put("advertised.host.name", "192.168.1.216");
producer = new Producer<Integer, String>(new ProducerConfig(props));
public static void main(String[] args)
DataProducerInsert sp=new DataProducerInsert();
//定义topic
String topic="topic1";
//开始时间统计
long startTime = System.currentTimeMillis();
//定义要发送给topic的消息
String messageStr = "This is a message";
List<KeyedMessage<Integer, String>> datalist = new ArrayList<KeyedMessage<Integer, String>>();
//构建消息对象
KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
datalist.add(data);
//结束时间统计
long endTime = System.currentTimeMillis();
KeyedMessage<Integer, String> data1 = new KeyedMessage<Integer, String>(topic, "用时" + (endTime-startTime)/1000.0);
datalist.add(data1);
//推送消息到broker
producer.send(data);
producer.close();
windows 平台的hosts文件:(外网地址)
106.14.47.114 node1
106.14.76.118 node2
106.14.39.127 node3
Linux服务器上的hosts文件:(内网地址)
10.27.147.94 node1
10.27.146.93 node2
10.27.148.109 node3,
kafka的地址是node1:6667,node2:6667,node3:6667,这是ambari平台下集成的kafka。
其次查看代码是生产者,看Kafka 集群里这个主题是否存在(如果不存在,默认是配置可以自动创建,看是非将该配置修改);然后检测防火墙,相应端口是否开放(防火墙直接关也可以);检测 server.properties 文件的 listeners 是否配置,若没有将其配置好
Java客户端连接kafka集群报错
往kafka集群发送消息时,报错如下:
page_visits-1: 30005 ms has passed since batch creation plus linger time
加入log4j.properties,设置为DEBUG级别,错误如下:
2017-06-03 17:33:31,417 DEBUG [org.apache.kafka.clients.NetworkClient] - Error connecting to node 2 at kafka-cluster-64bit:9094:
java.io.IOException: Can‘t resolve address: kafka-cluster-64bit:9094
at org.apache.kafka.common.network.Selector.connect(Selector.java:182)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
at org.apache.kafka.common.network.Selector.connect(Selector.java:179)
... 5 more
解决办法:
config/server.properties修改
修改前:listeners=PLAINTEXT://:9092
修改后:listeners=PLAINTEXT://192.168.137.176:9092
具体代码参照:Kafka JAVA客户端代码示例
以上是关于kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!的主要内容,如果未能解决你的问题,请参考以下文章