GCP - 从 PubSub 到 BigQuery 的消息
Posted
技术标签:
【中文标题】GCP - 从 PubSub 到 BigQuery 的消息【英文标题】:GCP - Message from PubSub to BigQuery 【发布时间】:2021-04-14 03:39:28 【问题描述】:我需要从我的 pubsub 消息中获取数据并插入到 bigquery 中。
我有什么:
const topicName = "-----topic-name-----";
const data = JSON.stringify( foo: "bar" );
// Imports the Google Cloud client library
const PubSub = require("@google-cloud/pubsub");
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishMessageWithCustomAttributes()
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes =
origin: "nodejs-sample",
username: "gcp",
;
const messageId = await pubSubClient
.topic(topicName)
.publish(dataBuffer, customAttributes);
console.log(`Message $messageId published.`);
publishMessageWithCustomAttributes().catch(console.error);
我需要从此消息中获取数据/属性并在 BigQuery 中查询,有人可以帮助我吗?
提前谢谢!
【问题讨论】:
您必须处理的消息量是多少? 很多消息,发送这些参数用于调用 Facebook api,再次感谢 mon ami! C' est très gentil :) 【参考方案1】:事实上,消费消息有两种解决方案:一条消息一条消息,或者批量消息。
首先,在详细介绍之前,由于您将执行 BigQuery 调用(或 Facebook API 调用),您将花费大量处理时间来等待 API 响应。
每条消息的消息 如果您有可接受的消息量,则可以对每个消息执行一条消息处理。您有 2 个解决方案:
-
您可以使用 Cloud Functions 处理每条消息。为函数设置最小内存量 (128Mb) 以限制 CPU 成本,从而限制全局成本。确实,因为你会等待很多,所以不要花费昂贵的 CPU 成本无所事事!好的,当数据存在时,您会慢慢处理数据,但这是一种权衡。
创建Cloud Function on the topic,或推送订阅以调用HTTP triggered Cloud Functions
-
您也可以处理request concurrently with Cloud Run。 Cloud Run 最多可以同时处理 250 个请求(预览版),因为您会等待很多,所以它非常适合。如果您需要更多 CPU 和内存,可以将这些值增加到 4CPU 和 8Gb 内存。 这是我的首选解决方案。
如果您能够轻松管理多 CPU 多(轻)线程开发,则可以进行批量处理。在 Go 中很容易。 Node 中的并发也很容易(等待/异步),但我不知道它是支持多 CPU 还是只有单 CPU。反正原理如下
-
在 PubSub 主题上创建请求订阅
创建一个 Cloud Run(更适合多 CPU,但也可以与 App Engine 或 Cloud Functions 一起使用),它将侦听拉取订阅一段时间(比如说 10 分钟)
对于提取的每条消息,都会执行一个异步过程:获取数据/属性,调用 BigQuery,确认消息
pull connexion超时后,关闭消息监听,完成当前消息处理,优雅退出(返回200 HTTP码)
创建每 10 分钟调用一次 Cloud Run 服务的 Cloud Scheduler。将超时设置为 15 分钟并放弃重试。
部署 Cloud Run 服务,超时时间为 15 分钟。
此解决方案提供了更好的消息吞吐量处理(每个 Cloud Run 服务可以处理超过 250 条消息),但没有真正的优势,因为您受到 API 调用延迟的限制。
编辑 1
代码示例
// For pubsunb triggered function
exports.logMessageTopic = (message, context) =>
console.log("Message Content")
console.log(Buffer.from(message.data, 'base64').toString())
console.log("Attribute list")
for (let key in message.attributes)
console.log(key + " -> " + message.attributes[key]);
;
;
// For push subscription
exports.logMessagePush = (req, res) =>
console.log("Message Content")
console.log(Buffer.from(req.body.message.data, 'base64').toString())
console.log("Attribute list")
for (let key in req.body.message.attributes)
console.log(key + " -> " + req.body.message.attributes[key]);
;
;
【讨论】:
C'est parfait 再次感谢您!你的回答令人难以置信!谢谢!你能帮我解释一下它自己的函数的语法和方法吗?很明显,我是编程新手,但是当涉及其他服务(pubsub、bigquery)时,我发现编写云函数特别困难 还记得最后一个问题吗?我将它发送给我的主管,他改变了方法,现在带有数据的服务将在 pubsub 的主题中发布一条消息,然后我们需要在 bigquery(数据/属性)中写入这条消息......感谢分享你的知识! 我添加了代码示例以从消息中获取内容 + 属性。然后,构建您的查询并将其提交给 BigQuery 非常感谢,我会尝试在项目中实现这个,看看我是否可以让它工作!非常感谢!以上是关于GCP - 从 PubSub 到 BigQuery 的消息的主要内容,如果未能解决你的问题,请参考以下文章
在 Java 中将 protobuf 转换为 bigquery
从 pubsub->bigquery 移动到 pubsub->gcs (avro)->bigquery
如何从 Cloudflare 工作人员内部发布到 GCP PubSub 主题
从 Cloud Function 发布到 GCP PubSub 的正确方法是啥?