BigQueryOperator 可以运行多个 SQL 文件吗?
Posted
技术标签:
【中文标题】BigQueryOperator 可以运行多个 SQL 文件吗?【英文标题】:Can BigQueryOperator run multiple SQL files? 【发布时间】:2021-11-04 19:16:58 【问题描述】:我有一些 SQL 由气流 BigQueryOperator 触发,其中一个 SQL 对以下情况给出的所有任务都很常见:
要运行的 SQL - common.sql
、abc.sql
、xyz.sql
common.sql + abc.sql
任务2 - common.sql + xyz.sql
为了一个任务运行 2 个 SQL,我将 2 个 SQL 文件读入一个字符串,然后运行合并的字符串以一次性运行任务。
代码如下:
with open ('common.sql', "r") as sqlfile:
common_array = sqlfile.readlines()
with open ('abc.sql', "r") as sqlfile:
abc_array = sqlfile.readlines()
# at this point, sql_script has all codes from common.sql and abc.sql
sql_script = ''.join(common_array) + '\n' + ''.join(abc_array)
BigQueryOperator(task_id='task1', sql=sql_script)
这符合我的目的,还有其他更优雅的方式吗?
【问题讨论】:
嗨,OP。让我们知道答案是否有帮助。如果它回答了您的问题,请考虑接受它。这样其他人就知道你已经(充分地)得到了帮助。另见What should I do when someone answers my question? 【参考方案1】:您可以创建一个包含文件的列表,而不是手动逐个文件读取。如果您有多个文件,这将很有帮助。对于每个文件读取的内容,将其连接到common_array
,并为每个连接的公共和 sql 文件创建一个键值对。
用于测试 test1.sql、test2.sql、test3.sql 和 common.sql 包含单行查询。
import json
with open ('common.sql', "r") as sqlfile:
common_array = sqlfile.readlines()
file_arr = ['test1.sql','test2.sql','test3.sql']
sql_dict =
for data in file_arr:
with open (data, "r") as sqlfile:
key = data.split('.')[0]
value = ''.join(common_array) + '\n' + ''.join(sqlfile.read().rstrip())
sql_dict[key] = value
print(json.dumps(sql_dict, sort_keys=False, indent=2))
print("\nDictionary value for sql_dict['test1']: \n"+ sql_dict['test1'])
输出:
"test1": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable` LIMIT 10",
"test2": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_2` LIMIT 10",
"test3": "SELECT * FROM `my-project.my_dataset.common` LIMIT 10\n\nSELECT * FROM `my-project.my_dataset.myTable_3` LIMIT 10"
Dictionary value for sql_dict['test1']:
SELECT * FROM `my-project.my_dataset.common` LIMIT 10
SELECT * FROM `my-project.my_dataset.myTable` LIMIT 10
然后您可以将字典中的值用于BigQueryOperator
。
BigQueryOperator(task_id='task1', sql=sql_dict['test1'])
【讨论】:
Ricco,感谢您回答问题。您的解决方案类似,也将脚本内容写入字符串,然后分配给 BigQueryOperator 参数。我期待分配一个文件路径之类的东西,但我们可以分配多个文件,而不是只分配一个文件。例如: BigQueryOperator(task_id='task1', sql=['common.sql', 'abc.sql']) (我试试这个,但它不起作用)。如果我可以分配文件路径,代码看起来会更简洁。 @BrianMo 是的,它非常相似,我的解决方案旨在删除多行加入 common 和 sql 文件。我检查了BigQueryOperator 的源代码,到目前为止它只能接受字符串,因此还不能传递 sql 文件列表。因此,您的解决方法是您目前唯一的选择。以上是关于BigQueryOperator 可以运行多个 SQL 文件吗?的主要内容,如果未能解决你的问题,请参考以下文章
可以使用气流 BigQueryOperator 制作 DDL bigquery 语句吗?
Airflow 的 bigqueryoperator 不能与 udf 一起使用
bigqueryoperator 气流上的 Bigquery 脚本
Apache Airflow - BigQueryOperator:如何动态设置destination_dataset_table 分区