GCP - 从 PubSub 到 BigQuery 的消息

Posted

技术标签:

【中文标题】GCP - 从 PubSub 到 BigQuery 的消息【英文标题】:GCP - Message from PubSub to BigQuery 【发布时间】:2021-04-14 03:39:28 【问题描述】:

我需要从我的 pubsub 消息中获取数据并插入到 bigquery 中。

我有什么:

const topicName = "-----topic-name-----";
const data = JSON.stringify( foo: "bar" );

// Imports the Google Cloud client library
const  PubSub  = require("@google-cloud/pubsub");

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessageWithCustomAttributes() 
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  // Add two custom attributes, origin and username, to the message
  const customAttributes = 
    origin: "nodejs-sample",
    username: "gcp",
  ;

  const messageId = await pubSubClient
    .topic(topicName)
    .publish(dataBuffer, customAttributes);
  console.log(`Message $messageId published.`);


publishMessageWithCustomAttributes().catch(console.error);

我需要从此消息中获取数据/属性并在 BigQuery 中查询,有人可以帮助我吗?

提前谢谢!

【问题讨论】:

您必须处理的消息量是多少? 很多消息,发送这些参数用于调用 Facebook api,再次感谢 mon ami! C' est très gentil :) 【参考方案1】:

事实上,消费消息有两种解决方案:一条消息一条消息,或者批量消息。

首先,在详细介绍之前,由于您将执行 BigQuery 调用(或 Facebook API 调用),您将花费大量处理时间来等待 API 响应。


每条消息的消息 如果您有可接受的消息量,则可以对每个消息执行一条消息处理。您有 2 个解决方案:
    您可以使用 Cloud Functions 处理每条消息。为函数设置最小内存量 (128Mb) 以限制 CPU 成本,从而限制全局成本。确实,因为你会等待很多,所以不要花费昂贵的 CPU 成本无所事事!好的,当数据存在时,您会慢慢处理数据,但这是一种权衡。

创建Cloud Function on the topic,或推送订阅以调用HTTP triggered Cloud Functions

    您也可以处理request concurrently with Cloud Run。 Cloud Run 最多可以同时处理 250 个请求(预览版),因为您会等待很多,所以它非常适合。如果您需要更多 CPU 和内存,可以将这些值增加到 4CPU 和 8Gb 内存。 这是我的首选解决方案。

如果您能够轻松管理多 CPU 多(轻)线程开发,则可以进行批量处理。在 Go 中很容易。 Node 中的并发也很容易(等待/异步),但我不知道它是支持多 CPU 还是只有单 CPU。反正原理如下
    在 PubSub 主题上创建请求订阅 创建一个 Cloud Run(更适合多 CPU,但也可以与 App Engine 或 Cloud Functions 一起使用),它将侦听拉取订阅一段时间(比如说 10 分钟) 对于提取的每条消息,都会执行一个异步过程:获取数据/属性,调用 BigQuery,确认消息 pull connexion超时后,关闭消息监听,完成当前消息处理,优雅退出(返回200 HTTP码) 创建每 10 分钟调用一次 Cloud Run 服务的 Cloud Scheduler。将超时设置为 15 分钟并放弃重试。 部署 Cloud Run 服务,超时时间为 15 分钟。

此解决方案提供了更好的消息吞吐量处理(每个 Cloud Run 服务可以处理超过 250 条消息),但没有真正的优势,因为您受到 API 调用延迟的限制。


编辑 1

代码示例

// For pubsunb triggered function
exports.logMessageTopic = (message, context) => 
    console.log("Message Content")
    console.log(Buffer.from(message.data, 'base64').toString())
    console.log("Attribute list")
    for (let key in message.attributes) 
        console.log(key + " -> " + message.attributes[key]);
    ;
;


// For push subscription
exports.logMessagePush  = (req, res) => 
    console.log("Message Content")
    console.log(Buffer.from(req.body.message.data, 'base64').toString())
    console.log("Attribute list")
    for (let key in req.body.message.attributes) 
        console.log(key + " -> " + req.body.message.attributes[key]);
    ;
;

【讨论】:

C'est parfait 再次感谢您!你的回答令人难以置信!谢谢!你能帮我解释一下它自己的函数的语法和方法吗?很明显,我是编程新手,但是当涉及其他服务(pubsub、bigquery)时,我发现编写云函数特别困难 还记得最后一个问题吗?我将它发送给我的主管,他改变了方法,现在带有数据的服务将在 pubsub 的主题中发布一条消息,然后我们需要在 bigquery(数据/属性)中写入这条消息......感谢分享你的知识! 我添加了代码示例以从消息中获取内容 + 属性。然后,构建您的查询并将其提交给 BigQuery 非常感谢,我会尝试在项目中实现这个,看看我是否可以让它工作!非常感谢!

以上是关于GCP - 从 PubSub 到 BigQuery 的消息的主要内容,如果未能解决你的问题,请参考以下文章

在 Java 中将 protobuf 转换为 bigquery

从 pubsub->bigquery 移动到 pubsub->gcs (avro)->bigquery

如何从 Cloudflare 工作人员内部发布到 GCP PubSub 主题

从 Cloud Function 发布到 GCP PubSub 的正确方法是啥?

从 PubSub 导出到 BigQuery - Dataflow 没有任何反应

数据流:从 Pubsub RuntimeException 导出到 Bigquery