如何在另一个任务气流中使用查询结果(bigquery 运算符)
Posted
技术标签:
【中文标题】如何在另一个任务气流中使用查询结果(bigquery 运算符)【英文标题】:How to use the result of a query (bigquery operator) in another task-airflow 【发布时间】:2021-08-19 17:47:54 【问题描述】:我在 google composer 中有一个项目,旨在每天提交。 下面的代码就是这样做的,效果很好。
with models.DAG('reporte_prueba',
schedule_interval=datetime.timedelta(weeks=4),
default_args=default_dag_args) as dag:
make_bq_dataset = bash_operator.BashOperator(
task_id='make_bq_dataset',
# Executing 'bq' command requires Google Cloud SDK which comes
# preinstalled in Cloud Composer.
bash_command='bq ls || bq mk '.format(
bq_dataset_name, bq_dataset_name))
bq_audit_query = bigquery_operator.BigQueryOperator(
task_id='bq_audit_query',
sql=query_sql,
use_legacy_sql=False,
destination_dataset_table=bq_destination_table_name)
export_audits_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
task_id='export_audits_to_gcs',
source_project_dataset_table=bq_destination_table_name,
destination_cloud_storage_uris=[output_file],
export_format='CSV')
download_file = GCSToLocalFilesystemOperator(
task_id="download_file",
object_name='audits.csv',
bucket='bucket-reportes',
filename='/home/airflow/gcs/data/audits.csv',
)
email_summary = email_operator.EmailOperator(
task_id='email_summary',
to=['aa@bb.cl'],
subject="""Reporte de Auditorías Diarias
Institución: institution_report día date_report
""".format(date_report=date,institution_report=institution),
html_content="""
Sres.
<br>
Adjunto enviamos archivo con Reporte Transacciones Diarias.
<br>
""",
files=['/home/airflow/gcs/data/audits.csv'])
delete_bq_table = bash_operator.BashOperator(
task_id='delete_bq_table',
bash_command='bq rm -f %s' % bq_destination_table_name,
trigger_rule=trigger_rule.TriggerRule.ALL_DONE)
(
make_bq_dataset
>> bq_audit_query
>> export_audits_to_gcs
>> delete_bq_table
)
export_audits_to_gcs >> download_file >> email_summary
使用此代码,我创建了一个包含我需要发送的数据的表(稍后将被删除),然后我将该表作为 csv 传递到存储。 然后我将 .csv 下载到本地气流目录以通过邮件发送。
我的问题是,如果我可以避免创建表并将其存储的部分。因为我不需要它。
例如,使用 BigqueryOperator 执行查询,并在 ariflow 中访问结果,从而在本地生成 csv 然后发送。
我有办法生成 CSV,但我最大的疑问是如何(如果可能的话)访问查询结果或将结果传递给另一个气流任务
【问题讨论】:
这个相关帖子有很多答案,但根据您的用例,这个答案更适用,更简单***.com/a/66647164/14733669。 【参考方案1】:虽然我不建议跨任务传递 sql 查询的结果,但气流中的 XComs 通常用于任务之间的通信。
https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html
您还需要创建一个自定义运算符来返回查询结果,因为我“相信”BigQueryOperator 不会返回查询结果。
【讨论】:
为什么不建议在任务之间传递结果?还是我自己的意见?你能不能设计一个操作员一起完成这两个任务?以上是关于如何在另一个任务气流中使用查询结果(bigquery 运算符)的主要内容,如果未能解决你的问题,请参考以下文章