GCP 云功能未正确接收/确认 PubSub 消息

Posted

技术标签:

【中文标题】GCP 云功能未正确接收/确认 PubSub 消息【英文标题】:GCP Cloud Function not correctly picking up/acknowledging PubSub messages 【发布时间】:2020-12-06 21:25:52 【问题描述】:

我在 Google Cloud Platform 中设置了一些数据处理工作流程。这些位置处理物理地址并返回一些关于它们的指标。工作流使用 Cloud Functions 和 PubSub 流的组合。

在工作流中使用一个 Google Cloud 函数时,某些消息不会从触发流中提取或被多次提取。我知道这在一定程度上是可以预料的。但是,这种情况经常发生。这足以导致某些地点被夸大了 10 倍,而其他一些地点却没有结果。

我认为callback 函数没有正确确认消息,但我不确定应该有什么不同才能更可靠地接收和确认消息。任何建议表示赞赏。

用于检索指标的我的 GCP 云函数由 PubSub 流触发,并执行 retrieve_location 函数将数据发送到不同的 PubSub 流。 retrieve_location 函数如下所示:

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

get_metrics 函数从每条消息中获取有效负载,检索一些数据并将其发送到另一个流。此功能似乎按预期工作。

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

【问题讨论】:

那么您的 Cloud Function 是否由 Pub/Sub 消息触发,导致它在不同的 Pub/Sub 订阅上启动订阅者?以这种方式启动 Pub/Sub 订阅者有点不寻常。 @KamalAboul-Hosn 每个 PubSub 流的订阅和主题 ID 都是唯一的,因此不应该交叉。但是开始订阅的最佳方式是什么? 我同意这是一个不寻常的模式。您能概括地解释一下为什么需要这样做吗? @DustinIngram 从 PubSub 流中触发函数以便拾取和处理所有消息的最佳方法是什么?我正在处理每个地址并检索数据。 只需将函数订阅到主题就足够了。我仍然不清楚为什么您需要在函数中添加第二个订阅者? 【参考方案1】:

您似乎将使用 Cloud Pub/Sub 触发 Cloud Function 与直接通过 Cloud Pub/Sub 客户端库使用 Pub/Sub 混为一谈。一般来说,你会想要做一个或另一个。

如果您创建的订阅是通过 Cloud Functions 完成的,那么您的 retrieve_location 函数并没有真正接收和处理消息。相反,它正在做的是启动一个订阅客户端,然后很快就会关闭,因为subscriber.subscribe 将运行到完成,因此您的函数将完成执行。

如果此函数正在启动一个触发 Cloud Function 的同一订阅的客户端,那么它实际上不会做任何事情,因为基于 Cloud-Function 的订阅使用 push 模型,而客户端库应该是与pull 模型一起使用。

您要么想直接在retrieve_location 中执行callback 中的操作,使用事件作为消息(如Dustin 所述),要么您希望使用客户端库设置一个持久订阅者,例如,在 GCE 上,实例化订阅者并在其上调用 subscribe

【讨论】:

这个细节也非常有助于解决问题。感谢您提供此见解!【参考方案2】:

您应该创建一个background function 来订阅直接处理有效负载的主题,而不是在您的 Cloud Function 中设置第二个 Pub/Sub 订阅者,例如:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)

【讨论】:

我正在努力做出您建议的更改。在此示例中,没有 message.ack()。不需要吗? 没必要,Cloud Function执行成功有确认Pub/Sub消息的效果。 这很有帮助。我有云函数逻辑本地开发的特定订阅者声明。删除额外的设置确实有助于减少额外的呼叫。但是,我现在遇到的问题是不存在但应该存在的电话问题。至少这解决了一个问题。

以上是关于GCP 云功能未正确接收/确认 PubSub 消息的主要内容,如果未能解决你的问题,请参考以下文章

如何正确接收 Pubsub JSON 数据?

GCP Pubsub 主题持续时间中存在的消息数

GCP - 验证 PubSub 推送的云功能 https 端点的所有权

GCP PubSub Spring Boot 重复提取消息

Firebase:我可以在 Firebase 云功能中“不确认”一条 PubSub 消息吗?

如何在 Firebase Cloud Functions 中确认 PubSub 消息?