Dataflow Bigquery-Bigquery 管道在较小的数据上执行,但不是在大型生产数据集上执行

Posted

技术标签:

【中文标题】Dataflow Bigquery-Bigquery 管道在较小的数据上执行,但不是在大型生产数据集上执行【英文标题】:Dataflow Bigquery-Bigquery pipeline executes on smaller data, but not the large production dataset 【发布时间】:2021-07-16 17:02:16 【问题描述】:

这里是 Dataflow 的新手,但已成功创建了一个运行良好的管道。

管道从 BigQuery 读取查询,应用 ParDo(NLP 功能),然后将数据写入新的 BigQuery 表。

我正在尝试处理的数据集大约为 500GB,包含 46M 条记录。

当我尝试使用相同数据的子集(大约 30 万条记录)时,它工作得很好并且速度很快,见下文:

当我尝试使用完整的数据集运行它时,它开始非常快,但随后逐渐减少并最终失败。此时作业失败并添加了大约 900k 个元素,大约 6-7GB,然后元素数量实际上开始减少。

我正在使用 250 个工人和一个 n1-highmem-6 机器类型

在工作日志中,我得到了其中的一些(大约 10 个):

Info
2021-04-22 06:29:38.236 EDTRefreshing due to a 401 (attempt 1/2)

这是最后的警告之一:

2021-04-22 06:29:32.392 EDTS08:[85]: GetArticles/Read+[85]: GetArticles/_PassThroughThenCleanup/ParDo(PassThrough)/ParDo(PassThrough)+[85]: ExtractEntity+[85]: WriteToBigQuery/BigQueryBatchFileLoads/RewindowIntoGlobal+[85]: WriteToBigQuery/BigQueryBatchFileLoads/AppendDestination+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)/ParDo(WriteRecordsToFile)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/IdentityWorkaround+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupFilesByTableDestinations/Write+[85]: WriteToBigQuery/BigQueryBatchFileLoads/ParDo(_ShardDestinations)+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Reify+[85]: WriteToBigQuery/BigQueryBatchFileLoads/GroupShardedRows/Write failed.

在执行细节中有多个:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

我假设这些来自数据集中较大的文本,可能需要一段时间来处理,所以稍等片刻之后,这些项目被处理,下一个项目开始。

还有一些这样的:

2021-04-22 06:29:40.202 EDTOperation ongoing for over 413.09 seconds in state process-msecs in step s6 . Current Traceback: File "/usr/local/lib/python3.7/runpy.py", line 193, in _run_module_as_main "__main__", mod_spec) File "/usr/local/lib/python3.7/runpy.py", line 85, in _run_code exec(code, run_globals) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 144, in <module> main() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/start.py", line 140, in main batchworker.BatchWorker(properties, sdk_pipeline_options).run() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 844, in run deferred_exception_details=deferred_exception_details) File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 649, in do_work work_executor.execute() File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute op.start() File "<ipython-input-81-df441d984b0a>", line 194, in process File "<ipython-input-81-df441d984b0a>", line 173, in extract_entities File "<ipython-input-81-df441d984b0a>", line 95, in get_company_sentences

所有这一切,对我来说有点令人困惑,而且并不完全直观——尽管它工作时的服务很棒。

我正在从 Jupyter 笔记本执行作业(不使用交互式运行器,只是执行脚本)。

主管道如下:

p = beam.Pipeline()

#Create a collection from Bigquery
articles = p | "GetArticles" >> beam.io.ReadFromBigQuery(query='SELECT id,uuid, company_id_id, title, full_text, FROM `MY TABLE` ', gcs_location=dataflow_gcs_location, project='my_project',use_standard_sql=True)

#Extract entities with NLP
entities = articles | "ExtractEntity" >> beam.ParDo(EntityExtraction()) 

#Write to bigquery 
entities | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('myproject:dataset.table', schema = schema,custom_gcs_temp_location=dataflow_gcs_location, create_disposition="CREATE_IF_NEEDED",write_disposition="WRITE_APPEND")  ```

我做错了什么?这是内存问题吗?我不应该像这样读写 BigQuery 而是输出到文件并从中创建表吗?希望得到一些帮助,很抱歉这篇文章很长,希望提供尽可能多的背景信息。

【问题讨论】:

【参考方案1】:

聚会可能迟到了,但我对包含 770 万行的 BigQuery 表进行了一些测试,其中要处理的字符串大约为。 350 字。

我运行的管道和你一样:

    从 BigQuery 读取数据 使用 python string library 清理字符串 使用 Spacy fr_core_news_lg 模型,获取字符串的词形化部分 将数据写回 BigQuery(在不同的表中)

一开始我遇到了和你一样的问题,元素/秒的数量随着时间的推移而下降。

我意识到这是 RAM 的问题。我将 machine_typecustom-1-3072 更改为 custom-1-15360-ext 并从与您相同的个人资料转到此个人资料:

我认为 Dataflow 可以使用 NLP 模型处理大量行,但您必须确保为工作人员提供足够的 RAM。

此外,使用number_of_worker_harness_threads=1 确保 Dataflow 不会产生多个线程(从而将 ram 拆分为线程)也很重要。

你也可以看看这个stack thread,最初的问题是一样的。

最后,我的工作人员的 CPU 利用率来自:

收件人:

这也是缺少 RAM 的标志。

编辑:我使用与您相同的数据量规模运行我的管道,以确保我的测试没有偏差,结果是相同的:RAM 量似乎是使工作顺利运行的关键:

【讨论】:

这太棒了,我要试试这个。您的自定义机器中有多少 RAM? 很高兴能在这么长时间后提供帮助!我的定制机器运行 15Gb RAM (15360Mb) 但也许你需要的更少,我将运行一些测试来找到削减成本的限制。让我知道它是否对你有用! 对了,你能发一张工人的截图吗?您自动缩放到多少? 我不能真正发布工人的屏幕截图,因为总共有 70 个计算引擎(我们受到使用中的地址的限制),但我可以添加代码 sn-p 定义如果需要,可以使用管道选项。【参考方案2】:

我发现 Dataflow 对于像这样的大型 NLP 批处理作业不是很好。我解决这个问题的方法是将较大的作业分成可以可靠运行的较小的作业。因此,如果您可以可靠地运行 100K 文档,只需运行 500 个作业。

【讨论】:

是的,我在实验后支持这个。正如您所说,由于我使用的是 spacy,因此较小的模型可能适用于较大的批次,但对于我的用例来说,它可以在较小的部分中运行!

以上是关于Dataflow Bigquery-Bigquery 管道在较小的数据上执行,但不是在大型生产数据集上执行的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Stream 为 Spring Cloud Dataflow 中的子任务设置全局属性 - Task-Launcher-Dataflow

DataFlow 上传产品图片

为啥使用 Dataflow 写入 Bigquery 非常慢?

Dataflow编程模型和spark streaming结合

hpx::dataflow 和成员函数的编译错误

延迟发布到 DataFlow