如何在 Apache Beam/Google 数据流中将大窗口缩减为小窗口?

Posted

技术标签:

【中文标题】如何在 Apache Beam/Google 数据流中将大窗口缩减为小窗口?【英文标题】:How do you reduce large windows down to smaller windows in Apache beam/ Google dataflow? 【发布时间】:2019-08-17 01:51:51 【问题描述】:

我遇到了一个问题,即我的管道在操作大型数据集时不断失败。除了有关在重新洗牌期间发生的处理暂停的警告之外,工作人员日志不显示任何错误。我的怀疑是我通过超出分配的内存来杀死工人,因为唯一暗示正在发生的事情是我可以看到工人在日志中旋转但随后什么也不做。最终,如果我等待足够长的时间或终止管道,它们就会失败。

我想将我的元素减少到几个同时运行的组,以便插入到 elasticsearch 中。因此,例如,从 40 名工作人员进行处理变为只有 7 名工作人员在 ES 中进行批量插入。

我在处理和弹性搜索插入之间设置了窗口。我有日志记录,我可以看到,尽管我使用AfterCount 进行了窗口化,但窗口大小似乎在很大程度上不受限制。 IE。我将窗口大小设置为 1000,我得到一组 12k。我认为问题在于 Apache Beam 在捆绑包上运行,并且仅在捆绑包被处理后触发,并且我的一个转换可能会为集合生成任意数量的输出元素。

完成这项任务的预期方法是什么?

形象化我想要发生的事情:

1000 个项目/50 个组窗口 -> 输出 500,000 多个文档/窗口 -> 插入 7 个工作人员,每批 2k 个文档

当前的管道流程(我已经尝试了很多变体):

Read from datastore
| window by timestamp (5 seconds) with early trigger of 100 elements
| group by random int between 0 and 50 (need groups for batch fetch call in processing)
| fetch from service in batch and process with output documents
| window by timestamp (10 seconds) with early trigger of 1000 documents
| group by random int between 0 and X
| insert into ES

我尝试了各种 X 值。较低的值会导致较高的插入吞吐量,但在处理大量数据时,管道在插入步骤中会失败。现在尝试在 X=80 的情况下运行,我看到吞吐量适中,但出现了一些超时,并且通常需要几秒钟或更短时间的批处理调用现在需要 15 秒或更长时间才能完成。

向插入添加更多工作人员似乎可以解决工作人员根本无法进行任何插入的问题,但大量的批处理请求效率非常低,而且它们最终需要更长的时间才能完成,并且有超时和集群过载的风险.

为了更好地表达事情,此时我只是在尝试不同的参数,当然必须有一种方法来设计管道,因此无论数据或窗口大小如何,这都不是问题。

【问题讨论】:

太好了,以经典的 *** 方式,我发布然后意识到在过去 10 小时内,我正在部署的批处理管道完全缺少第二个窗口,因为我忘记更新代码以传递所需的参数将其添加到管道中(不能在流管道中使用第二个窗口)。将对此进行更新... 好的,重新添加了在 ES 之前出现的窗口,并且在它之前有一个随机播放,并且......它只是在插入步骤停止工作,没有错误。日志显示工作人员正在运转,现在只有一个时间上限,什么都没有。工人规模缩小到 1 在处理完这个之后,我能够确定如果处理组有太多与之关联的数据,那么工作人员会神秘地失败。解决方案是创建更多组,但如果您想限制并发性,这将成为一个问题,并且需要将批处理管道中的最大工作人员设置为您希望在管道中的任何位置支持的最小并发级别。 【参考方案1】:

我不确定工人静默失败的根本原因是什么,但是当您使用AfterCount 触发器时您的窗口没有被绑定的原因是因为触发器仅在流式管道中工作。由于您的管道是批处理管道,因此计数会被忽略。

解决方案是避免同时使用窗口化、触发器和 group-by 来批处理元素,而是将其替换为 BatchElements 转换,这似乎完全符合您的需要。这样,您的管道将如下所示:

Read from datastore
| batch into X elements
| fetch from service in batch and process with output documents
| batch into Y documents
| insert into ES

【讨论】:

从我读过的内容来看,它们应该在批处理管道中工作,但窗口本身似乎不适用于有界源。

以上是关于如何在 Apache Beam/Google 数据流中将大窗口缩减为小窗口?的主要内容,如果未能解决你的问题,请参考以下文章

Apache Beam/Google Dataflow - 将数据从 Google Datastore 导出到 Cloud Storage 中的文件

如何在 Apache Spark 中获取上一行的数据

如何在 Scala 中将数据帧转换为 Apache Spark 中的数据集?

刚刚晋升为 Apache 顶级项目的 Hudi 如何在数据湖上玩转增量处理

如何选择Apache Spark和Apache Flink

如何在 Apache Pig 上强制执行正确的数据类型?