风暴集群重复元组

Posted

技术标签:

【中文标题】风暴集群重复元组【英文标题】:Storm Cluster Duplicate Tuples 【发布时间】:2014-05-20 15:37:07 【问题描述】:

目前我正在开展一个项目,我在其中设置了一个跨四台 Unix 主机的 Storm 集群。

拓扑本身如下:

    JMS Spout 侦听 MQ 以获取新消息 JMS Spout 解析然后将结果发送到 Esper Bolt Esper Bolt 然后处理该事件并将结果发送到 JMS Bolt JMS Bolt 然后将消息发布回 MQ 上的不同主题

我意识到 Storm 是一个“至少一次”的框架。但是,如果我收到 5 个事件并将它们传递给 Esper Bolt 进行计数,那么由于某种原因,我会在 JMS Bolt 中收到 5 个计数结果(所有值都相同)。

理想情况下,我想接收一个结果输出,有什么方法可以告诉 Storm 忽略重复的元组吗?

我认为这与我设置的并行性有关,因为当我只有一个线程时它按预期工作:

 TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));

我还看到了 Trident 的“exactly-once”语义。不过,我并不完全相信这会解决这个问题。

【问题讨论】:

在三叉戟之前,这个问题通常由事务拓扑解决,但由于后者已被弃用,我认为三叉戟是要走的路。看看relevant section of their doc。 你能发布你的 Esper Bolt 的代码吗? 我同意 Chris 的请求 - 我们需要 Esper 粗体的代码。在 Esper 中,像 select count(x) from A 这样的语句将为发送到引擎的每个 A 生成一个输出。如果您发送了 5 个事件,之后只想看到一个结果,则需要定义此“边界”(例如通过发送单独的事件)。 【参考方案1】:

如果您的 Esper Bolt 没有在其 execute() 方法的末尾显式 ack() 每个元组或使用 iBasicBolt 实现,那么它收到的每个元组最终将在超时后由您的源 JMS Spout 重播。

或者,如果您要求螺栓“仅处理唯一消息”,请考虑将此处理行为添加到您的 execute() 方法中。它可以首先检查本地 Guava 缓存中元组值的唯一性,然后进行相应的处理。

【讨论】:

以上是关于风暴集群重复元组的主要内容,如果未能解决你的问题,请参考以下文章

如何在风暴集群上运行拓扑?我看不到输出日志

广播风暴的成因以及解决办法有哪些?

杀死风暴拓扑后资源清理

阿帕奇水槽和阿帕奇风暴有啥区别?

如何提高节点下线速度或避免因节点掉线产生网络风暴?

如何提高节点下线速度或避免因节点掉线产生网络风暴?