如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)
Posted
技术标签:
【中文标题】如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)【英文标题】:How to store values from Trident/Storm in a List (using the Java API) 【发布时间】:2016-01-20 00:01:07 【问题描述】:我正在尝试创建一些单元测试来验证我的 Trident 拓扑的某些部分是否正在执行它们应该执行的操作。
我希望能够检索运行拓扑后产生的所有值并将它们放入列表中,以便我可以“查看”它们并检查它们的条件。
FeederBatchSpout feederSpout = new FeederBatchSpout("some_time_field", "foo_id");
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
// Soo... how do I retrieve the "aggregated_foos" from here?
我正在以TrackedTopology
的身份运行拓扑(从another S.O. question 获得代码,感谢@brianghig 的提问和@Thomas Kielbus 的回复)
这就是我“启动”拓扑以及将样本值输入其中的方式:
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology.build());
cluster.submitTopology("unit_tests", config, tracked.getTopology());
feederSpout.feed(new Values(MyUtils.makeSampleFoo(1));
feederSpout.feed(new Values(MyUtils.makeSampleFoo(2));
当我这样做时,我可以在日志消息中看到拓扑运行正确,并且值计算正确,但我想将结果“钓鱼”到List
(或任何结构,此时),所以我实际上可以在我的测试中添加一些 Asserts
。
我一直在尝试 [a s**ton] 各种不同的方法,但都没有奏效。
最新的想法是在聚合之后添加一个螺栓,以便它“持久”将我的值放入一个列表中:
您将在下面看到尝试遍历 aggregate
发出的所有元组并将它们放入我之前初始化的列表中的类:
class FieldFetcherStateUpdater extends BaseStateUpdater<FieldFetcherState>
final List<AggregatedFoo> results;
public FieldFetcherStateUpdater(List<AggregatedFoo> results)
this.results = results;
@Override
public void updateState(FieldFetcherState state, List<TridentTuple> tuples,
TridentCollector collector)
for (TridentTuple tuple : tuples)
results.add((AggregatedFoo) tuple.getValue(0));
所以现在代码如下所示:
// ...
List<AggregatedFoo> results = new ArrayList();
topology.newStream("spout1", feederSpout)
.groupBy(new Fields("some_time_field", "foo_id"))
.aggregate(new Fields("foo_id"), new FooAggregator(),
new Fields("aggregated_foos"))
.partitionPersist(new FieldFetcherFactory(),
new Fields("aggregated_foos"),
new FieldFetcherStateUpdater(results));
LOGGER.info("Done. Checkpoint results=", results);
但什么都没有...日志显示Done. Checkpoint results=[]
(空列表)
有没有办法得到它?我想它一定是可行的,但我一直无法想出办法......
任何提示或链接到页面或任何类似的东西都将受到赞赏。提前谢谢你。
【问题讨论】:
【参考方案1】:您需要使用静态成员变量 result
。如果您有多个并行任务正在运行(即parallelism_hint > 1
),您还需要synchronize
对result
的写入权限。
在您的情况下,result
将为空,因为 Storm 在内部创建了一个新的 bolt 实例(包括一个新的 ArrayList
实例)。使用静态变量可确保您访问正确的对象(因为您的 bolt 实例中只有一个)。
【讨论】:
有效!出于某种原因,我觉得这样做很肮脏......但它有效!耶!谢谢以上是关于如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)的主要内容,如果未能解决你的问题,请参考以下文章
如何存储来自 sqlDataReader 的值?并进行比较?
ExtJS 4 - 如何加载带有来自表单的最新值的参数的网格存储?