Airflow BigQuery Hook - 通过 run_query 运行更新查询

Posted

技术标签:

【中文标题】Airflow BigQuery Hook - 通过 run_query 运行更新查询【英文标题】:Airflow BigQuery Hook - Run update query via run_query 【发布时间】:2021-01-05 04:33:58 【问题描述】:

我想在 DAG 中完成一堆任务后运行一些更新查询。因此,我将创建一个 python 函数,用于从之前的任务输出中获取必要的详细信息,然后使用 run_query 我想更新一个表。

这是我的python函数。

def metatable_update(dag,tablename,schedule,**kwargs):
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default',location='europe-west3',use_legacy_sql=False)
    pg_conn = config[schedule][tablename][1]
    ti = kwargs['ti']
    pgmax_ts = str(ti.xcom_pull(task_ids='get_maxts__'.format(tablename,pg_conn))[1])
    bq_hook.run_query(sql="update bqadmin.tablesync_meta set max_value='' where tablename=''".format(pgmax_ts,tablename))
   

我的问题只是简单地使用此代码,还是需要添加 executereturn 之类的内容才能成功运行此任务?

【问题讨论】:

【参考方案1】:

没关系,我已经使用 BQ hook 本身来解决这个问题

并且不需要提及执行或返回。

def metatable_update(dag,tablename,schedule,**kwargs):
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default',location='europe-west3',use_legacy_sql=False)
    pg_conn = config[schedule][tablename][1]
    ti = kwargs['ti']
    pgmax_ts = str(ti.xcom_pull(task_ids='get_maxts__'.format(tablename,pg_conn))[1])
    bq_hook.run(sql="update bqadmin.tablesync_meta set max_value='' where tablename='' and datasource_dbconn=''".format(pgmax_ts,tablename,pg_conn))
    return 'Executed the update query'

【讨论】:

以上是关于Airflow BigQuery Hook - 通过 run_query 运行更新查询的主要内容,如果未能解决你的问题,请参考以下文章

Airflow - BigQuery 作业状态检查失败。最终错误是:%s'

为啥我应该使用 Airflow Hook 而不是 Python SDK for DataStore?

Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行

检查 Airflow 中是不是存在 Bigquery 分区

将 Airflow(版本 1.10.5)与 Bigquery 连接

使用 Airflow 将 Bigquery 查询结果发送到数据框