使用多进程池通过 Python 将_table_from_dataframe 加载到 BigQuery

Posted

技术标签:

【中文标题】使用多进程池通过 Python 将_table_from_dataframe 加载到 BigQuery【英文标题】:Using multiprocess Pool to load_table_from_dataframe into BigQuery with Python 【发布时间】:2021-10-30 12:10:31 【问题描述】:

我有一个脚本,它处理一些信息,将其转换为 Pandas DataFrame,然后将其上传到 BigQuery。到目前为止它运行良好,但我想加快速度。

我正在使用多进程来执行此操作。到目前为止,一切都很好,可以将数据帧从 Pandas 加载到 BigQuery:在此过程中的某个时间,我会收到“禁止:403 超出速率限制:此表的表更新操作过多。”

即使同时执行 10 个任务,我也会收到此错误。

我尝试将 BigQuery 作业配置为 Batch,但这似乎仅适用于查询操作,不适用于加载操作。

代码有点简单:一旦我得到数据框,它就可以了

job = client.load_table_from_dataframe(df, table_id)
result = job.result()

它调用了这个函数

p = Pool(10)
p.starmap(myfunction, list_of_lists)
p.terminate()
p.join()

有什么想法可以完成这项工作吗?

【问题讨论】:

【参考方案1】:

我已经设法通过 Medium 上的 this post 找到解决方案,因此我将粘贴代码以供将来参考。

根据 Ronnie Joshua 的代码,我所做的是使用 pandas_gbq 写入 BigQuery,为此您必须每次都发送您的凭据。

gcp_bq_config = 
    "gcp_credentials": service_account.Credentials.from_service_account_file(
        gcp_key_path,
    ),
    "bq_project_id": "sm-data-infra",
    "bq_dataset_id": "my_sql_db_crm",



p = Pool(os.cpu_count())
p.starmap(function, 
          tuple(zip(list_to_iterate, repeat(gcp_bq_config))))
p.terminate()
p.join()

它有效,但最终我会收到 403 Exceeded rate limits 错误,所以最后我不得不使用另一种策略。

【讨论】:

以上是关于使用多进程池通过 Python 将_table_from_dataframe 加载到 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章

python:多进程,多进程队列,多进程管道,Manager,进程锁,进程池

python 归纳 (十五)_多进程使用Pool

pythonの多进程

python 进程池pool简单使用

铁乐学python_Day40_进程池

使用进程池规避Python的GIL限制