在 Storm Trident 中发射到多个流

Posted

技术标签:

【中文标题】在 Storm Trident 中发射到多个流【英文标题】:Emit to multiple streams in Storm Trident 【发布时间】:2015-11-20 23:22:11 【问题描述】:

如何从 Storm Trident 中的同一个螺栓向多个流发射?

我有一个螺栓,它进行一些计算,根据结果我想将一些值传递给一个流,并将一些其他值传递给另一个流。

在 Storm(不是 Trident)中,我们可以通过以下方式实现:

将流分成多个流:

@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) 
    outputFieldsDeclarer.declareStream("type1-stream", new Fields("type1"));
    outputFieldsDeclarer.declareStream("type2-stream", new Fields("type2"));
    outputFieldsDeclarer.declareStream("error-stream", new Fields("error"));

然后根据发现发出,例如:

collector.emit("type1-stream", new Values("type 1 data"));
collector.emit("type2-stream", new Values("type 2 data"));
collector.emit("error-stream", new Values("error data"));

然后通过监听预期的流来完成剩下的工作:

builder.setBolt("errorBolt", errorBolt).shuffleGrouping("errorBoltStream", "error-stream");
builder.setBolt("type1Bolt", type1Bolt).shuffleGrouping("type1BoltStream", "type1-stream");

那么如何使用 Storm Trident 实现相同的行为?

一个选项是为同一个流调用“每个”并运行相同的螺栓,并且仅根据我想要向该流发出的内容发出,或者另一个选项是发出键和值对并根据键过滤流(如 type1、type2、error 等)并再次创建多个流。但在我看来,它们都不是一个好的设计。实现它的最佳方法是什么?

【问题讨论】:

我也有同样的问题。有一个关于这个的问题:issues.apache.org/jira/browse/STORM-68 【参考方案1】:

AFAIK,你不能那样做。 要拆分流,您需要执行以下操作:

// main stream
Stream stream = topology.each(...)

// stream 01
Stream stream1 = stream.each(...)

// stream 02
Stream stream2 = stream.each(...)

【讨论】:

以上是关于在 Storm Trident 中发射到多个流的主要内容,如果未能解决你的问题,请参考以下文章

Storm-HBase Trident - 同时查询多个列

Trident中的DRPC实现

storm trident 消息成功处理

Storm Trident:如何使用 IPartitionedTridentSpout?

Trident中的过滤与函数的区别

Storm Trident示例ReducerAggregator