将大量数据从 DataFlow 加载到外部 postgres 数据库的最佳方法是啥?

Posted

技术标签:

【中文标题】将大量数据从 DataFlow 加载到外部 postgres 数据库的最佳方法是啥?【英文标题】:What is the best way to load a lot of data from DataFlow to external postgres database?将大量数据从 DataFlow 加载到外部 postgres 数据库的最佳方法是什么? 【发布时间】:2021-09-24 20:23:00 【问题描述】:

我正在使用 Python 运行一个管道,我在其中抓取大量数据并希望加载到外部数据库(Digital Ocean Postgres)中,我有一些选项,但它们的性能都不是很好(或者我不能正确评估性能):

使用from beam_nuggets.io import relational_db 对性能有一些不利影响,因为它会逐行加载; 将分组数据上传到pubsub,在那里我可以运行一些云函数来上传到外部数据库,但是即使摸索这也可能是云函数运行的大数据,并且在消息数量方面存在一些限制以及我可以发布的消息的大小; 上传到bucket,我可以在其中运行云函数,但这样我会面临同样的问题,即一个函数需要处理大量数据,而拆分将是大量函数调用;

对我来说,如果我可以在一个插入中上传一堆数据,比如COPY TO,那会很好(我认为),但我无法在 DataFlow 上运行它。

有什么想法吗?

【问题讨论】:

【参考方案1】:

在您提交 Dataflow 流水线后,Dataflow 服务会启动多个工作器(即 VM)来执行您的流水线。每个工作人员将被分配特定的键范围以进行并行执行,并在必要时将数据随机分配给另一个工作人员(即 GroupByKey)。因此,一般来说,如果您有较大的键数和较小的键值大小,您的管道会具有更好的性能。

在您的“用于运行功能的大数据”的情况下,如果您的数据与许多键相关联,并且为这些键运行相同的功能,那可能没问题。但是,如果您指的是在函数中运行的一个键的大数据(即 DoFn),那么它可能会遇到性能问题,因为它可能会减慢键的并行性,并且您可能会遇到一些系统限制 (https://cloud.google.com/dataflow/quotas)。例如,单个元素有大小限制。

只是后一种情况的一些想法:您可以考虑将(单个大)数据拆分为多个部分并分配给不同的键(例如,使用公共前缀)。在数据库查询层,您可以根据前缀查询它们。或者如果您的数据库支持键值附加,您可以在一个键中批量读取您的(单个大)数据并将结果附加到数据库中。

我认为一个很好的起点是检查 google 提供的模板 (https://cloud.google.com/dataflow/docs/guides/templates/provided-templates),您可以在其中找到从 GCS 读取数据并将处理后的数据插入 BigQuery 的示例。你可以做类似的事情。

【讨论】:

对不起,我以通用方式使用函数,我的意思是云函数,在管道内部我使用 DoFn,我将使用它进行插入,但它会进行很多插入(即使是并行) ,在我看来,最好是全部插入或按块插入。关于 BigQuery,它更容易,因为它有一些代码来执行批量加载,但我想加载到外部数据库。

以上是关于将大量数据从 DataFlow 加载到外部 postgres 数据库的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Dataflow 管道 (python) 将多个 Json zip 文件从 GCS 加载到 BigQuery

将大量数据从 Netezza 复制到 Teradata

从 DataFlow 加载到现有 BigQuery 表时是不是可以更新架构?

用于 NRT 数据应用的 Google Cloud DataFlow

使用 write_truncate 通过 Google Dataflow/Beam 将数据加载到 Biqquery 分区表中

Dataflow 中的 BigQuery 无法从 Cloud Storage 加载数据:为非记录字段指定了 JSON 对象