优化将BigQuery的数据传输到MongoDB的气流任务

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了优化将BigQuery的数据传输到MongoDB的气流任务相关的知识,希望对你有一定的参考价值。

我需要改善将数据从BigQuery传输到MongoDB的Airflow任务的性能。我的DAG中的相关任务使用PythonOperator,并简单地调用以下python函数来传输单个表/集合:

def transfer_full_table(table_name):
    start_time = time.time()

    # (1) Connect to BigQuery + Mongo DB
    bq = bigquery.Client()
    cluster = MongoClient(MONGO_URI)
    db = cluster["dbname"]
    print(f'(1) Connected to BQ + Mongo: {round(time.time() - start_time, 5)}')

    # (2)-(3) Run the BQ Queries
    full_query = f"select * from `gcpprojectid.models.{table_name}`"
    results1 = bq.query(full_query)
    print(f'(2) Queried BigQuery: {round(time.time() - start_time, 5)}')
    results = results1.to_dataframe()
    print(f'(3) Converted to Pandas DF: {round(time.time() - start_time, 5)}')

    # (4) Handle Missing DateTimes   # Can we refactor this into its own function?
    datetime_cols = [key for key in dict(results.dtypes) if is_datetime(results[key])]
    for col in datetime_cols:
        results[[col]] = results[[col]].astype(object).where(results[[col]].notnull(), None)
    print(f'(4) Resolved Datetime Issue: {round(time.time() - start_time, 5)}')

    # (5) And Insert Properly Into Mongo
    db[table_name].drop()
    db[table_name].insert_many(results.to_dict('records'))
    print(f'(5) Wrote to Mongo: {round(time.time() - start_time, 5)}')

[DAG设置为将许多表从BigQuery传输到MongoDB(每个任务一次传输),并且此特定的transfer_full_table函数用于传输整个单一表,因此很简单:

  • 查询整个BQ表
  • 转换为熊猫,修复类型问题
  • 删除先前的MongoDB集合并重新插入

我正在尝试在大小为60MB的表上使用此功能,这是任务各个部分的性能:

(1) Connected to BQ + Mongo: 0.0786
(2) Queried BigQuery: 0.80595
(3) Converted to Pandas DF: 87.2797
(4) Resolved Datetime Issue: 88.33461
(5) Wrote to Mongo: 213.92398

步骤3和5一直耗费时间。任务可以非常快速地连接到BQ和Mongo(1),BQ可以非常快速地查询60MB(2)的表。但是,当我转换为熊猫数据框(3)((4)需要处理我遇到的type问题)时,此步骤大约需要86.5秒。这样,解决日期时间问题的速度就非常快(4),但是最后,将先前的MongoDB集合删除并将新的熊猫数据帧重新插入MongoDB(5),则需要(213.9-88.3)=〜125秒。] >

在熊猫或MongoDB端,关于如何优化这两个瓶颈的任何提示,将不胜感激!

我需要改善将数据从BigQuery传输到MongoDB的Airflow任务的性能。我的DAG中的相关任务使用PythonOperator,只需将以下python函数调用为...

答案

简短的答案是异步操作使您的配置变得混乱。

以上是关于优化将BigQuery的数据传输到MongoDB的气流任务的主要内容,如果未能解决你的问题,请参考以下文章

使用用户定义的函数在 BigQuery 数据集中插入海量数据时如何优化性能

是否可以创建每天将SQL数据库写入MongoDB的管道?

MongoDB的工作原理,工作方式和最有意义的优化方案

dbt - jinja - bigquery - 查询优化

可以创建每天将 SQL 数据库写入 MongoDB 的管道吗?

从 pyspark 中删除 bigquery 表