如何在 Airflow 中修改 BigQuery 外部表的源文件路径?
Posted
技术标签:
【中文标题】如何在 Airflow 中修改 BigQuery 外部表的源文件路径?【英文标题】:How to modify the source file path of a BigQuery external table in Airflow? 【发布时间】:2021-12-03 20:52:14 【问题描述】:有一个过程将一些CSV文件导出到GCS并将当前日期时间放入路径中,例如:
gs://my_gcs_bucket/path/to/export/<current_timestamp>/exported_file_1.csv
gs://my_gcs_bucket/path/to/export/<current_timestamp>/exported_file_2.csv
我想在转换这些文件的列并与其他表合并之前为这些文件创建一个外部表。
问题是我无法在 Airflow 中实现 DAG 来处理不断变化的时间戳。
我可以通过指定路径(使用当前时间戳)来创建一个外部表,但是对于明天的导出,这个外部表将不会读取新文件。但我明天需要同样的project_name.dataset.tablename
进行处理。
from airflow.contrib.operators.bigquery_operator import BigQueryCreateExternalTableOperator
CreateExternalBqTable = BigQueryCreateExternalTableOperator(
task_id = "CreateExternalBqTable",
field_delimiter = '|',
destination_project_dataset_table = f'project_name.dataset.tablename',
bucket = BUCKET_URI,
source_objects = [ 'path/to/export/2021-12-12-12-12-12/exported_file_*' ],
schema_fields = generate_custom_struct()
)
如果明天我将尝试再次运行相同的表创建任务,它将失败,因为外部表已经存在。 我可以删除现有的外部表,然后重新创建它以确保它不会使外部表创建任务失败,但如果它已经被删除,那么这将失败:
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator
DeleteExternalBqTable = BigQueryDeleteTableOperator(
task_id = "DeleteExternalBqTable",
deletion_dataset_table = f'project_name.dataset.tablename',
)
这里有什么好的模式? 我是否应该仅仅因为新的时间戳而总是删除并重新创建外部表? 在我看来,这种模式非常糟糕且容易出错。
或者我可以更改现有外部表下的路径吗?
如果我可以将外部表的初始化和删除阶段与日常运行分开,而不总是删除或创建它们,我会更开心。我计划只对这些表进行一次初始化,如果我完成了处理,则移走 CSV 文件,并在下次运行之前保持外部表为空。
(我需要适用于 Airflow 1.x 的有效解决方案)
【问题讨论】:
【参考方案1】:在BigQueryDeleteTableOperator
中,您可以使用ignore_if_missing
参数,如Airflow docs 中所述。
但是如果您不想每次都重新创建表,请考虑使用幂等运算符BigQueryCreateEmptyTableOperator
创建表(如果表已经存在,它将跳过创建)然后GoogleCloudStorageToBigQueryOperator
加载数据。这样你就将创建和加载分开了
【讨论】:
以上是关于如何在 Airflow 中修改 BigQuery 外部表的源文件路径?的主要内容,如果未能解决你的问题,请参考以下文章
如何设置 Airflow DAG 权限以查询基于 Google Sheets 文档构建的 BigQuery 表?
使用 Airflow 将 Bigquery 查询结果发送到数据框
如何通过 BigQuery 连接将 use_legacy_sql=False 传递给 Airflow DAG 中的 SqlSensor?