Apache Kafka LEADER_NOT_AVAILABLE
Posted
技术标签:
【中文标题】Apache Kafka LEADER_NOT_AVAILABLE【英文标题】: 【发布时间】:2017-02-04 17:59:40 【问题描述】:我遇到了一个我不理解的 apache Kafka 问题。我在代理中订阅了一个名为 "topic-received" 的主题。这是代码:
protected String readResponse(final String idMessage)
if (props != null)
kafkaClient = new KafkaConsumer<>(props);
logger.debug("Subscribed to topic-received");
kafkaClient.subscribe(Arrays.asList("topic-received"));
logger.debug("Waiting for reading : topic-received");
ConsumerRecords<String, String> records =
kafkaClient.poll(kafkaConfig.getRead_timeout());
if (records != null)
for (ConsumerRecord<String, String> record : records)
logger.debug("Resultado devuelto : "+record.value());
return record.value();
return null;
发生这种情况时,我从另一点向“已收到主题”发送了一条消息。代码如下:
private void sendMessageToKafkaBroker(String idTopic, String value)
Producer<String, String> producer = null;
try
producer = new KafkaProducer<String, String>(mapProperties());
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>("topic-received", value);
producer.send(producerRecord);
logger.info("Sended value "+value+" to topic-received");
catch (ExceptionInInitializerError eix)
eix.printStackTrace();
catch (KafkaException ke)
ke.printStackTrace();
finally
if (producer != null)
producer.close();
我第一次尝试,主题为“topic-received”,我收到这样的警告
"WARN 13164 --- [nio-8085-exec-3] org.apache.kafka.clients.NetworkClient :
Error while fetching metadata with correlation id 1 : topic-
received=LEADER_NOT_AVAILABLE"
但是,如果我再试一次,对于“已收到主题”这个主题,工作正常,并且不会出现任何警告。无论如何,这对我来说没用,因为我每次都必须收听一个主题并发送到一个新主题(由字符串标识符引用,例如:.. 12Erw45-2345Saf-234DASDFasd)
在 google 中寻找 LEADER_NOT_AVAILABLE ,有些人谈论将下一行添加到 server.properties :
host.name=127.0.0.1
advertised.port=9092
advertised.host.name=127.0.0.1
但这对我不起作用(不知道为什么)。
我已尝试使用以下代码在所有此过程之前创建主题:
private void createTopic(String idTopic)
String zookeeperConnect = "localhost:2181";
ZkClient zkClient = new ZkClient(zookeeperConnect,10000,10000,
ZKStringSerializer$.MODULE$);
ZkUtils zkUtils = new ZkUtils(zkClient, new
ZkConnection(zookeeperConnect),false);
if(!AdminUtils.topicExists(zkUtils,idTopic))
AdminUtils.createTopic(zkUtils, idTopic, 2, 1, new Properties(),
null);
logger.debug("Created topic "+idTopic+" by super user");
else
logger.debug("topic "+idTopic+" already exists");
没有错误,但它仍然会一直监听直到超时。
我已经查看了代理的属性以检查是否有任何帮助,但我没有发现任何足够清楚的信息。我用来阅读的道具是:
props = new Properties();
props.put("bootstrap.servers", kafkaConfig.getBootstrap_servers());
props.put("key.deserializer", kafkaConfig.getKey_deserializer());
props.put("value.deserializer", kafkaConfig.getValue_deserializer());
props.put("key.serializer", kafkaConfig.getKey_serializer());
props.put("value.serializer", kafkaConfig.getValue_serializer());
props.put("group.id",kafkaConfig.getGroupId());
and , 用于发送 ...
Properties props = new Properties();
props.put("bootstrap.servers", kafkaConfig.getHost() + ":" +
kafkaConfig.getPort());
props.put("group.id", kafkaConfig.getGroup_id());
props.put("enable.auto.commit", kafkaConfig.getEnable_auto_commit());
props.put("auto.commit.interval.ms",
kafkaConfig.getAuto_commit_interval_ms());
props.put("session.timeout.ms", kafkaConfig.getSession_timeout_ms());
props.put("key.deserializer", kafkaConfig.getKey_deserializer());
props.put("value.deserializer", kafkaConfig.getValue_deserializer());
props.put("key.serializer", kafkaConfig.getKey_serializer());
props.put("value.serializer", kafkaConfig.getValue_serializer());
有什么线索吗?为什么,我必须使用来自代理和主题的消息的唯一方法是在出错后重复请求?
提前致谢
【问题讨论】:
使用来自 windows 控制台的消息,我得到一个 kafka.common.NotLeaderForPartitionException 我不确定,确切的问题是什么......此外,我想这不是错误,而只是一个警告:WARN 13164
,对吧?
是的。这不完全是一个错误。这是一个警告,但消费者没有阅读任何消息就站着。另一方面,当我尝试从控制台使用代理时,我得到 kafka.common.NotLeaderForPartitionException 。如果我使用了一个很好的主题,我没有错误,也不会出现这个警告。
正如警告所暗示的,Kafka 还没有选举出领导者。这就是为什么你说一个成熟的话题没有错误。创建主题,然后使用带有 --describe 选项的工具 kafka-topics.sh。如果您在输出中看到领导者,您将不会收到任何警告。
【参考方案1】:
当尝试向不存在的主题生成消息时会发生这种情况
请注意:在某些 Kafka 安装中,框架可以在主题不存在时自动创建主题,这解释了为什么您在一开始只看到一次问题。
【讨论】:
【参考方案2】:当您的主题名称不存在时会出现此错误。
要列出所有主题,请执行以下操作:
kafka-topics --list --zookeper localhost:2181
【讨论】:
主题确实存在于我的案例中以上是关于Apache Kafka LEADER_NOT_AVAILABLE的主要内容,如果未能解决你的问题,请参考以下文章
Flink实战系列Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/
org.apache.kafka.common.KafkaException: 无法构造 kafka 消费者
技术实战 |《Apache Kafka实战》作者胡夕:Apache Kafka监控与调优
Kafka Producer - org.apache.kafka.common.serialization.StringSerializer 找不到