如何在长时间运行的程序中立即确认 Pub/Sub 消息

Posted

技术标签:

【中文标题】如何在长时间运行的程序中立即确认 Pub/Sub 消息【英文标题】:How to immediately acknowledge a Pub/Sub message in a long-running program 【发布时间】:2019-12-17 18:16:22 【问题描述】:

我有一个运行 Python 3.7(标准环境)的 App Engine 服务,它是 Pub/Sub 主题的推送订阅者(因此由它触发)。这个程序可以运行很长时间。我需要能够立即确认消息,然后继续运行,这样服务就不会连续收到相同的消息。

我在这里看到了两种可能性:

    理想情况下,我希望能够确认消息,然后让程序运行它; 或者,鉴于服务是幂等的,我可以让服务被消息包围,忽略所有重复消息,然后在程序结束时确认一次。

我认为选项 1 似乎更有吸引力——但我不知道它在 Python 中是否可行。据我所知(如果我错了,请纠正我),消息通过最后返回的200 状态得到确认。

我已将基本框架放入下面的程序中。欢迎任何想法,谢谢!

@app.route('/_ah/push-handlers/receive_messages/', methods=['POST'])
def receive_messages_handler():
    if (request.args.get('token', '') != current_app.config['PUBSUB_VERIFICATION_TOKEN']):
        return 'Invalid request', 400

    envelope = json.loads(request.data.decode('utf-8'))
    payload = base64.b64decode(envelope['message']['data'])

    logging.info(f"Recieved message: payload")

    # Long-running program in here

    return 'OK', 200

【问题讨论】:

【参考方案1】:

我建议你看看cloud task。它专为长期运行而设计,带有重试策略

设计如下:

appengine 使用 pubsub 消息,创建任务并确认消息(返回 2xx) 云任务调用 appengine 新端点以实现长时间运行的进程。

【讨论】:

我喜欢这种方法。我不知道它是否真的解决了最初的问题,所以将来遇到这个问题的任何人都可能对此感到满意,也可能不满意,但因为它解决了我的具体情况,所以我选择它作为答案。谢谢!

以上是关于如何在长时间运行的程序中立即确认 Pub/Sub 消息的主要内容,如果未能解决你的问题,请参考以下文章

如何在长时间运行的服务器操作期间与用户交互(例如确认对话框)?

Firebase Cloud Functions 如何确认 Cloud pub/sub

即使在确认后,应用程序也接受来自 google Pub/Sub 的重复消息

如何通过 Google Cloud Monitoring JAVA 客户端库获取 Pub/Sub 订阅中未确认消息的数量

确认后 GCP 消息保留在 Pub/Sub 中

如何修改后台 Cloud Function 的 Google Cloud Pub/Sub 订阅确认截止日期