如何使用调用 UDF 的 Python 脚本来使用 BigQuery API

Posted

技术标签:

【中文标题】如何使用调用 UDF 的 Python 脚本来使用 BigQuery API【英文标题】:How to use the BigQuery API using a Python script calling a UDF 【发布时间】:2017-05-10 14:35:19 【问题描述】:

针对 BigQuery 表,我正在尝试运行一个调用 UDF 的 SQL 语句。此语句在 Python 脚本中执行,并通过 BigQuery API 进行调用。

当我执行一个没有 UDF 的简单 SQL 语句时,它工作正常。但是,当我尝试使用 UDF 脚本(存储在本地或 GCS 存储桶中)时,我不断收到相同的错误。 这是我在本地终端上得到的(我通过 Python Launcher 运行脚本):

Traceback(最近一次调用最后一次): 文件“/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/googleapiclient/http.py”, 第 840 行,执行中 引发 HttpError(resp, content, uri=self.uri) googleapiclient.errors.HttpError: https://www.googleapis.com/bigquery/v2/projects/[projectId]/queries?alt=json 返回“缺少必填参数”>

这是我的 Python 脚本:

credentials = SignedJwtAssertionCredentials(
SERVICE_ACCOUNT_EMAIL,
key,
scope='https://www.googleapis.com/auth/bigquery')

aservice = build('bigquery','v2',credentials=credentials)
query_requestb = aservice.jobs()

query_data = 
    'configuration': 
        'query': 
            'userDefinedFunctionResources': [
                
                   'resourceUri': 'gs://[bucketName]/[fileName].js'
                
            ],
            'query': sql
        
    ,
    'timeoutMs': 100000


query_response = query_requestb.query(projectId=PROJECT_NUMBER,body=query_data).execute(num_retries=0)

知道“缺少什么参数”或如何让它运行吗?

【问题讨论】:

【参考方案1】:

不要指定userDefinedFunctionResources,而是在'query' 的正文中使用CREATE TEMP FUNCTION,并将库作为OPTIONS 子句的一部分引用。为此,您需要使用standard SQL,也可以参考user-defined functions 上的文档。您的查询将如下所示:

#standardSQL
CREATE TEMP FUNCTION MyJsFunction(x FLOAT64) RETURNS FLOAT64 LANGUAGE js AS """
  return my_js_function(x);
"""
OPTIONS (library='gs://[bucketName]/[fileName].js');

SELECT MyJsFunction(x)
FROM MyTable;

【讨论】:

感谢 Elliott 的快速回复。使用standard SQL 让它工作,并通过OPTIONS 子句在两个单独的UDF 文件中调用我的函数。请注意,我必须在 JS 函数的 IF 语句中小写我的if,因为它是区分大小写的(与使用带有legacy SQL 的UDF 相反)。此解决方案有效。但是仍然很想了解我们如何将它与legacy SQL 一起使用。【参考方案2】:

我想要运行的查询是按我通常使用 UDF 的营销渠道对流量和销售进行分类。这是我使用standard SQL 运行的查询。此查询存储在我读取并存储在变量sql 的文件中:

CREATE TEMPORARY FUNCTION
  mktchannels(source STRING,
    medium STRING,
    campaign STRING)
  RETURNS STRING
  LANGUAGE js AS """
return channelGrouping(source,medium,campaign) // where channelGrouping is the function in my channelgrouping.js file which contains the attribution rules
  """ OPTIONS ( library=["gs://[bucket]/[path]/regex.js",
    "gs://[bucket]/[path]/channelgrouping.js"] );
WITH
  traffic AS ( // select fields from the BigQuery table
  SELECT
    device.deviceCategory AS device,
    trafficSource.source AS source,
    trafficSource.medium AS medium,
    trafficSource.campaign AS campaign,
    SUM(totals.visits) AS sessions,
    SUM(totals.transactionRevenue)/1e6 as revenue,
    SUM(totals.transactions) as transactions
  FROM
    `[datasetId].[table]`
  GROUP BY
    device,
    source,
    medium,
    campaign)
SELECT
  mktchannels(source,
    medium,
    campaign) AS channel, // call the temp function set above
  device,
  SUM(sessions) AS sessions,
  SUM(transactions) as transactions,
  ROUND(SUM(revenue),2) as revenue
FROM
  traffic
GROUP BY
  device,
  channel
ORDER BY
  channel,
  device;

然后在 Python 脚本中:

fd = file('myquery.sql', 'r')
sql = fd.read()
fd.close()

query_data = 
    'query': sql,
    'maximumBillingTier': 10,
    'useLegacySql': False,
    'timeoutMs': 300000

希望这对将来的任何人都有帮助!

【讨论】:

以上是关于如何使用调用 UDF 的 Python 脚本来使用 BigQuery API的主要内容,如果未能解决你的问题,请参考以下文章

如何在spark中使用transform python udf执行hql脚本?

如何从 Pig 中的 Python UDF 打印?

UDF 可以访问调用 Pig 脚本中声明的参数或定义的值吗?

如何使用 Java 调用 Groovy 或 Scala UDF 来更新 Oracle?

检查python写的hive udf中的错误

如何在 PHP PDO 中使用 UDF