当有多个来源时,Google Dataflow 一次不会读取超过 3 个输入压缩文件

Posted

技术标签:

【中文标题】当有多个来源时,Google Dataflow 一次不会读取超过 3 个输入压缩文件【英文标题】:Google Dataflow not reading more than 3 input compressed files at once when there are multiple sources 【发布时间】:2016-12-23 09:26:14 【问题描述】:

背景:我在 30 个单独的压缩文件中存储了 30 天的数据,这些文件存储在谷歌存储中。我必须将它们写入同一张表中 30 个不同分区的 BigQuery 表。每个压缩文件大小约为 750MB。

我今天在 Google Dataflow 上对同一数据集进行了 2 次实验。

实验 1:我使用 TextIO 读取每天的压缩文件,应用简单的 ParDo 转换来准备 TableRow 对象,然后使用 BigQueryIO 将它们直接写入 BigQuery。因此,基本上创建了 30 对并行未连接的源和接收器。但我发现在任何时候,只有 3 个文件被读取、转换并写入 BigQuery。 Google Dataflow 的 ParDo 转换和 BigQuery 写入速度在任何时间点都在 6000-8000 个元素/秒左右。 因此,在任何时候都只处理了 30 个源和接收器中的 3 个,这大大减慢了进程。在 90 多分钟内,只有 30 个文件中的 7 个被写入表的单独 BigQuery 分区。

实验 2:这里我首先从同一个压缩文件中读取 30 天每天的数据,对这 30 个 PCollection 应用 ParDo 转换,并将这 30 个结果 Pcollections 存储在一个 PCollectionList 对象中。并行读取所有这 30 个 TextIO 源。 现在我直接使用 BigQueryIO 将 PCollectionList 中每天的数据对应的每个 PCollection 写入 BigQuery。因此,30 个接收器被再次并行写入。 我发现在 30 个并行源中,只有 3 个源被读取并以大约 20000 个元素/秒的速度应用 ParDo 转换。在写这个问题的时候已经过去了 1 小时,从所有压缩文件中读取甚至还没有完全读取 50% 的文件,甚至还没有开始写入 BigQuery 表分区。

这些问题似乎只有在 Google Dataflow 读取压缩文件时才会出现。我曾问过一个关于它从压缩文件中读取速度慢的问题(Relatively poor performance when reading compressed files vis a vis normal text files kept in google storage using google dataflow),并被告知并行化工作会使读取速度更快,因为只有 1 个工作人员读取压缩文件,而多个来源意味着多个工作人员有机会读取多个文件。但这似乎也不起作用。

有什么方法可以加快从多个压缩文件中读取并同时写入数据流作业中 BigQuery 中同一表的单独分区的整个过程?

【问题讨论】:

1) 您使用的是 Java SDK 吗?如果是这样..2) 您使用的是什么版本的 Java SDK? 3) 你在 TextIO.Read 绑定 (AUTO, GZIP, ..) 中将 compressionType 设置为什么? 我正在使用 Google Cloud Dataflow Java SDK 1.6.0。我在阅读时没有设置任何压缩类型。所以默认压缩类型应该设置为“AUTO”。代码运行的文件扩展名为.gz 您能提供工作 ID 吗?你使用了多少个工人(我相信默认是 3 个)? 实验 1 的作业 ID 为 2016-08-16_12_21_50-6508500558826000885。实验 2 的作业 ID 为 2016-08-16_12_59_18-12710516602377435100 这两项工作只需要三个工人。您可以设置 maxNumWorkers 选项来调整要使用的最大工作人员数量,并设置 numWorkers 来设置初始数量。在这两个管道中,您似乎设置了一个名为 numberOfWorkers 的选项,而不是设置服务理解的选项 【参考方案1】:

每个压缩文件将由单个工作人员读取。可以使用 numWorkers 管道选项增加作业的初始工作人员数量,可以使用 maxNumWorkers 管道选项设置可以扩展到的最大数量。

【讨论】:

对于非压缩文件是否也是如此,即只有一个工作人员读取一个文件?

以上是关于当有多个来源时,Google Dataflow 一次不会读取超过 3 个输入压缩文件的主要内容,如果未能解决你的问题,请参考以下文章

Google Dataflow - 由GoogleSheets支持的BigQuery工作

我们可以使用单个 Google Cloud Dataflow 将来自多个 Pubsub(源)的数据写入多个 GCS(接收器)吗?

在 google-cloud-dataflow 中使用文件模式匹配时如何获取文件名

当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub

尝试在 Dataflow 中使用 Apache Beam 将数据从 Google PubSub 写入 GCS 时出错

确定导致 Google Dataflow 作业失败的特定输入数据