Storm Trident:如何使用 IPartitionedTridentSpout?
Posted
技术标签:
【中文标题】Storm Trident:如何使用 IPartitionedTridentSpout?【英文标题】:Storm Trident: How to use IPartitionedTridentSpout? 【发布时间】:2014-02-07 15:01:52 【问题描述】:在我们的系统中,我们有多个数据生成器在共享文件系统中创建文件内容,并在文件名中指示 DataSourceId
。需要有一个公平的调度机制来读取所有来源生成的文件,解析、展平和丰富(使用参考数据)文件中的数据记录,批处理丰富的记录并写入数据库。
我使用IPartitionedTridentSpout
。拓扑如下所示:
TransactionalTridentEsrSpout spout
= new TransactionalTridentEsrSpout(NUM_OF_PARTITIONS);
TridentTopology topology = new TridentTopology();
topology.newStream("FileHandlerSpout", spout)
.each(new Fields("filename", "esr"), new Utils.PrintFilter())
.parallelismHint(NUM_OF_PARTITIONS)
.shuffle()
.each(new Fields("filename", "record"), new RecordFlattenerAndEnricher(), new elds("record-enriched"))
.each(new Fields("filename", "record-enriched"), new Utils.PrintFilter())
.project(new Fields(record-enriched")) // pass only required
.parallelismHint(PARALLELISM_HINT_FOR_ESR_FLATTENER_ENRICHER)
.shuffle()
.aggregate(new Fields("record-enriched"), new BlockWriterToDb(), new Fields("something"))
.each(new Fields("something"), new Utils.PrintFilter())
.parallelismHint(PARALLELISM_HINT_FOR_GP_WRITER);
由于数据文件非常大(通常是 100 万条记录),我小批量读取 10K 条记录。对于Coordinator
生成的每个transactionId
,我的Emitter
会在其分区中发出当前/下一个文件的下10K 记录。最终的BlockWriter
会将丰富的记录聚合到缓冲区中,并在“完成”方法调用时将缓冲区写入数据库。
拓扑工作正常,但我有以下问题:
影响Emitters
数量的ParttionedTridentSpout
的parallelismHint
设置为分区数。接下来两层的parallelismHint
(FlattenerAndEnricher
和BlockWriterToDb
)需要设置为更高的值,因为我们有很多工作要做。由于这里不需要groupBy
,所以我在这里的所有阶段之间使用shuffle()
。当一个特定的下游螺栓死机时,Trident 应该调用 Emitter
并使用适当的旧元数据要求它重新发射。但由于发生了洗牌,作为 Emitter
发出的一部分的特定记录将落在多个下游螺栓中。那么,Trident 如何调用适当的发射器进行重新发射,以便重新发射完全相同的记录。即使 Trident 调用了适当的发射器,Emitter
也会重新发射整个 10K 批次,其中一些记录只是失败了。 Storm 是如何处理整个序列的,我们如何设计应用程序逻辑来处理恰好一次语义的容错。
【问题讨论】:
【参考方案1】:使用 Trident 时,整批成功,或者整批失败。 当批处理失败时,spout 应该(自动)重播整个批处理,并且您将无法在发出时在其记录中进行选择。
要获得一次性语义,您的下游逻辑/数据库更新应该忽略重放的项目(跟踪成功更新项目的批次 ID),或者是幂等的。
【讨论】:
以上是关于Storm Trident:如何使用 IPartitionedTridentSpout?的主要内容,如果未能解决你的问题,请参考以下文章
如何将来自 Trident/Storm 的值存储在列表中(使用 Java API)