集成 BigQuery SubPub 和 Cloud Functions
Posted
技术标签:
【中文标题】集成 BigQuery SubPub 和 Cloud Functions【英文标题】:Integrate BigQuery SubPub and Cloud Functions 【发布时间】:2021-04-11 09:45:01 【问题描述】:我在一个项目中,我们需要使用 BigQuery、PubSub、Logs explorer 和 Cloud Functions。
项目:
每次发生特定事件(例如用户接受 cookie)时,系统都会向 BigQuery 中插入一个新查询,其中包含许多列(参数),例如:utm_source、utm_medium、consent_cookies 等...
在我的表中有这个新查询后,我需要读取列并获取要在云函数中使用的值。
在云函数中,我想使用这些值进行 api 调用。
到目前为止我能做到的事情:
我创建了一个日志路由接收器,用于过滤新条目并将日志发送到我的 PubSub 主题。
我被困在哪里:
我想创建一个 Cloud 函数,该函数在每次有新日志进入时触发,并且在该函数中我想访问日志中包含的信息,例如 utm_source、utm_medium、consent_cookies 等...并使用进行 api 调用的值。
有人可以帮助我吗?非常感谢!
我做了一个项目来说明流程:
-
插入表格:
2.从此插入创建日志接收器:(过滤)
-
现在,每次我创建一个新查询时,它都会转到 PUB/SUB,我会获取该查询的日志
-
我想做的是触发一个关于这个主题的函数,并使用我在查询中的值来执行调用 api 等操作......
到目前为止,我能够编写此代码:
"use strict";
function main()
// Import the Google Cloud client library
const BigQuery = require("@google-cloud/bigquery");
async function queryDb()
const bigqueryClient = new BigQuery();
const sqlQuery = `SELECT * FROM \`mydatatable\``;
const options =
query: sqlQuery,
location: "europe-west3",
;
// Run the query
const [rows] = await bigqueryClient.query(options);
rows.forEach((row) =>
const username = row.user_name;
);
queryDb();
main();
现在我又被卡住了,我不知道如何从我创建的接收器中获取正确的查询并使用这些信息来拨打我的电话...
【问题讨论】:
(又是我 :) ) 你有日志中的数据详细信息吗?如果不是,为什么写入 BigQuery 的进程也没有使用正确的参数写入 PubSub? 嘿@guillaumeblaquiere 非常感谢您的时间和耐心!我添加了所有带有图像的步骤,以便更好地理解我正在尝试做的事情 好的,很清楚。我现在可以重现并提出您的一段代码。你喜欢什么语言(如果可以的话,我会用它做例子) 我习惯使用 node.js。太感谢了! Python 不是问题,哪一个最适合你的英雄!谢谢 @guillaumeblaquiere 我已经设法编写了一些代码,我是正确的道路吗?谢谢漂亮! 【参考方案1】:您有 2 个解决方案可以从 PubSub 消息中调用您的 Cloud Functions
HTTP 功能:您可以设置 HTTP 调用。在 trigger-http 中创建您的 Cloud Function,并在您的 PubSub 主题上创建一个推送订阅以调用 Cloud Functions。不要忘记添加安全性(将您的函数设为私有并在 PubSub 上启用安全性),因为您的函数可以公开访问 后台函数:您可以将 Cloud Functions 直接绑定到 PubSub 主题。订阅会自动创建并链接到 Cloud Functions。安全性是内置的。而且,因为您有 2 种类型的函数,所以您有 2 个不同的函数签名。我提供给你,处理是(相当)相同的。
function extractQuery(pubSubMessage)
// Decide base64 the PubSub message
let logData = Buffer.from(pubSubMessage, 'base64').toString();
// Convert it in JSON
let logMessage= JSON.parse(logData)
// Extract the query from the log entry
let query = logMessage.protoPayload.serviceData.jobInsertRequest.resource.jobConfiguration.query.query
console.log(query)
return query
// For HTTP functions
exports.bigqueryQueryInLog = (req, res) =>
console.log(req.body)
const query = extractQuery(req.body.message.data)
res.status(200).send(query);
// For Background functions
exports.bigqueryQueryInLogTopic = (message, context) =>
extractQuery(message.data)
;
记录的query
是您在日志条目中的insert into...
。然后,您必须解析您的 SQL 请求以提取您想要的部分。
【讨论】:
亲爱的@guillaumeblaquiere,多亏了你,我能够部署func,现在我可以访问数据了,可以从同一个func进行api调用吗?谢谢,你帮我说清楚了! 现在在 func 日志下我可以看到我需要的值在 textPayload 下。像这样: textPayload: "VALUES ("id","utm","cookies"," fbp ","2021-01-05",TRUE )",不是我的归档和 bigQuery 的值之间的密钥对。 ..有什么办法可以做到这一点?我的项目是使用字段对 fb 转换 Api 进行 api 调用,这就是为什么我需要这些值,如果它们采用这种格式,唯一的访问方法是通过正则表达式?再次感谢您的时间和耐心!我从你身上学到了很多东西!function facebookApiCall(req,res) var request = require('request'); var options = 'method': 'POST', 'url': 'https://graph.facebook.com/v9.0//events', 'headers': , formData: 'data': '[ "event_name": "postman", "event_time": 1610016563,"user_data": "em": "7b17fb0bd173f6616516451114dcss3d16fc78302d79f0fd30c2fc2fc068","ph": null, "custom_data": "currency": "EUR", "value": "142.52" ]', 'access_token': ''" ;request(options, function (error, response) if (error) throw new Error(error); return console.log(response.body); );
是的,它不是密钥对,它是 SQL 请求。您必须手动解析它们。我同意这不是很安全和高效。
我可以在同一个函数中解析它们吗?你能建议另一种方法吗?我对这个项目和 GCP 感到非常困惑,所以我真的不知道如何处理以上是关于集成 BigQuery SubPub 和 Cloud Functions的主要内容,如果未能解决你的问题,请参考以下文章
从 Google 脚本将数据插入 BigQuery:遇到“”
Google Sheets AddOn - 通过服务帐户集成 AppScript 和 BigQuery
Google BigQuery:通过 Python google-cloud-bigquery 版本 0.27.0 与 0.28.0 创建视图