在Kafka阅读消息时重新平衡问题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在Kafka阅读消息时重新平衡问题相关的知识,希望对你有一定的参考价值。

我正在尝试阅读有关Kafka主题的消息,但我无法阅读它。一段时间后该进程被杀死,无需读取任何消息。

这是我得到的重新平衡错误:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages

我试图运行ConsumerOffsetChecker,这是我得到的错误。我不知道如何解决这个问题。任何人,任何想法?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 16 more
答案

我最近遇到过类似的问题。您可以尝试将使用者配置rebalance.backoff.ms和zookeeper.session.timeout.ms增加到大约5-10秒。

第一个参数告诉kafka在重试重新平衡之前等待更多。第二个告诉kafka在尝试连接到zookeeper时更耐心。

其他配置选项可以在the official documentation上找到。

另一答案

这可能意味着代理连接到Zookeeper时没有正确创建这些节点。尝试使用时,/ consumer路径应该存在。

以下是一些调试路径:

你有没有创建任何主题?

如果是这样:

  1. 主题中有多少个分区?
  2. 您是否检查过zookeeper中是否正确填充了主题元数据?
  3. 我们能看到您的消费者配置吗?

如果不:

  1. 然后,您需要使用脚本$KAFKA_DIR/bin/kafka-create-topic.sh创建主题。在脚本中查看用法详细信息。
  2. 创建主题后,您需要创建一个具有之前未使用过的组ID的使用者,否则您将无法重新开始。
另一答案

kafka.tools.ConsumerOffsetChecker中存在一个错误。如果保留消耗的偏移信息的特定Zookeeper节点未退出,则该工具退出抛出该execption。

例如,假设您有一个使用者组“mygroup”和一个主题“topictest”。然后在Znode:/ consumers / mygroup / offsets / topictest / 2中维护分区2的偏移量。

如果在Znode中没有主题topictest的分区2的条目,那么消费者offsetchecker工具将在检查分区2的偏移时退出。基本上,在检查Znode / consumer / mygroup / offset的第一个分区“n”时它将失败Zookeeper上缺少/ topictest / n。

另一答案

可能您的代理处于脱机状态,并且他们无法连接到Zookeeper,您是否尝试运行$KAFKA_ROOT_DIR/bin路径中提供的控制台用户脚本来检查您是否能够使用特定主题。

另一答案

另一个问题可能是因为jar冲突。如果您具有存储在库文件夹中的不同版本的相同jar。可能会出现此问题。像scala-library,zkclient,zookeeper,kafka-client这样的jars不应该与不同的版本重复。

以上是关于在Kafka阅读消息时重新平衡问题的主要内容,如果未能解决你的问题,请参考以下文章

由于消费者速度较慢,Kafka 重新平衡主题中的数据

在Kafka重新平衡中是否撤销操作等消耗过程完成?

使用 Spring-kafka 在 GC/消费者重新平衡时清理 Kafka Metric 计量器

KafKa消费者组重平衡能避免吗

消费者再平衡如何在 Kafka 中工作?

Kafka 消费者意外地重新平衡