GCP 云功能未正确接收/确认 PubSub 消息
Posted
技术标签:
【中文标题】GCP 云功能未正确接收/确认 PubSub 消息【英文标题】:GCP Cloud Function not correctly picking up/acknowledging PubSub messages 【发布时间】:2020-12-06 21:25:52 【问题描述】:我在 Google Cloud Platform 中设置了一些数据处理工作流程。这些位置处理物理地址并返回一些关于它们的指标。工作流使用 Cloud Functions 和 PubSub 流的组合。
在工作流中使用一个 Google Cloud 函数时,某些消息不会从触发流中提取或被多次提取。我知道这在一定程度上是可以预料的。但是,这种情况经常发生。这足以导致某些地点被夸大了 10 倍,而其他一些地点却没有结果。
我认为callback
函数没有正确确认消息,但我不确定应该有什么不同才能更可靠地接收和确认消息。任何建议表示赞赏。
用于检索指标的我的 GCP 云函数由 PubSub 流触发,并执行 retrieve_location
函数将数据发送到不同的 PubSub 流。 retrieve_location
函数如下所示:
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
get_metrics
函数从每条消息中获取有效负载,检索一些数据并将其发送到另一个流。此功能似乎按预期工作。
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
【问题讨论】:
那么您的 Cloud Function 是否由 Pub/Sub 消息触发,导致它在不同的 Pub/Sub 订阅上启动订阅者?以这种方式启动 Pub/Sub 订阅者有点不寻常。 @KamalAboul-Hosn 每个 PubSub 流的订阅和主题 ID 都是唯一的,因此不应该交叉。但是开始订阅的最佳方式是什么? 我同意这是一个不寻常的模式。您能概括地解释一下为什么需要这样做吗? @DustinIngram 从 PubSub 流中触发函数以便拾取和处理所有消息的最佳方法是什么?我正在处理每个地址并检索数据。 只需将函数订阅到主题就足够了。我仍然不清楚为什么您需要在函数中添加第二个订阅者? 【参考方案1】:您似乎将使用 Cloud Pub/Sub 触发 Cloud Function 与直接通过 Cloud Pub/Sub 客户端库使用 Pub/Sub 混为一谈。一般来说,你会想要做一个或另一个。
如果您创建的订阅是通过 Cloud Functions 完成的,那么您的 retrieve_location
函数并没有真正接收和处理消息。相反,它正在做的是启动一个订阅客户端,然后很快就会关闭,因为subscriber.subscribe
将运行到完成,因此您的函数将完成执行。
如果此函数正在启动一个触发 Cloud Function 的同一订阅的客户端,那么它实际上不会做任何事情,因为基于 Cloud-Function 的订阅使用 push 模型,而客户端库应该是与pull 模型一起使用。
您要么想直接在retrieve_location
中执行callback
中的操作,使用事件作为消息(如Dustin 所述),要么您希望使用客户端库设置一个持久订阅者,例如,在 GCE 上,实例化订阅者并在其上调用 subscribe
。
【讨论】:
这个细节也非常有助于解决问题。感谢您提供此见解!【参考方案2】:您应该创建一个background function 来订阅直接处理有效负载的主题,而不是在您的 Cloud Function 中设置第二个 Pub/Sub 订阅者,例如:
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
【讨论】:
我正在努力做出您建议的更改。在此示例中,没有message.ack()
。不需要吗?
没必要,Cloud Function执行成功有确认Pub/Sub消息的效果。
这很有帮助。我有云函数逻辑本地开发的特定订阅者声明。删除额外的设置确实有助于减少额外的呼叫。但是,我现在遇到的问题是不存在但应该存在的电话问题。至少这解决了一个问题。以上是关于GCP 云功能未正确接收/确认 PubSub 消息的主要内容,如果未能解决你的问题,请参考以下文章
GCP - 验证 PubSub 推送的云功能 https 端点的所有权