Airflow 的 bigqueryoperator 不能与 udf 一起使用

Posted

技术标签:

【中文标题】Airflow 的 bigqueryoperator 不能与 udf 一起使用【英文标题】:Airflow's bigqueryoperator not working with udf 【发布时间】:2018-05-25 11:01:53 【问题描述】:

我正在尝试在使用用户定义函数 (UDF) 的 Airflow(使用 Google 的 Composer)任务中运行基本的 bigquery 运算符。

示例来自https://cloud.google.com/bigquery/docs/reference/standard-sql/user-defined-functions,在 BigQuery 中完美运行。

但是,当我上传到作曲家时,我得到“未找到函数:multiplyInputs...”请参阅下面的 python 脚本。

BigQueryOperator 的 udf_config 字段需要一个列表,因此我将我的 UDF 定义为一个包含一个字符串的列表 - 不确定这是否正确,因为它显然没有注册为 UDF

任何帮助将不胜感激。

import datetime
from airflow import models
from airflow.contrib.operators import bigquery_operator

yesterday = datetime.datetime.combine(datetime.datetime.today() -             
datetime.timedelta(1),
                                  datetime.datetime.min.time())
default_dag_args = 
                # Setting start date as yesterday starts the DAG 
immediately when it is
                # detected in the Cloud Storage bucket.
                'start_date': yesterday,
                # To email on failure or retry set 'email' arg to your 
email and enable
                # emailing here.
                'email_on_failure': False,
                'email_on_retry': False,
                # If a task fails, retry it once after waiting at least 
5 minutes
                'retries': 1,
                'retry_delay': datetime.timedelta(minutes=5),
                'project_id': 'vital-platform-791'


with models.DAG('udf_example',
                schedule_interval=datetime.timedelta(days=1),
                default_args=default_dag_args) as dag:

    table = 'udf_table'

    # flatten fe table
    task_id = table + '_fe'

    udf_config = ["""CREATE TEMPORARY FUNCTION multiplyInputs(x 
                  FLOAT64, y FLOAT64)
                  RETURNS FLOAT64
                  LANGUAGE js AS \"""
                  return x*y;
                  \""";
                  """]

    print udf_config

    query = """WITH numbers AS
              (SELECT 1 AS x, 5 as y
              UNION ALL
              SELECT 2 AS x, 10 as y
              UNION ALL
              SELECT 3 as x, 15 as y)
            SELECT x, y, multiplyInputs(x, y) as product
            FROM numbers"""

    print query

    query = query
    destination_table = 'vital-platform-791.alpha_factors. 
                          table_fe'.format(table=table)

    t_fe = bigquery_operator.BigQueryOperator(task_id=task_id,
                                          bql=query,                                         
                     destination_dataset_table=destination_table,
                                          use_legacy_sql=False,                                              
                     write_disposition='WRITE_TRUNCATE',
                                          udf_config=udf_config)

【问题讨论】:

【参考方案1】:

我对这个例子有点困惑。看来您只需要将udf_configquery 合并即可:

query = ""CREATE TEMPORARY FUNCTION multiplyInputs(x 
              FLOAT64, y FLOAT64)
              RETURNS FLOAT64
              LANGUAGE js AS \"""
              return x*y;
              \""";
              WITH numbers AS
          (SELECT 1 AS x, 5 as y
          UNION ALL
          SELECT 2 AS x, 10 as y
          UNION ALL
          SELECT 3 as x, 15 as y)
        SELECT x, y, multiplyInputs(x, y) as product
        FROM numbers;"""

【讨论】:

谢谢,这很好用。 BigQueryOperator 的显式“udf_config”字段在某种程度上误导了我,因为我认为在使用 UDF 时必须填充它。 我认为问题是针对标准 SQL 的,因此应该接受这个答案而不是我的答案。干得好@ElliottBrossard【参考方案2】:

在 Google Cloud Storage 中上传您的 UDF 函数并将其传递给 udf_config 参数。

例如:

您的 UDF 函数位于 gs://test-bucket/testfolder/udf.js

然后在你的气流中使用:

udf_gcs_path = "gs://test-bucket/testfolder/udf.js"

bigquery_operator.BigQueryOperator(task_id=task_id,
    bql=query,                                         
    destination_dataset_table=destination_table,
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    udf_config=["resourceUri": udf_gcs_path])

参考文献

https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.userDefinedFunctionResources https://cloud.google.com/bigquery/user-defined-functions#referencing-code-from-google-cloud-storage

【讨论】:

谢谢,看看文档,这似乎是遗留 sql 的方法,而不是标准 sql? cloud.google.com/bigquery/docs/reference/standard-sql/…

以上是关于Airflow 的 bigqueryoperator 不能与 udf 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

airflow并发慢

airflow的使用

Airflow 中文文档:使用systemd运行Airflow

2.airflow参数简介

Airflow + Nginx 设置给出了 Airflow 404 = 很多圈子

airflow系列教程 airflow的报警功能设置