Apache Airflow - BigQueryOperator:如何动态设置destination_dataset_table 分区

Posted

技术标签:

【中文标题】Apache Airflow - BigQueryOperator:如何动态设置destination_dataset_table 分区【英文标题】:Apache Airflow - BigQueryOperator: How to dynamically set destination_dataset_table partition 【发布时间】:2017-10-23 14:20:50 【问题描述】:

我需要一个 BigQueryOperator 任务,如下所示:我需要将查询结果保存到分区表中。但是,"month_start" 需要从实际的 DAG execution_date 派生。我在我的 DAG 定义脚本(在 Python 中)中找不到有关如何读取 execution_date 的任何文档或示例。期待在这里得到一些帮助。

FYR:我使用 Airflow 1.8.2

 t1_invalid_geohash_by_traffic =  BigQueryOperator(
                                        task_id='invalid_geohash_by_traffic',
                                        bql='SQL/dangerous-area/InvalidGeohashByTraffic.sql',
                                        params = params,
                                        destination_dataset_table=
                                        'mydataset.mytable$'.format(month_start),                                      write_disposition='WRITE_TRUNCATE',
                                        bigquery_conn_id=CONNECTION_ID,
                                        use_legacy_sql=False
                                    )

【问题讨论】:

【参考方案1】:

我想我找到了答案。刚跑进这个博客:https://cloud.google.com/blog/big-data/2017/07/how-to-aggregate-data-for-bigquery-using-apache-airflow

【讨论】:

以上是关于Apache Airflow - BigQueryOperator:如何动态设置destination_dataset_table 分区的主要内容,如果未能解决你的问题,请参考以下文章

如何使用apache气流调度谷歌云bigquery存储过程

Airflow 中的 BigQuery 参数化查询

Airflow - BigQuery 作业状态检查失败。最终错误是:%s'

Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行

检查 Airflow 中是不是存在 Bigquery 分区

将 Airflow(版本 1.10.5)与 Bigquery 连接