如何使用 Google Cloud PubSub 和 Run 处理资源密集型长时间运行的任务?

Posted

技术标签:

【中文标题】如何使用 Google Cloud PubSub 和 Run 处理资源密集型长时间运行的任务?【英文标题】:How to use Google Cloud PubSub and Run to handle resource-intensive long-running tasks? 【发布时间】:2019-12-13 08:35:32 【问题描述】:

我有一个 Google Cloud PubSub 主题,该主题有时有数千条消息,有时有零条消息。这些消息代表每个可能需要一个小时以上的任务。最好我能够为此使用 Cloud Run,因为它可以很好地满足需求,如果发布一千条消息,我希望启动 100 个 Cloud Run 实例。这些 Run 实例通过推送订阅启动。问题是 PubSub 有 600 秒的确认超时。这意味着为了让 Cloud Run 处理这些消息,它们必须在 600 秒内完成。如果他们不这样做,PubSub 会超时,然后再次发送,导致任务重新启动,直到第一个任务最终确认它(这会导致同一任务运行多次)。 Cloud Run 通过返回 2** HTTP 状态代码来确认消息。文档说明

当运行在 Cloud Run 上的应用处理完请求后,容器实例对 CPU 的访问将被禁用或受到严重限制。因此,您不应启动在请求处理程序范围之外运行的后台线程或例程。

那么是否可以通过代码确认 PubSub 请求并继续处理,而无需 Google Cloud Run 交出资源?还是有我不知道的更好的解决方案?

因为这些过程非常耗费代码/资源,我觉得 Cloud Functions 是不够的。我看过https://cloud.google.com/solutions/using-cloud-pub-sub-long-running-tasks 和https://cloud.google.com/blog/products/gcp/how-google-cloud-pubsub-supports-long-running-workloads。但是这些并没有回答我的问题。 我看过 Google Cloud Tasks,这可能是什么?但是项目的其余部分是围绕 PubSub/Run/Functions 构建的,所以我最好坚持下去。

这个项目是用 Python 编写的。 所以最好我想像这样编写我的 Google Cloud Run 任务:

@app.route('/', methods=['POST'])
def index():
    """Endpoint for Google Cloud PubSub messages"""
    pubsub_message = request.get_json()
    logger.info(f'Received PubSub pubsub_message pubsub_message')
    if message_incorrect(pubsub_message):
        return "Invalid request", 400 #use normal NACK handling
    # acknowledge message here without returning

    # ...
    # Do actual processing of the task here
    # ...

那么我该如何或应该如何解决这个问题,以便资源密集型任务能够按需适当扩展(因此推送 PubSub 订阅)。而且任务只执行一次。

答案: 简而言之,已经回答了什么。 Cloud Run 和 Functions 只是不适合这个问题。没有办法让他们分别完成超过 9 分钟或 15 分钟的任务。唯一的解决方案是切换到另一个 Google 服务并使用拉式订阅,而失去 GC 运行/功能的自动缩放

【问题讨论】:

【参考方案1】:

Cloud Functions 和 Cloud Run 都不足以满足任意长时间运行的操作。 Cloud Functions 有 hard cap of 9 minutes per invocation 和 Cloud Run caps at 60。如果您需要更多时间,您将不得不将工作委派给另一个产品,例如 Google Compute Engine。应该可以从其中一种无服务器产品中启动一些 Compute Engine 工作。

鉴于 pubsub 确认的限制,您可能必须找到一种方法让客户端能够轮询或收听某些资源,以了解工作何时实际完成。您可以为此使用数据库,而 Cloud Firestore 可让您收听文档以了解它们何时发生变化。因此,您可以使用它来跟踪您长期运行的工作的状态。

【讨论】:

另外值得注意的是,当在 Cloud Run 或 Cloud Functions 之外使用 Google Cloud Pub/Sub 时,如果进程仍在进行中,可以调用 modAckDeadline 来增加消息确认的最后期限。 Cloud Pub/Sub 客户端库会自动处理此问题。可以通过max_lease_duration property 控制调用 modAckDeadline 的最长时间。 自 2021 年 6 月 3 日起,最长 60 分钟的请求超时现已全面推出 (GA)。见release note。【参考方案2】:

GKE 上的 Cloud Run 可以处理比托管平台上可用的更长的进程、更多的 CPU 和内存。但是,您有一个始终在运行的 GKE 集群,并且您失去了“按使用付费”的好处。

如果您想使用此解决方案,请勿将 PubSub 推送订阅直接链接到您的 Cloud Run on GKE。为此,请使用 Cloud Task 和 HTTP 作业。超时时间比 PubSub 长(最长 24 小时而不是 10 分钟),并且重试策略是可定制的。

【讨论】:

以上是关于如何使用 Google Cloud PubSub 和 Run 处理资源密集型长时间运行的任务?的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 terraform 使用服务帐户创建 google cloud pubsub pull 订阅?

google.cloud.pubsub_v1 和 google.cloud.pubsub 有啥区别?

Google Cloud PubSub:如何仅读取最新记录

如何在 Google Cloud App Engine 上使用 PubSub 创建订阅者,该订阅者通过 Publisher 从 Google Cloud App Engine Flex 收听消息?

为 Google PubSub 暂停 Spring Cloud StreamListener

获取 google cloud pubsub 的指标