BigQueryOperator 可以运行多个 SQL 文件吗?

Posted

技术标签:

【中文标题】BigQueryOperator 可以运行多个 SQL 文件吗?【英文标题】:Can BigQueryOperator run multiple SQL files? 【发布时间】:2021-11-04 19:16:58 【问题描述】:

我有一些 SQL 由气流 BigQueryOperator 触发,其中一个 SQL 对以下情况给出的所有任务都很常见:

要运行的 SQL - common.sqlabc.sqlxyz.sql

任务1 - common.sql + abc.sql 任务2 - common.sql + xyz.sql

为了一个任务运行 2 个 SQL,我将 2 个 SQL 文件读入一个字符串,然后运行合并的字符串以一次性运行任务。

代码如下:

with open ('common.sql', "r") as sqlfile:
     common_array = sqlfile.readlines()

with open ('abc.sql', "r") as sqlfile:
     abc_array    = sqlfile.readlines()
            
# at this point, sql_script has all codes from common.sql and abc.sql
sql_script = ''.join(common_array) + '\n' + ''.join(abc_array)

BigQueryOperator(task_id='task1', sql=sql_script)

这符合我的目的,还有其他更优雅的方式吗?

【问题讨论】:

嗨,OP。让我们知道答案是否有帮助。如果它回答了您的问题,请考虑接受它。这样其他人就知道你已经(充分地)得到了帮助。另见What should I do when someone answers my question? 【参考方案1】:

您可以创建一个包含文件的列表,而不是手动逐个文件读取。如果您有多个文件,这将很有帮助。对于每个文件读取的内容,将其连接到common_array,并为每个连接的公共和 sql 文件创建一个键值对。

用于测试 test1.sql、test2.sql、test3.sql 和 common.sql 包含单行查询。

import json
with open ('common.sql', "r") as sqlfile:
     common_array = sqlfile.readlines()

file_arr = ['test1.sql','test2.sql','test3.sql']
sql_dict = 
for data in file_arr:
    with open (data, "r") as sqlfile:
        key = data.split('.')[0]
        value = ''.join(common_array) + '\n' + ''.join(sqlfile.read().rstrip())
        sql_dict[key] = value

print(json.dumps(sql_dict, sort_keys=False, indent=2))

print("\nDictionary value for sql_dict['test1']: \n"+ sql_dict['test1'])

输出:


  "test1": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable` LIMIT 10",
  "test2": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_2` LIMIT 10",
  "test3": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_3` LIMIT 10"


Dictionary value for sql_dict['test1']:
SELECT * FROM `my-project.my_dataset.common` LIMIT 10

SELECT * FROM `my-project.my_dataset.myTable` LIMIT 10

然后您可以将字典中的值用于BigQueryOperator

BigQueryOperator(task_id='task1', sql=sql_dict['test1'])

【讨论】:

Ricco,感谢您回答问题。您的解决方案类似,也将脚本内容写入字符串,然后分配给 BigQueryOperator 参数。我期待分配一个文件路径之类的东西,但我们可以分配多个文件,而不是只分配一个文件。例如: BigQueryOperator(task_id='task1', sql=['common.sql', 'abc.sql']) (我试试这个,但它不起作用)。如果我可以分配文件路径,代码看起来会更简洁。 @BrianMo 是的,它非常相似,我的解决方案旨在删除多行加入 common 和 sql 文件。我检查了BigQueryOperator 的源代码,到目前为止它只能接受字符串,因此还不能传递 sql 文件列表。因此,您的解决方法是您目前唯一的选择。

以上是关于BigQueryOperator 可以运行多个 SQL 文件吗?的主要内容,如果未能解决你的问题,请参考以下文章

可以使用气流 BigQueryOperator 制作 DDL bigquery 语句吗?

Airflow 的 bigqueryoperator 不能与 udf 一起使用

bigqueryoperator 气流上的 Bigquery 脚本

Apache Airflow - BigQueryOperator:如何动态设置destination_dataset_table 分区

在 Airflow 中将 Jinja 模板变量与 BigQueryOperator 结合使用

bigqueryoperator 是 write_truncate 1 个事务或 2 个事务