使用多进程池通过 Python 将_table_from_dataframe 加载到 BigQuery
Posted
技术标签:
【中文标题】使用多进程池通过 Python 将_table_from_dataframe 加载到 BigQuery【英文标题】:Using multiprocess Pool to load_table_from_dataframe into BigQuery with Python 【发布时间】:2021-10-30 12:10:31 【问题描述】:我有一个脚本,它处理一些信息,将其转换为 Pandas DataFrame,然后将其上传到 BigQuery。到目前为止它运行良好,但我想加快速度。
我正在使用多进程来执行此操作。到目前为止,一切都很好,可以将数据帧从 Pandas 加载到 BigQuery:在此过程中的某个时间,我会收到“禁止:403 超出速率限制:此表的表更新操作过多。”
即使同时执行 10 个任务,我也会收到此错误。
我尝试将 BigQuery 作业配置为 Batch,但这似乎仅适用于查询操作,不适用于加载操作。
代码有点简单:一旦我得到数据框,它就可以了
job = client.load_table_from_dataframe(df, table_id)
result = job.result()
它调用了这个函数
p = Pool(10)
p.starmap(myfunction, list_of_lists)
p.terminate()
p.join()
有什么想法可以完成这项工作吗?
【问题讨论】:
【参考方案1】:我已经设法通过 Medium 上的 this post 找到解决方案,因此我将粘贴代码以供将来参考。
根据 Ronnie Joshua 的代码,我所做的是使用 pandas_gbq 写入 BigQuery,为此您必须每次都发送您的凭据。
gcp_bq_config =
"gcp_credentials": service_account.Credentials.from_service_account_file(
gcp_key_path,
),
"bq_project_id": "sm-data-infra",
"bq_dataset_id": "my_sql_db_crm",
p = Pool(os.cpu_count())
p.starmap(function,
tuple(zip(list_to_iterate, repeat(gcp_bq_config))))
p.terminate()
p.join()
它有效,但最终我会收到 403 Exceeded rate limits 错误,所以最后我不得不使用另一种策略。
【讨论】:
以上是关于使用多进程池通过 Python 将_table_from_dataframe 加载到 BigQuery的主要内容,如果未能解决你的问题,请参考以下文章