使用用户定义的函数在 BigQuery 数据集中插入海量数据时如何优化性能

Posted

技术标签:

【中文标题】使用用户定义的函数在 BigQuery 数据集中插入海量数据时如何优化性能【英文标题】:How can I optimize performance when inserting massive data in BigQuery dataset with a user-defined function 【发布时间】:2019-04-09 02:34:20 【问题描述】:

我只是在寻找最近使用 UDF 将大量数据添加到 BigQuery 表中的方法。所以,我尝试了这样的推荐方法:

#standardSQL
INSERT INTO `project.dataset.Quincy` (id, col)
WITH array_to_loop_through AS (
  SELECT id 
  FROM UNNEST(GENERATE_ARRAY(1, 1000, 1)) id
)
SELECT id, CONCAT('Rank: ', CAST(id AS STRING))
FROM array_to_loop_through

将 100 万个值添加到表中需要 8 秒。所以我在我的 UDF 上应用了这种方式:

CREATE TEMPORARY FUNCTION myFunc()
  RETURNS array<string>
  LANGUAGE js AS
"""
a=[""];
for(i=0;i<=50;i++)
    a.push(randomString(12));
    
    return a;
"""
OPTIONS (
library="gs://kaneki110299/tester.js"

);



#standardSQL

INSERT INTO `Lambert.fortune` (password)
WITH array_to_loop_through AS (
  SELECT * 
  FROM UNNEST(myFunc()) id
)
SELECT CONCAT(CAST(id AS STRING))
FROM array_to_loop_through

当我在 BigQuery 上运行此查询时,它会运行 5 分钟,然后遇到只有 50 个值的 UDF 超时。当我将循环放入tester.js 时发生了同样的错误。所以,我尝试了另一种方法:

CREATE TEMPORARY FUNCTION myFunc()
  RETURNS string
  LANGUAGE js AS
"""   
    return randomString(12);
"""
OPTIONS (
library="gs://kaneki110299/tester.js"

);



#standardSQL

INSERT INTO `Lambert.fortune` (password) 
Values (myFunc()),(myFunc()),(myFunc())...//1000 times

与上一个查询不同,这个查询只需 30 秒即可将我的 UDF 结果中的 1000 个值添加到表中。看起来循环在 BigQuery 上运行不佳或运行不快。

在运行用户定义的函数以将大量数据插入其数据集时,是否可以使用并行或 BigQuery 支持来优化其 CPU 性能?我试图在表上添加 10 亿个值,所以我使用的最后一种方法似乎不实用。

【问题讨论】:

Bigquery 运行一切都是并行的,这就是为什么它是一个如此惊人的大数据解决方案。您应该检查位于 webui 的详细信息选项卡中的执行报告,以了解执行需要时间的原因。如果您可以提供屏幕截图,则可以更轻松地解决您的问题。 我不知道该放什么截图。您只需复制查询,在查询编辑器上运行它并返回结果,我使用的 randomString 函数来自这个 ***.com/questions/1349404/… 。我希望如果我可以将 bigquery 默认函数与我的 UDF 结合起来生成大数组,然后插入到像“GENERATE_ARRAY(1, 1000, 1)”这样的表中 观看此视频youtube.com/watch?v=UueWySREWvk @TamirKlein 这有点不可能。我可以使用 node.js 中的 require() 来调用我的 js 文件中的外部库吗? BigQuery 允许这样做吗? 【参考方案1】:

使用googleApi npm,您可以编写一个 JS 程序,该程序将并行运行多个插入。

这是关于如何使用 API 进行 1 次调用的完整工作 mocha 测试。 您可以使用自己的 for loop 包装内部调用,以并行完成插入。

if (!global._babelPolyfill) 
    var a = require("babel-polyfill")


import google from 'googleapis'

let bigQuery = google.bigquery("v2")

describe('Run query with API', async () => 

    it('Run a query', async () => 
        let result = await test('panada')

    )

    async function test(p1) 
        try 
            let query = `INSERT INTO \`project.dataset.Quincy\` (id, col)
                        WITH array_to_loop_through AS (
                          SELECT id 
                          FROM UNNEST(GENERATE_ARRAY(1, 10, 1)) id
                        )
                        SELECT id, CONCAT('Rank: ', CAST(id AS STRING))
                        FROM array_to_loop_through`

            let auth = getBasicAuthObj()
            auth.setCredentials(
                access_token: "myAccessToken",
                refresh_token: "myRefreshToken"
            )

            let request = 
                "projectId": "myProject",
                auth,
                "resource": 
                    "projectId": "myProject",
                    "configuration": 
                        "query": 
                            query,
                            "useLegacySql": false
                        ,
                        "dryRun": false
                    
                
            

            console.log(`query is: $query`)

            let result = await callBQ(request)

            // debugger
            console.log(`Status is: $result.data.status.state`)
         catch (err) 
            console.log("err", err)
        
    

    /**
     * Call BigQuery jobs.insert
     * @param request
     * @returns Promise<*>
     */
    async function callBQ(request) 
        debugger
        // console.log("request", request)
        try 
            let result = await bigQuery.jobs.insert(request, request)
            console.log(`All good.....`)

            return result
         catch (e) 
            console.log(`Failed to run query: $e`)
        

    

    /**
     * Create oAuth object
     * @returns OAuth2Client
     */
    function getBasicAuthObj() 
        let clientId = 'myclientId'
        let clientSecret = 'mySecret'
        let redirectUrl = 'URL'

        return new google.auth.OAuth2(
            clientId,
            clientSecret,
            redirectUrl
        )
    
)

注意:Bigquery 在并行插入和运行查询时有限制,请参阅link 了解更多详细信息

【讨论】:

嗨@Toan 这对您有任何帮助吗? 我必须在外部程序中运行此代码吗?因为我只是把它放在我的 js 文件中并且出现错误,而且 bigquery 也不允许我在我的文件中使用 require() 来链接到外部库。另外,这个代码比我的代码更快吗? 是的,我们的想法是在 bigquery 之外运行它,并使用 js async 和 promiseAll 功能添加您自己的并行层。因此,例如,您可以让 10 个 Promise 并行运行,所有在您的目标表中为不同的 id 范围集创建记录。 @ToanNguyen 能否请您也投票赞成这个答案。这也将帮助您提高声誉,请参阅此链接如何做到这一点:***.com/help/someone-answers

以上是关于使用用户定义的函数在 BigQuery 数据集中插入海量数据时如何优化性能的主要内容,如果未能解决你的问题,请参考以下文章

如何创建使用用户定义函数的 BigQuery 视图?

使用 SQL 查询从 BigQuery 用户定义函数返回值

BigQuery 是不是支持分析用户定义的函数?

BigQuery 用户定义函数中的 Base64 编码

使用 python 和 BigQuery API 获取 BigQuery 数据集中的表列表

Python中的条件数据插补