kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!相关的知识,希望对你有一定的参考价值。

1533 [Thread-0] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:1,host:node3,port:6667] failed
java.nio.channels.ClosedChannelException: null
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[kafka_2.9.2-0.8.2.2.jar:na]
[kafka_2.9.2-0.8.2.2.jar:na]
at kafka.utils.Utils$.swallowError(Utils.scala:45) [kafka_2.9.2-0.8.2.2.jar:na]

集群: bin/kafka-console-producer.sh --broker-list node2:6667 --topic test,可以正常产生数据,也能正常消费》
Java中代码:
public class DataProducerInsert
// TODO Auto-generated method stub
private static Producer<Integer,String> producer;
Properties props=new Properties();
public DataProducerInsert()
//定义连接的broker list
props.put("metadata.broker.list", "node1:6667,node3:6667,node2:6667");
//定义序列化类 Java中对象传输之前要序列化
props.put("serializer.class", "kafka.serializer.StringEncoder");
//props.put("advertised.host.name", "192.168.1.216");
producer = new Producer<Integer, String>(new ProducerConfig(props));

public static void main(String[] args)
DataProducerInsert sp=new DataProducerInsert();
//定义topic
String topic="topic1";
//开始时间统计
long startTime = System.currentTimeMillis();
//定义要发送给topic的消息
String messageStr = "This is a message";
List<KeyedMessage<Integer, String>> datalist = new ArrayList<KeyedMessage<Integer, String>>();

//构建消息对象
KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);
datalist.add(data);

//结束时间统计
long endTime = System.currentTimeMillis();
KeyedMessage<Integer, String> data1 = new KeyedMessage<Integer, String>(topic, "用时" + (endTime-startTime)/1000.0);
datalist.add(data1);

//推送消息到broker
producer.send(data);
producer.close();



windows 平台的hosts文件:(外网地址)
106.14.47.114 node1
106.14.76.118 node2
106.14.39.127 node3
Linux服务器上的hosts文件:(内网地址)
10.27.147.94 node1
10.27.146.93 node2
10.27.148.109 node3,
kafka的地址是node1:6667,node2:6667,node3:6667,这是ambari平台下集成的kafka。

参考技术A 首先你在链接时候检查是否代码里的IP 和端口是不是对的,端口是broker 端口,默认9092 ;
其次查看代码是生产者,看Kafka 集群里这个主题是否存在(如果不存在,默认是配置可以自动创建,看是非将该配置修改);然后检测防火墙,相应端口是否开放(防火墙直接关也可以);检测 server.properties 文件的 listeners 是否配置,若没有将其配置好

Java客户端连接kafka集群报错

往kafka集群发送消息时,报错如下:

page_visits-1: 30005 ms has passed since batch creation plus linger time

加入log4j.properties,设置为DEBUG级别,错误如下:

2017-06-03 17:33:31,417 DEBUG [org.apache.kafka.clients.NetworkClient] - Error connecting to node 2 at kafka-cluster-64bit:9094:
java.io.IOException: Can‘t resolve address: kafka-cluster-64bit:9094
    at org.apache.kafka.common.network.Selector.connect(Selector.java:182)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:629)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:186)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:101)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622)
    at org.apache.kafka.common.network.Selector.connect(Selector.java:179)
    ... 5 more

 

解决办法:

config/server.properties修改

修改前:listeners=PLAINTEXT://:9092

修改后:listeners=PLAINTEXT://192.168.137.176:9092

具体代码参照:Kafka JAVA客户端代码示例













以上是关于kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!的主要内容,如果未能解决你的问题,请参考以下文章

动态从zookeeper读取kafka信息

当kafka集群其中一台宕机后,会怎么样?

Java客户端连接kafka集群报错

Kafka分布式安装及验证测试

kafka 单机/集群压力测试

Kafka 代理正常关闭,错误的元数据被传递到 Kafka 连接客户端