如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)

Posted

技术标签:

【中文标题】如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)【英文标题】:How to store values from Trident/Storm in a List (using the Java API) 【发布时间】:2016-01-20 00:01:07 【问题描述】:

我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否正在执行它们应该执行的操作。

我希望能够检索运行拓扑后产生的所有值并将它们放入列表中,以便我可以“查看”它们并检查它们的条件。

   FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
   TridentTopology topology = new TridentTopology();
   topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    // Soo... how do I retrieve the "aggregated_foos" from here?

我正在以TrackedTopology 的身份运行拓扑(从another S.O. question 获得代码,感谢@brianghig 的提问和@Thomas Kielbus 的回复)

这就是我“启动”拓扑以及将样本值输入其中的方式:

TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());

feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));

当我这样做时,我可以在日志消息中看到拓扑运行正确,并且值计算正确,但我想将结果“钓鱼”到List(或任何结构,此时),所以我实际上可以在我的测试中添加一些 Asserts

我一直在尝试 [a s**ton] 各种不同的方法,但都没有奏效。

最新的想法是在聚合之后添加一个螺栓,以便它“持久”将我的值放入一个列表中:

您将在下面看到尝试遍历 aggregate 发出的所有元组并将它们放入我之前初始化的列表中的类:

class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState> 
    final List<AggregatedFoo> results;

    public FieldFetcherStateUpdater(List<AggregatedFoo> results) 
        this.results = results;
    

    @Override
    public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
                            TridentCollector collector) 
        for (TridentTuple tuple : tuples) 
            results.add((AggregatedFoo) tuple.getValue(0));
        
    

所以现在代码如下所示:

// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
    .groupBy(new Fields("some_time_field", "foo_id"))
    .aggregate(new Fields("foo_id"), new FooAggregator(),
               new Fields("aggregated_foos"))
    .partitionPersist(new FieldFetcherFactory(),
                        new Fields("aggregated_foos"),
                        new FieldFetcherStateUpdater(results));

     LOGGER.info("Done. Checkpoint results=", results);

但什么都没有...日志显示Done. Checkpoint results=[](空列表)

有没有办法得到它?我想它一定是可行的,但我一直无法想出办法......

任何提示或链接到页面或任何类似的东西都将受到赞赏。提前谢谢你。

【问题讨论】:

【参考方案1】:

您需要使用静态成员变量 result。如果您有多个并行任务正在运行(即parallelism_hint &gt; 1),您还需要synchronizeresult 的写入权限。

在您的情况下,result 将为空,因为 Storm 在内部创建了一个新的 bolt 实例(包括一个新的 ArrayList 实例)。使用静态变量可确保您访问正确的对象(因为您的 bolt 实例中只有一个)。

【讨论】:

有效!出于某种原因,我觉得这样做很肮脏......但它有效!耶!谢谢

以上是关于如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)的主要内容,如果未能解决你的问题,请参考以下文章

如何存储来自 sqlDataReader 的值?并进行比较?

如何使用二维数组计算和存储来自其他数组的值的频率?

ExtJS 4 - 如何加载带有来自表单的最新值的参数的网格存储?

如何将来自不同查询的值(计数)组合成一个查询

如何使用 jquery 创建具有来自动态创建的表单字段的值的多维数组?

SQL动态SELECT语句来自存储在表中的值