无法在kafka-storm中将偏移数据写入zookeeper

Posted

技术标签:

【中文标题】无法在kafka-storm中将偏移数据写入zookeeper【英文标题】:Failing to write offset data to zookeeper in kafka-storm 【发布时间】:2014-08-15 23:32:09 【问题描述】:

我正在设置一个风暴集群来计算实时趋势和其他统计数据,但是通过允许 kafka-spout 上次读取的偏移量(源kafka-spout 的代码来自 https://github.com/apache/incubator-storm/tree/master/external/storm-kafka) 被记住。我以这种方式开始我的kafka-spout

BrokerHosts zkHost = new ZkHosts("localhost:2181");
SpoutConfig kafkaConfig = new SpoutConfig(zkHost, "test", "", "test");
kafkaConfig.forceFromStart = false;
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("test" + "spout", kafkaSpout, ESConfig.spoutParallelism);

默认设置应该是这样做的,但我认为在我的情况下并没有这样做,每次我启动我的项目时,PartitionManager 都会尝试查找带有偏移量的文件,然后什么也找不到:

2014-06-25 11:57:08 INFO  PartitionManager:73 - Read partition information from: /storm/partition_1  --> null
2014-06-25 11:57:08 INFO  PartitionManager:86 - No partition information found, using configuration to determine offset

然后它从最新的可能偏移量开始读取。如果我的项目永远不会失败,那没关系,但不是我想要的。

我还进一步研究了PartitionManager 类,它使用Zkstate 类来编写偏移量,来自这段代码 sn-p:

分区管理器

public void commit() 
    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) 
        LOG.debug("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                        "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                .put("offset", lastCompletedOffset)
                .put("partition", _partition.partition)
                .put("broker", ImmutableMap.of("host", _partition.host.host,
                        "port", _partition.host.port))
                .put("topic", _spoutConfig.topic).build();
        _state.writeJSON(committedPath(), data);

        _committedTo = lastCompletedOffset;
        LOG.debug("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
     else 
        LOG.debug("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
    

ZkState

public void writeBytes(String path, byte[] bytes) 
    try 
        if (_curator.checkExists().forPath(path) == null) 
            _curator.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath(path, bytes);
         else 
            _curator.setData().forPath(path, bytes);
        
     catch (Exception e) 
        throw new RuntimeException(e);
    

我可以看到,对于第一条消息,writeBytes 方法进入 if 块并尝试创建路径,然后对于第二条消息,它进入 else 块,这似乎没问题.但是当我再次启动项目时,会出现与上面提到的相同的消息。找不到partition information

【问题讨论】:

你好 Juto,我遇到了问题...你解决了这个问题吗?谢谢你,我现在等你 嗨@kaitian,我离开了我做这个项目的公司,因此我无法再访问代码,我从来没有解决这个问题。 :( Anthony 的回答很有效,原因很明显,因为在本地模式下,zookeeper 与 kafka 使用的不同! 【参考方案1】:

我认为你遇到了这个错误:

https://community.hortonworks.com/questions/66524/closedchannelexception-kafka-spout-cannot-read-kaf.html

上面同事的评论解决了我的问题。我添加了一些较新的库。

【讨论】:

【参考方案2】:

我遇到了同样的问题。原来我是在本地模式下运行的,它使用内存中的 zookeeper,而不是 Kafka 正在使用的 zookeeper。

为了确保 KafkaSpout 不使用 Storm 的 ZooKeeper 来存储偏移量的ZkState,除了ZkHosts 之外,您还需要设置SpoutConfig.zkServersSpoutConfig.zkPortSpoutConfig.zkRoot。例如

import org.apache.zookeeper.client.ConnectStringParser;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.KeyValueSchemeAsMultiScheme;

...

    final ConnectStringParser connectStringParser = new ConnectStringParser(zkConnectStr);
    final List<InetSocketAddress> serverInetAddresses = connectStringParser.getServerAddresses();
    final List<String> serverAddresses = new ArrayList<>(serverInetAddresses.size());
    final Integer zkPort = serverInetAddresses.get(0).getPort();
    for (InetSocketAddress serverInetAddress : serverInetAddresses) 
        serverAddresses.add(serverInetAddress.getHostName());
    

    final ZkHosts zkHosts = new ZkHosts(zkConnectStr);
    zkHosts.brokerZkPath = kafkaZnode + zkHosts.brokerZkPath;

    final SpoutConfig spoutConfig = new SpoutConfig(zkHosts, inputTopic, kafkaZnode, kafkaConsumerGroup);
    spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(inputKafkaKeyValueScheme);

    spoutConfig.zkServers = serverAddresses;
    spoutConfig.zkPort = zkPort;
    spoutConfig.zkRoot = kafkaZnode;

【讨论】:

只是为了让答案更容易掌握:当您准备好在远程服务器上部署的工作拓扑时,添加最后 3 行以使其在本地模式下使用时连接到远程 Zookeeper

以上是关于无法在kafka-storm中将偏移数据写入zookeeper的主要内容,如果未能解决你的问题,请参考以下文章

无法在 iOS 中将数据写入 .plist

在 SSIS 中将数据写入 Excel 文件时出现无法解释的错误

在 R 中将哪个时间序列类用于财务数据?

在 C++ 中将数据写入由 CFileDialog 创建的 .txt 文件

贝加莱PLC。写入后计算新偏移量或如何将数据写入新行

在 fortran 中将写入附加到 hdf5 文件