如何在另一个任务气流中使用查询结果(bigquery 运算符)

Posted

技术标签:

【中文标题】如何在另一个任务气流中使用查询结果(bigquery 运算符)【英文标题】:How to use the result of a query (bigquery operator) in another task-airflow 【发布时间】:2021-08-19 17:47:54 【问题描述】:

我在 google composer 中有一个项目,旨在每天提交。 下面的代码就是这样做的,效果很好。

with models.DAG('reporte_prueba',
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args) as dag:

    make_bq_dataset = bash_operator.BashOperator(
        task_id='make_bq_dataset',
        # Executing 'bq' command requires Google Cloud SDK which comes
        # preinstalled in Cloud Composer.
        bash_command='bq ls  || bq mk '.format(
            bq_dataset_name, bq_dataset_name))
        
    bq_audit_query = bigquery_operator.BigQueryOperator(
        task_id='bq_audit_query',
        sql=query_sql,
        use_legacy_sql=False,
        destination_dataset_table=bq_destination_table_name)

    export_audits_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='export_audits_to_gcs',
        source_project_dataset_table=bq_destination_table_name,
        destination_cloud_storage_uris=[output_file],
        export_format='CSV')
    
    download_file = GCSToLocalFilesystemOperator(
        task_id="download_file",
        object_name='audits.csv',
        bucket='bucket-reportes',
        filename='/home/airflow/gcs/data/audits.csv',
    )
    email_summary = email_operator.EmailOperator(
        task_id='email_summary',
        to=['aa@bb.cl'],
        subject="""Reporte de Auditorías Diarias 
        Institución: institution_report día date_report
        """.format(date_report=date,institution_report=institution),
        html_content="""
        Sres.
        <br>
        Adjunto enviamos archivo con Reporte Transacciones Diarias.
        <br>
        """,
        files=['/home/airflow/gcs/data/audits.csv'])

    delete_bq_table = bash_operator.BashOperator(
        task_id='delete_bq_table',
        bash_command='bq rm -f %s' % bq_destination_table_name,
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)


    (
        make_bq_dataset 
        >> bq_audit_query 
        >> export_audits_to_gcs 
        >> delete_bq_table
    )
    export_audits_to_gcs >> download_file >> email_summary

使用此代码,我创建了一个包含我需要发送的数据的表(稍后将被删除),然后我将该表作为 csv 传递到存储。 然后我将 .csv 下载到本地气流目录以通过邮件发送。

我的问题是,如果我可以避免创建表并将其存储的部分。因为我不需要它。

例如,使用 BigqueryOperator 执行查询,并在 ariflow 中访问结果,从而在本地生成 csv 然后发送。

我有办法生成 CSV,但我最大的疑问是如何(如果可能的话)访问查询结果或将结果传递给另一个气流任务

【问题讨论】:

这个相关帖子有很多答案,但根据您的用例,这个答案更适用,更简单***.com/a/66647164/14733669。 【参考方案1】:

虽然我不建议跨任务传递 sql 查询的结果,但气流中的 XComs 通常用于任务之间的通信。

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

您还需要创建一个自定义运算符来返回查询结果,因为我“相信”BigQueryOperator 不会返回查询结果。

【讨论】:

为什么不建议在任务之间传递结果?还是我自己的意见?你能不能设计一个操作员一起完成这两个任务?

以上是关于如何在另一个任务气流中使用查询结果(bigquery 运算符)的主要内容,如果未能解决你的问题,请参考以下文章

apache气流的sql查询

如何使用气流检查长时间运行的 http 任务的状态?

(Django)气流中的 ORM - 有可能吗?

如何在另一个中使用一个光滑查询的结果并在两者之间进行计算

如何强制气流任务失败?

如何将数据帧传递到气流任务的临时表中