如何从Combiner/Reducer/Aggregator 函数返回具有多个字段的元组?

Posted

技术标签:

【中文标题】如何从Combiner/Reducer/Aggregator 函数返回具有多个字段的元组?【英文标题】:How can I return a tuple with multiple fields from Combiner/Reducer/Aggregator function? 【发布时间】:2019-04-04 09:58:48 【问题描述】:

Here Storm 文档指出: CombinerAggregator 返回一个带有单个字段的元组作为输出。

如何从Combiner函数返回一个包含多个字段的元组?

我正在创建一个聚合函数,并希望从输入元组聚合两个或多个值并将这两个或多个字段作为输出发送。

我还想在输出中包含输入元组的一些字段。 如何使用Combiner Function获得所需的输出?

向Combiner Aggregator 函数输入元组:

("a", "b", "c" , "d")

所需的输出元组:

("a", "b", "newValue1", "newValue2", "newValue3")

过去,我尝试在CombinerAggregator 的init() 方法中从元组的字段中创建一个模型,并将其从CombinerAggregator 作为输出返回。但我觉得这不是正确的解决方案。 chainedAgg() 函数是否适用于这种情况?

任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

我想你可能想要使用更通用的Aggregator 接口。

来自您发布的链接:

执行聚合最通用的接口是Aggregator,它看起来像这样:

public interface Aggregator<T> extends Operation 
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);

聚合器可以发出任意数量的具有任意数量字段的元组。

【讨论】:

感谢您的快速回答。我试过了。但我有疑问。它是否取决于拓扑中使用的 Spout 类型?目前,拓扑使用 IRichSpout。 我不这么认为,但我不太熟悉将 Trident 与常规非批处理 spout 一起使用。 使用聚合函数和 IRichSpout 出现的问题是:每次从 kafka 主题中消费一个新事件而不是添加它时,聚合值都会重置。我真的不确定这个问题是出在 spout(取决于从 spout 发送元组的方式)还是其他问题。 考虑在issues.apache.org/jira 提出问题。如果你能做一个小的复现来显示这个问题(例如 github 上的一个小项目或类似的),我们可能会弄清楚它是一个错误还是配置错误。

以上是关于如何从Combiner/Reducer/Aggregator 函数返回具有多个字段的元组?的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据从回收器适配器发送到片段 |如何从 recyclerview 适配器调用片段函数

如何从 Firebase 获取所有设备令牌?

如何直接从类调用从接口继承的方法?

如何从服务器获取和设置 android 中的 API(从服务器获取 int 值)?如何绑定和实现这个

如何从Mac从android studio中的fabric注销? [复制]

如何从设备中获取 PDF 文件以便能够从我的应用程序中上传?