架构建议 - 如何实现自动缩放的异步任务

Posted

技术标签:

【中文标题】架构建议 - 如何实现自动缩放的异步任务【英文标题】:Architecture suggestions - How to implement autoscaled async tasks 【发布时间】:2020-12-27 06:01:10 【问题描述】:

我们有一个大型应用程序,它使用 django 作为 ORM,使用 celery 作为任务运行基础架构。 我们运行由事件(用户驱动或自动)触发的复杂管道,如下所示:


def pipeline_a:
# all lines are synchronous, so second line must happen after first is finished successfully
first_res = a1()
all_results = in_parallel.do(a2, a3, a4)
a5(first_res, all_results)

我们希望在不同的机器上运行 a1, a2, ...(每个任务可能需要不同的资源),并且并行运行的管道数量总是在变化。 今天我们使用 celery,它非常方便地实现上述功能 - 但不适合自动缩放(我们将它破解为与 kubernetes 一起使用,但它没有原生支持)。

我主要想解决的问题是:

    如何仅在之前的所有步骤都完成后“运行下一个管道步骤”(我可能事先不知道将运行哪些步骤 - 这取决于之前步骤的结果,因此这些步骤本质上是动态的) 今天我们尝试使用 kubernetes (EKS) 来自动扩展一些任务(SQS 队列大小是 hpa 指标)。如何让 kubernetes 不尝试终止当前正在运行的任务,但如果有新任务到达队列,仍然“启动 pod”(许多任务需要大约半小时才能完成)

到目前为止,我的经验是解决 1,celery 是最方便的方法,但它与 2 发生冲突。那么如果没有 celery,你将如何解决 1,然后我如何利用 Kubernetes 来处理长时间运行的任务?

【问题讨论】:

【参考方案1】:

如果我正确理解您的问题,

您有一个最多可以运行 30 分钟的异步作业。 作业正在 K8s 上运行。 当前作业的输出可能决定下一个作业。 您可以使用 SQS。

您可以为每个任务维护队列。为每个队列实现一个消费者。使用 Django 首先将任务添加到“a1”。更新 db 中的作业状态。

当 a1 的消费者完成执行时,它会更新 db 中的状态并推送到正确的队列。让我们说'a3'。 'a3' 的消费者将读取任务。更新数据库。执行。将任务推送到正确的队列中。更新数据库。

如果您使用 SQS,则将无限任务存储在队列中。您将不得不根据 SQS 队列的大小增加使用者的数量。为此,您可以使用https://github.com/Wattpad/kube-sqs-autoscaler

【讨论】:

以上是关于架构建议 - 如何实现自动缩放的异步任务的主要内容,如果未能解决你的问题,请参考以下文章

如何实现异步执行任务

Celery+Rabbitmq实现异步任务

单线程实现了多任务异步协程

微服务架构中的任务调度:在 SpringBoot 框架中使用异步任务,定时任务和邮件任务

Spring Boot入门系列如何实现异步执行任务

flaskcelery+redis 实现定时任务和异步—— 时间篇