在 Kafka 中读取消息时的重新平衡问题

Posted

技术标签:

【中文标题】在 Kafka 中读取消息时的重新平衡问题【英文标题】:Rebalancing issue while reading messages in Kafka 【发布时间】:2014-04-29 02:15:53 【问题描述】:

我正在尝试阅读有关 Kafka 主题的消息,但无法阅读。该进程在一段时间后被杀死,没有读取任何消息。

这是我得到的重新平衡错误:

[2014-03-21 10:10:53,215] ERROR Error processing message, stopping consumer:  (kafka.consumer.ConsoleConsumer$)
kafka.common.ConsumerRebalanceFailedException: topic-1395414642817-47bb4df2 can't rebalance after 4 retries
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:428)
    at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:718)
    at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:752)
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:142)
    at kafka.consumer.ConsoleConsumer$.main(ConsoleConsumer.scala:196)
    at kafka.consumer.ConsoleConsumer.main(ConsoleConsumer.scala)
Consumed 0 messages

我尝试运行ConsumerOffsetChecker,这是我得到的错误。我不知道如何解决这个问题。任何人,有什么想法吗?

./kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:9092 --topic mytopic --group  topic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:459)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
        at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
        at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
        at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
        at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
        ... 16 more

【问题讨论】:

鉴于异常中的消息,在我看来,Kafka 消费者能够连接到 ZK,但在其中找不到 /consumers 路径,这听起来像是 ZK 数据库已损坏 我有时也会遇到这个错误。但是,通过控制台消费者脚本连接如下 user2720864 可以正常工作。 Kafka-users 列表中的这个帖子讨论了这个问题mail-archives.apache.org/mod_mbox/kafka-users/201312.mbox/… 问题解决了吗?如果是,怎么做? 【参考方案1】:

另一个问题可能是由于 jar 冲突。如果您在库文件夹中存储了具有不同版本的相同 jar。可能会出现此问题。 scala-library、zkclient、zookeeper、kafka-client 之类的 jar 不应与不同版本重复。

【讨论】:

【参考方案2】:

kafka.tools.ConsumerOffsetChecker 中有一个错误。如果持有消费偏移信息的特定 Zookeeper 节点没有退出,则工具退出并抛出执行。

例如,假设您有一个消费者组“mygroup”和一个主题“topictest”。然后在 Znode 中维护分区 2 的偏移量: /consumers/mygroup/offsets/topictest/2.

如果Znode中没有主题topictest的partition 2的入口,那么consumer offsetchecker工具会在检查partition 2的offset时退出。 基本上,它会在检查 Zookeeper 上缺少 Znode /consumers/mygroup/offsets/topictest/n 的第一个分区“n”时失败。

【讨论】:

在issues.apache.org/jira/browse/ZOOKEEPER 或issues.apache.org/jira/browse/KAFKA 上有没有关于它的jira 票? @Filippo 我不知道。但该工具可以轻松修复。【参考方案3】:

我最近遇到了类似的问题。您可以尝试将消费者配置 rebalance.backoff.mszookeeper.session.timeout.ms 增加到 5-10 秒左右。

第一个参数告诉 kafka 在重试重新平衡之前等待更多。 第二个告诉kafka在尝试连接zookeeper时要更有耐心。

其他配置选项可以在the official documentation找到。

【讨论】:

对不起,我误会了。首先,我尝试在服务器上更改这些选项,但什么也没做。然后我在消费者处改变了它们。这允许我连接,但现在我得到一个 java.nio.channels.ClosedByInterruptException。在grokbase.com/t/kafka/users/141ej00nf9/… 列表中,他们指向 Wiki,说 GC 使用可能是罪魁祸首。 cwiki.apache.org/confluence/display/KAFKA/…?我不相信在我的情况下,因为应用程序才刚刚开始: @MarkButler 当有其他正在运行的消费者时,我在启动时看到了这些错误。你有没有在 ZK 中验证过再平衡是可以的?它在 zk 路径 /consumers/[group_name]/owners/[topic name] 我尝试了上述步骤,但没有任何改变,我仍然看到同样的错误。【参考方案4】:

可能您的代理处于离线状态,并且无法连接到 Zookeeper,您是否尝试运行 $KAFKA_ROOT_DIR/bin 路径中可用的控制台消费者脚本来检查您是否能够从特定主题消费。

【讨论】:

【参考方案5】:

这可能意味着代理在连接到 Zookeeper 时没有正确创建这些节点。 /consumer 路径应该在您尝试消费时存在。

这里有几个调试路径:

您是否创建了任何主题?

如果是这样:

    主题中有多少个分区? 您是否检查过 Zookeeper 中的主题元数据是否正确填充? 我们可以查看您的使用者配置吗?

如果没有:

    然后您需要使用脚本$KAFKA_DIR/bin/kafka-create-topic.sh 创建一个主题。查看脚本内部的使用详情。 创建主题后,需要创建一个之前没有使用过的组ID的消费者,否则无法重新开始。

【讨论】:

以上是关于在 Kafka 中读取消息时的重新平衡问题的主要内容,如果未能解决你的问题,请参考以下文章

Kafka 消费者意外地重新平衡

由于消费者速度较慢,Kafka 重新平衡主题中的数据

在Kafka重新平衡中是否撤销操作等消耗过程完成?

Jet Kafka负载平衡

KafKa消费者组重平衡能避免吗

kafka 删除topic时的隐患