如何使用多个工作人员加快批量导入谷歌云数据存储的速度?

Posted

技术标签:

【中文标题】如何使用多个工作人员加快批量导入谷歌云数据存储的速度?【英文标题】:How to speedup bulk importing into google cloud datastore with multiple workers? 【发布时间】:2018-10-16 18:33:03 【问题描述】:

我有一个基于 apache-beam 的数据流作业,要使用 vcf source 从单个文本文件(存储在谷歌云存储中)读取,将文本行转换为数据存储区 Entities 并将它们写入 datastore sink。工作流程运行良好,但我注意到的缺点是:

数据存储区的写入速度最多约为每秒 25-30 个实体。 我尝试使用--autoscalingAlgorithm=THROUGHPUT_BASED --numWorkers=10 --maxNumWorkers=100,但执行似乎更喜欢一个工作人员(见下图:目标工作人员曾经增加到 2,但“基于在当前运行步骤中并行化工作的能力”减少到 1) .

我没有使用祖先路径作为键;所有实体都是相同的kind

管道代码如下所示:

def write_to_datastore(project, user_options, pipeline_options):
"""Creates a pipeline that writes entities to Cloud Datastore."""
  with beam.Pipeline(options=pipeline_options) as p:
  (p
   | 'Read vcf files' >> vcfio.ReadFromVcf(user_options.input)
   | 'Create my entity' >> beam.ParDo(
     ToEntityFn(), user_options.kind)
   | 'Write to datastore' >> WriteToDatastore(project))

因为我有数百万行要写入数据存储区,以 30 个实体/秒的速度写入会花费很长时间。

问题:输入只是一个巨大的 gzip 文件。我是否需要将其拆分为多个小文件以触发多个工作人员?有没有其他方法可以加快导入速度?我错过了num_workers 设置中的某些内容吗?谢谢!

【问题讨论】:

【参考方案1】:

我对apache beam不熟悉,答案是从一般流程的角度来看的。

假设在各个输入文件部分中的实体数据之间没有要考虑的依赖关系,那么是的,使用多个输入文件肯定会有所帮助,因为所有这些文件都可以虚拟并行处理(当然,取决于最大可用工人的数量)。

可能不需要事先拆分巨大的 zipfile,如果这种切换的开销,可以简单地将单个输入数据流的段移交给单独的数据段工作人员进行写入与实际的数据段处理相比,它本身是微不足道的。

整体性能限制将是读取输入数据、将其拆分为段并移交给段数据工作者的速度。

数据段工作者将进一步将其接收的数据段拆分为更小的块,最多相当于最多 500 个实体,这些实体可以在单个批处理操作中转换为实体并写入数据存储区。根据使用的数据存储客户端库,可能会异步执行此操作,从而允许继续拆分为块并转换为实体,而无需等待先前的数据存储写入完成。

数据段工作器的性能限制将是数据段可以拆分为块并将块转换为实体的速度

如果异步操作不可用或更高的吞吐量,则可以将每个块再次移交给分段工作者,由分段工作者执行转换为实体和数据存储批量写入。

数据段工作人员级别的性能限制将只是数据段可以拆分为块并移交给块工作人员的速度。

使用这种方法,实际转换为实体并将它们批量写入数据存储(异步或非异步)将不再处于拆分输入数据流的关键路径中,我相信这是您当前的性能限制接近。

【讨论】:

确实,实体数据之间没有依赖关系。据我了解,我已经生成了实体的 PCollection(分布式实体数据集),但问题是当写入速度如此缓慢时,自动缩放不起作用。我想这更像是一个 apache 梁问题。但是谢谢你的回答,Dan,+1。【参考方案2】:

我研究了vcfio 的设计。我怀疑(如果我理解正确的话)当输入是单个文件时我总是得到一个工作人员的原因是由于_VcfSource 和VCF format 约束的限制。这种格式有一个标题部分,它定义了如何翻译非标题行。这导致读取源文件的每个工作人员都必须处理整个文件。当我将单个文件拆分为 5 个共享相同标题的单独文件时,我成功地获得了多达 5 个工作人员(但由于相同的原因,可能不再有)。

我不明白的一件事是读取的工人数量可以限制为 5(在这种情况下)。但是为什么我们被限制为只有 5 个工人可以写呢?无论如何,我想我已经找到了使用光束 Dataflow-Runner 触发多个工作人员的替代方法(使用 pre-split VCF 文件)。在gcp variant transforms project 中也有一个相关的方法,其中vcfio 得到了显着扩展。它似乎支持使用单个输入 vcf 文件的多个工作人员。我希望该项目中的更改也可以合并到梁项目中。

【讨论】:

以上是关于如何使用多个工作人员加快批量导入谷歌云数据存储的速度?的主要内容,如果未能解决你的问题,请参考以下文章

如何从谷歌云存储中批量删除文件? (Node.js)

从谷歌云上传多个 csv 到 bigquery

如何从谷歌云存储中的多个文件中删除扩展名?

如何在谷歌云存储中启用实时对象访问分析?

自动化谷歌云 AutoML 管道?

谷歌云构建:在谷歌云存储库上克隆私有存储库失败