Storm/Kafka - Unable to get offset lags for kafka


【Posted】: 2018-09-24 05:12:41 【Question】:

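I am running a Storm topology that reads tweets from Kafka on AWS, using Ubuntu Server 14.04 LTS instances with 4 nodes: Nimbus, Supervisor, a Kafka-ZooKeeper node, and ZooKeeper (for the Storm cluster). My Storm UI is up and running and I can submit topologies. I have two brokers, but I am only using one of them, with broker.id=0. I have tweets under a single topic. My Kafka server is also running fine.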

I created the Kafka topic this way: bin/kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic twitter1

The line that confuses me is:

SpoutConfig kafkaConfig = new SpoutConfig(kafkaHosts, topicName+"-0", "/kafka", topicName+"-0");

I think my mistake starts at this point. The full code is:

import org.apache.storm.tuple.Fields;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import java.util.Arrays;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.kafka.StringScheme;

public class TwitterTopology {
    public static void main(String[] args) {
        String topicName = "twitter1";
        String topologyName = args[0];
        String kafkaIp = "xxx.31.xxx.207"; //hiding the IPs here. This is the IP for my kafka-zk node. Is this ok?
        String nimbusHost = "xxx.31.xxx.70";
        String kafkaHost = kafkaIp + ":9092";
        BrokerHosts kafkaHosts = new ZkHosts(kafkaHost);
        SpoutConfig kafkaConfig = new SpoutConfig(kafkaHosts, topicName, "/kafka", topicName);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("twitter-spout", kafkaSpout, 8);
        builder.setBolt("WordSplitterBolt", new JsonWordSplitterBolt(5)).shuffleGrouping("twitter-spout");
        builder.setBolt("IgnoreWordsBolt", new IgnoreWordsBolt()).shuffleGrouping("WordSplitterBolt");
        builder.setBolt("WordCounterBolt", new WordCounterBolt(5, 5 * 60, 50)).shuffleGrouping("IgnoreWordsBolt");
        Config config = new Config();
        config.setDebug(false);
        config.setMaxTaskParallelism(5);
        config.put(Config.NIMBUS_HOST, nimbusHost);
        config.put(Config.NIMBUS_THRIFT_PORT, 6627);
        config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
        config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(kafkaIp));
        try {
            config.setNumWorkers(20);
            config.setMaxSpoutPending(5000);
            StormSubmitter.submitTopology(topologyName, config, builder.createTopology());
        } catch (Exception e) {
            throw new IllegalStateException("Couldn't initialize the topology", e);
        }
    }
}

I get this exception in the Storm UI:

Unable to get offset lags for kafka. Reason: org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /brokers/topics/twitter1/partitions
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:99)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:242)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:231)
    at org.apache.curator.shaded.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64)
    at org.apache.curator.shaded.RetryLoop.callWithRetry(RetryLoop.java:100)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:228)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:219)
    at org.apache.curator.shaded.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:41)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getLeadersAndTopicPartitions(KafkaOffsetLagUtil.java:319)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:256)
    at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.main(KafkaOffsetLagUtil.java:124)

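The error "Unable to get offset lags for kafka" stays the same, while the rest of the exception changes depending on the zkRoot path (the third argument to SpoutConfig) that I set. I don't know exactly how to fill in these arguments so that tweets are pulled from my Kafka topic.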

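I wrote the topology submission code by following this tutorial: http://stdatalabs.blogspot.ca/2016/10/real-time-stream-processing-using.html I made a number of changes to the Maven dependencies. My pom.xml includes all the dependencies for storm-core, kafka, etc., at the latest versions available in the Maven repository.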

【Answer 1】:

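ZkHosts should be given the ZooKeeper connection string, not the Kafka broker address. The code in the question passes port 9092, which is the Kafka broker port. If your ZooKeeper and Kafka run on the same server, keep the same host and use the ZooKeeper port (2181) instead.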
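If it is unclear which port on the node is actually ZooKeeper, one option is to send ZooKeeper's four-letter "ruok" command and check for the "imok" reply. The following is only a minimal sketch using plain sockets: the class name ZkPortProbe, the method name, and the default host "localhost" are hypothetical, and it assumes the four-letter commands are enabled on the server (newer ZooKeeper releases require them to be whitelisted).

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Storm or Kafka.
class ZkPortProbe {

    // Returns true only if the endpoint answers "ruok" with "imok".
    // Any connection failure, timeout, or other reply yields false.
    static boolean looksLikeZooKeeper(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs);

            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();

            // Read up to 4 bytes of reply; stop early if the peer closes.
            byte[] reply = new byte[4];
            InputStream in = socket.getInputStream();
            int read = 0;
            while (read < reply.length) {
                int n = in.read(reply, read, reply.length - read);
                if (n < 0) {
                    break;
                }
                read += n;
            }
            return read == 4
                    && "imok".equals(new String(reply, StandardCharsets.US_ASCII));
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: pass the kafka-zk node's address as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        for (int port : new int[] {2181, 9092}) {
            System.out.println(host + ":" + port + " answers as ZooKeeper: "
                    + looksLikeZooKeeper(host, port, 2000));
        }
    }
}
```

A port that answers as ZooKeeper is the one to pass to ZkHosts; a Kafka broker port is not expected to reply "imok".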

See https://storm.apache.org/releases/1.2.3/storm-kafka.html
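To make the roles of the arguments more concrete, here is a small sketch of the ZooKeeper strings involved. It uses only plain Java so it runs without Storm on the classpath; the Storm calls appear as comments. Several things are assumptions rather than facts from the question: the host name "zk-host.example" and the spout id "twitter-spout-1" are placeholders, the "/kafka" chroot is inferred from the topic creation command (it applies only if the brokers themselves register under that chroot), and the path layout is inferred from the path shown in the stack trace.

```java
// Hypothetical helper class; none of these names come from Storm.
class KafkaSpoutPathsSketch {

    static final int ZOOKEEPER_PORT = 2181; // ZooKeeper client port, not Kafka's 9092

    // Connection string for ZkHosts: the ZooKeeper host and port.
    static String zkConnect(String zkHost) {
        return zkHost + ":" + ZOOKEEPER_PORT;
    }

    // Where the brokers register when Kafka uses a chroot such as "/kafka".
    static String brokerZkPath(String kafkaChroot) {
        return kafkaChroot + "/brokers";
    }

    // The node listing the topic's partitions (compare with the stack trace).
    static String partitionsPath(String brokerZkPath, String topic) {
        return brokerZkPath + "/topics/" + topic + "/partitions";
    }

    // Base node under which the spout keeps its consumer offsets.
    static String offsetRoot(String zkRoot, String spoutId) {
        return zkRoot + "/" + spoutId;
    }

    public static void main(String[] args) {
        String topic = "twitter1";
        String connect = zkConnect("zk-host.example");   // placeholder host
        String brokers = brokerZkPath("/kafka");          // assumed chroot
        String zkRoot = "/" + topic;                      // any dedicated path
        String spoutId = "twitter-spout-1";               // placeholder id

        System.out.println("ZkHosts connect string: " + connect);
        System.out.println("Broker metadata read from: " + partitionsPath(brokers, topic));
        System.out.println("Offsets stored under: " + offsetRoot(zkRoot, spoutId));

        // With storm-kafka on the classpath these values would plug in roughly as:
        //   BrokerHosts hosts = new ZkHosts(connect, brokers);
        //   SpoutConfig cfg = new SpoutConfig(hosts, topic, zkRoot, spoutId);
    }
}
```

The point of the split is that the ZkHosts arguments say where Kafka's broker metadata lives, while the third and fourth SpoutConfig arguments only choose where this spout records its own offsets and under which id.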
