Airflow BigQuery Hook - 通过 run_query 运行更新查询
Posted
技术标签:
【中文标题】Airflow BigQuery Hook - 通过 run_query 运行更新查询【英文标题】:Airflow BigQuery Hook - Run update query via run_query 【发布时间】:2021-01-05 04:33:58 【问题描述】:我想在 DAG 中完成一堆任务后运行一些更新查询。因此,我将创建一个 python 函数,用于从之前的任务输出中获取必要的详细信息,然后使用 run_query
我想更新一个表。
这是我的python函数。
def metatable_update(dag,tablename,schedule,**kwargs):
bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default',location='europe-west3',use_legacy_sql=False)
pg_conn = config[schedule][tablename][1]
ti = kwargs['ti']
pgmax_ts = str(ti.xcom_pull(task_ids='get_maxts__'.format(tablename,pg_conn))[1])
bq_hook.run_query(sql="update bqadmin.tablesync_meta set max_value='' where tablename=''".format(pgmax_ts,tablename))
我的问题只是简单地使用此代码,还是需要添加 execute
或 return
之类的内容才能成功运行此任务?
【问题讨论】:
【参考方案1】:没关系,我已经使用 BQ hook 本身来解决这个问题
并且不需要提及执行或返回。
def metatable_update(dag,tablename,schedule,**kwargs):
bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default',location='europe-west3',use_legacy_sql=False)
pg_conn = config[schedule][tablename][1]
ti = kwargs['ti']
pgmax_ts = str(ti.xcom_pull(task_ids='get_maxts__'.format(tablename,pg_conn))[1])
bq_hook.run(sql="update bqadmin.tablesync_meta set max_value='' where tablename='' and datasource_dbconn=''".format(pgmax_ts,tablename,pg_conn))
return 'Executed the update query'
【讨论】:
以上是关于Airflow BigQuery Hook - 通过 run_query 运行更新查询的主要内容,如果未能解决你的问题,请参考以下文章
Airflow - BigQuery 作业状态检查失败。最终错误是:%s'
为啥我应该使用 Airflow Hook 而不是 Python SDK for DataStore?
Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行