Storm Trident:如何使用 IPartitionedTridentSpout?

Posted

技术标签:

【中文标题】Storm Trident:如何使用 IPartitionedTridentSpout?【英文标题】:Storm Trident: How to use IPartitionedTridentSpout? 【发布时间】:2014-02-07 15:01:52 【问题描述】:

在我们的系统中,我们有多个数据生成器在共享文件系统中创建文件内容,并在文件名中指示 DataSourceId。需要有一个公平的调度机制来读取所有来源生成的文件,解析、展平和丰富(使用参考数据)文件中的数据记录,批处理丰富的记录并写入数据库。

我使用IPartitionedTridentSpout。拓扑如下所示:

TransactionalTridentEsrSpout spout
    = new TransactionalTridentEsrSpout(NUM_OF_PARTITIONS);

TridentTopology topology = new TridentTopology();
topology.newStream("FileHandlerSpout", spout)
         .each(new Fields("filename", "esr"), new Utils.PrintFilter())
         .parallelismHint(NUM_OF_PARTITIONS)
         .shuffle()
         .each(new Fields("filename", "record"), new RecordFlattenerAndEnricher(), new elds("record-enriched"))
         .each(new Fields("filename", "record-enriched"), new Utils.PrintFilter())
         .project(new Fields(record-enriched")) // pass only required 
         .parallelismHint(PARALLELISM_HINT_FOR_ESR_FLATTENER_ENRICHER)
         .shuffle()
         .aggregate(new Fields("record-enriched"), new BlockWriterToDb(), new Fields("something"))
         .each(new Fields("something"), new Utils.PrintFilter())
         .parallelismHint(PARALLELISM_HINT_FOR_GP_WRITER);

由于数据文件非常大(通常是 100 万条记录),我小批量读取 10K 条记录。对于Coordinator 生成的每个transactionId,我的Emitter 会在其分区中发出当前/下一个文件的下10K 记录。最终的BlockWriter 会将丰富的记录聚合到缓冲区中,并在“完成”方法调用时将缓冲区写入数据库。

拓扑工作正常,但我有以下问题:

影响Emitters数量的ParttionedTridentSpoutparallelismHint设置为分区数。接下来两层的parallelismHintFlattenerAndEnricherBlockWriterToDb)需要设置为更高的值,因为我们有很多工作要做。由于这里不需要groupBy,所以我在这里的所有阶段之间使用shuffle()。当一个特定的下游螺栓死机时,Trident 应该调用 Emitter 并使用适当的旧元数据要求它重新发射。但由于发生了洗牌,作为 Emitter 发出的一部分的特定记录将落在多个下游螺栓中。那么,Trident 如何调用适当的发射器进行重新发射,以便重新发射完全相同的记录。即使 Trident 调用了适当的发射器,Emitter 也会重新发射整个 10K 批次,其中一些记录只是失败了。 Storm 是如何处理整个序列的,我们如何设计应用程序逻辑来处理恰好一次语义的容错。

【问题讨论】:

【参考方案1】:

使用 Trident 时,整批成功,或者整批失败。 当批处理失败时,spout 应该(自动)重播整个批处理,并且您将无法在发出时在其记录中进行选择。

要获得一次性语义,您的下游逻辑/数据库更新应该忽略重放的项目(跟踪成功更新项目的批次 ID),或者是幂等的。

【讨论】:

以上是关于Storm Trident:如何使用 IPartitionedTridentSpout?的主要内容,如果未能解决你的问题,请参考以下文章

如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)

storm trident 如何标记一个batch被处理——coordinator spout

Storm Trident API总结-2

storm trident 的介绍与使用

在 Storm Trident 中发射到多个流

storm trident function函数