优化将BigQuery的数据传输到MongoDB的气流任务
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了优化将BigQuery的数据传输到MongoDB的气流任务相关的知识,希望对你有一定的参考价值。
我需要改善将数据从BigQuery传输到MongoDB的Airflow任务的性能。我的DAG
中的相关任务使用PythonOperator
,并简单地调用以下python函数来传输单个表/集合:
def transfer_full_table(table_name):
start_time = time.time()
# (1) Connect to BigQuery + Mongo DB
bq = bigquery.Client()
cluster = MongoClient(MONGO_URI)
db = cluster["dbname"]
print(f'(1) Connected to BQ + Mongo: {round(time.time() - start_time, 5)}')
# (2)-(3) Run the BQ Queries
full_query = f"select * from `gcpprojectid.models.{table_name}`"
results1 = bq.query(full_query)
print(f'(2) Queried BigQuery: {round(time.time() - start_time, 5)}')
results = results1.to_dataframe()
print(f'(3) Converted to Pandas DF: {round(time.time() - start_time, 5)}')
# (4) Handle Missing DateTimes # Can we refactor this into its own function?
datetime_cols = [key for key in dict(results.dtypes) if is_datetime(results[key])]
for col in datetime_cols:
results[[col]] = results[[col]].astype(object).where(results[[col]].notnull(), None)
print(f'(4) Resolved Datetime Issue: {round(time.time() - start_time, 5)}')
# (5) And Insert Properly Into Mongo
db[table_name].drop()
db[table_name].insert_many(results.to_dict('records'))
print(f'(5) Wrote to Mongo: {round(time.time() - start_time, 5)}')
[DAG设置为将许多表从BigQuery传输到MongoDB(每个任务一次传输),并且此特定的transfer_full_table
函数用于传输整个单一表,因此很简单:
- 查询整个BQ表
- 转换为熊猫,修复类型问题
- 删除先前的MongoDB集合并重新插入
我正在尝试在大小为60MB的表上使用此功能,这是任务各个部分的性能:
(1) Connected to BQ + Mongo: 0.0786
(2) Queried BigQuery: 0.80595
(3) Converted to Pandas DF: 87.2797
(4) Resolved Datetime Issue: 88.33461
(5) Wrote to Mongo: 213.92398
步骤3和5一直耗费时间。任务可以非常快速地连接到BQ和Mongo(1),BQ可以非常快速地查询60MB(2)的表。但是,当我转换为熊猫数据框(3)((4)需要处理我遇到的type
问题)时,此步骤大约需要86.5秒。这样,解决日期时间问题的速度就非常快(4),但是最后,将先前的MongoDB集合删除并将新的熊猫数据帧重新插入MongoDB(5),则需要(213.9-88.3)=〜125秒。] >
在熊猫或MongoDB端,关于如何优化这两个瓶颈的任何提示,将不胜感激!
我需要改善将数据从BigQuery传输到MongoDB的Airflow任务的性能。我的DAG中的相关任务使用PythonOperator,只需将以下python函数调用为...
简短的答案是异步操作使您的配置变得混乱。
以上是关于优化将BigQuery的数据传输到MongoDB的气流任务的主要内容,如果未能解决你的问题,请参考以下文章
使用用户定义的函数在 BigQuery 数据集中插入海量数据时如何优化性能