从 Pubsub 在 BigQuery 中编写查询

Posted

技术标签:

【中文标题】从 Pubsub 在 BigQuery 中编写查询【英文标题】:Write query in BigQuery from Pubsub 【发布时间】:2021-04-16 11:32:18 【问题描述】:

需要帮助。

我正在接收带有 PubSub 主题中数据的消息,我需要插入从消息中获取的数据并使用后台云功能 (PUB/SUB) 在 BigQuery 中查询...

我想做什么:

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param !Object event Event payload.
 * @param !Object context Metadata for the event.
 */
exports.insertBigQuery = (message, context) => 
    extractQuery(message.data);
   
;

function extractQuery(pubSubMessage)
    // Decide base64 the PubSub message
    let logData = Buffer.from(pubSubMessage, 'base64').toString();
    // Convert it in JSON
    let logMessage= JSON.parse(logData);

    console.log(logMessage.customerToken)
    console.log(logMessage.fbclid)
    console.log(logMessage.fbc)
    console.log(logMessage.fbp)
    console.log(logMessage.firstHitTS)
    console.log(logMessage.consentFB)

    main();
    
    return logMessage

    

"use strict";

function main() 
  const  BigQuery  = require("@google-cloud/bigquery");
  const bigquery = new BigQuery();

  async function query() 
    const query = `INSERT INTO MYTABLE( customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
    VALUES ("customerTokenSCRIPTCLOUD","fbclidSCRIPT"," fbcSCRIPTCLOUD"," fbpSCRIPTCLOUD","2021-01-05",TRUE )`;

    const options = 
      query: query,
      location: "US",
    ;

    const [job] = await bigquery.createQueryJob(options);
    console.log(`Job $job.id started.`);

    const [rows] = await job.getQueryResults();

    console.log("Rows:");
    rows.forEach((row) => console.log(row));
  

  query();

现在每次收到消息时,我都会在 bigQuery 中进行查询,但我的 VALUES 是硬编码的,如您在此处看到的:

const query = `INSERT INTO devsensetestprojects.TestDataSet.fbSimpleData( customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
    VALUES ("customerTokenSCRIPTCLOUD","fbclidSCRIPT"," fbcSCRIPTCLOUD"," fbpSCRIPTCLOUD","2021-01-05",TRUE )`;

我不能做的是从 function extractQuery(pubSubMessage) 获取值并在查询中使用它们,就像在函数 (logMessage.SOMEVALUE) 中使用它们一样,以获得我需要的正确值。

提前致谢!

【问题讨论】:

【参考方案1】:

正如您所说,您是开发的初学者。这里有一个更简洁高效的代码。我没有测试它,但它更接近你想要的。让我知道有些部分对你来说是神秘的!


// Make them global to load them only when the Cloud Function instance is created
// They will be reused in the subsequent processing and until the instance deletion
const  BigQuery  = require("@google-cloud/bigquery");
const bigquery = new BigQuery();



exports.insertBigQuery = async (message, context) => 

    // Decode base64 the PubSub message
    let logData = Buffer.from(message.data, 'base64').toString();
    // Convert it in JSON
    let logMessage= JSON.parse(logData);

    const query = createQuery(logMessage)

    const options = 
        query: query,
        location: "US",
    ;

    const [job] = await bigquery.createQueryJob(options);
    console.log(`Job $job.id started.`);

    // Only wait the end of the job. Theere is no row as answer, it's only an insert
    await job.getQueryResults();



function createQuery(logMessage) 
    // You maybe have to format correctly the logMessage.firstHitTS to be accepted by BigQuery as a date.
    return `INSERT INTO MYTABLE(customerToken, fbclid, fbc, fbp, firstHitTS, consentFB)
                   VALUES (logMessage.customerToken, logMessage.fbclid, logMessage.fbc, logMessage.fbp,
                           logMessage.firstHitTS, logMessage.consentFB)`;

【讨论】:

“错误:无法识别的名称:logMessage 在 [2:15] 在新 ApiError (/workspace/node_modules/@google-cloud/common/build/src/util.js:59:15)”我不知道为什么会这样,看来 logMessage 仍然没有通过。谢谢美人️ 看来问题在于INSERT INTO MYTABLE(customerToken, fbclid, fbc, fbp, firstHitTS, consentFB) VALUES (logMessage.customerToken, logMessage.fbclid, logMessage.fbc, logMessage.fbp, logMessage.firstHitTS, logMessage.consentFB)那部分中的每一件事都是一个字符串,我需要使用变量,也许是语法问题 亲爱的@guillaume blaquiere,您的功能非常完美!我只需要更改function createQuery(logMessage) 就像一个魅力!非常感谢! 你做了什么改变来修复它? NodeJS 是我较弱的语言,我看不出有什么问题! 我只是在查询字符串queryString=“INSERT INTO \”MYTABLE\”(abc, def, ghi) VALUES ( @abc, @def, @ghi);中更改了获取变量的方式,最后我得到了类似的东西,现在我在我的手机里,明天我会评论孔块! Merci Maître 编辑:代码本身没有问题,功能很完美,它是 sql 语法。

以上是关于从 Pubsub 在 BigQuery 中编写查询的主要内容,如果未能解决你的问题,请参考以下文章

GCP - 从 PubSub 到 BigQuery 的消息

从 PubSub 导出到 BigQuery - Dataflow 没有任何反应

数据流:从 Pubsub RuntimeException 导出到 Bigquery

使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出

PubSub 到 BigQuery - Python 中的数据流/Beam 模板?

BigQuery 不接受来自 protobuf 的二进制数据