Apache Beam/数据流重组

Posted

技术标签:

【中文标题】Apache Beam/数据流重组【英文标题】:Apache Beam/Dataflow Reshuffle 【发布时间】:2019-06-04 21:46:22 【问题描述】:

org.apache.beam.sdk.transforms.Reshuffle 的目的是什么?在文档中,目的被定义为:

返回与其输入等效的 PCollection 的 PTransform,但 在操作上提供了 GroupByKey 的一些副作用,在 特别是防止融合周围的变换, 通过 id 进行检查点和重复数据删除。

防止周围变换融合有什么好处?我认为融合是一种优化,可以防止不必要的步骤。实际用例会有所帮助。

【问题讨论】:

【参考方案1】:

在几种情况下,您可能需要重新排列数据。以下不是一份详尽的清单,但应该可以让您了解为什么要重新洗牌:

当您的 ParDo 变换之一具有非常高的扇出时

这意味着在 ParDo 之后并行度会增加。如果您不在这里中断融合,您的管道将无法将数据拆分到多台机器上进行处理。

考虑为每个输入元素生成一百万个输出元素的 DoFn 的极端情况。考虑这个 ParDo 在其输入中接收 10 个元素。如果你不打破这个高扇出 ParDo 与其下游转换之间的融合,它将只能在 10 台机器上运行,尽管你将拥有数百万个元素。

诊断此问题的好方法是查看输入 PCollection 中的元素数量与输出 PCollection 的元素数量。如果后者明显大于第一个,那么您可能需要考虑添加重新洗牌。

当您的数据在机器之间没有得到很好的平衡时**

假设您的管道使用 9 个 10MB 的文件和一个 10GB 的文件。如果每个文件都由一台机器读取,那么您将拥有一台机器的数据比其他机器多得多。

如果您不重新洗牌这些数据,您的大部分机器将在您的管道运行时处于空闲状态。重新洗牌可以让您重新平衡数据,以便在机器之间更均匀地处理。

诊断此问题的一个好方法是查看有多少工作人员正在您的管道中执行工作。如果管道很慢,并且只有一个工作人员处理数据,那么您可以从重新洗牌中受益。

【讨论】:

我明白了,我是否可以通过堆栈驱动程序图或日志观察到一些迹象表明需要重新洗牌?如果我能从 Dataflow 工作人员那里得到一些信号,那就是重新洗牌会提高性能而不是猜测的情况,那就太好了。 我已经编辑了问题来回答这些问题。您可以先查看 Dataflow UI 中输入与输出 PCollections 的大小 Dataflow 是否会根据奋斗者将工作分配给多个工作人员?例如,使用自动烫伤它总是会尝试优化资源,因此尝试使用更少的工人来运行。在您的示例的情况 2 中,我们是否需要通过重新洗牌明确声明我们需要更多的工人? 没错。当数据源允许时,可以拆分工作——但这​​并不总是可行的。例如,对于单个压缩块,不可能并行化它们的进度——当你有一个热键时也是如此。 @Pablo Read 转换怎么样?假设您正在阅读带有 3 个 avro 文件的“gs://dir/*.avro”,每个文件包含 100+ 百万行。您是否在读取转换后重新洗牌以能够使用 512 名工人?我还看到读取是拆分+读取,我们不应该在拆分之后和读取之前重新洗牌吗?如果是这种情况,我们如何在客户端代码中只有一个牢不可破的读取转换时做到这一点?

以上是关于Apache Beam/数据流重组的主要内容,如果未能解决你的问题,请参考以下文章

使用 Apache Beam 从数据库中读取批量数据

Apache Beam 数据流 BigQuery

apache beam入门之 窗口水位线和超时数据概念

从 Apache Beam(GCP 数据流)写入 ConfluentCloud

Apache Beam 处理文件

apache beam入门之组装数据变换过程