芹菜工人并发

Posted

技术标签:

【中文标题】芹菜工人并发【英文标题】:Celery workers concurrency 【发布时间】:2022-01-08 05:20:30 【问题描述】:

我有一个 Python 脚本,它是一个 celery 任务。我的应用程序需要多个并发工作人员(来自同一个脚本),因为每个工作人员都需要按顺序处理消息。我不能使用参数 concurrency = n 因为这样消息不会按顺序处理,这是一种要求。

所以我所做的是在消息代理中设置“n”个不同的队列,然后启动“n”个不同的工作人员,每个工作人员分别处理每个队列,我在每个工作人员中设置了 concurrency = 1,所以每个队列按顺序处理。 工作人员来自同一个脚本,因此要启动这些工作人员,我必须复制脚本 n 次并在不同线程中并行运行所有工作人员。

在我看来,这不是实现此目标的最有效方法,因为代码被复制了多次。有没有其他方法可以做到这一点?

如果需要更多详细信息,请告诉我。

【问题讨论】:

“我不能使用参数 concurrency = n 因为这样消息没有按顺序处理,这是一种要求”部分非常令人困惑。顺序处理和并发处理是完全相反的方法。如果您需要并发处理但任务应该以先进先出的方式从队列中取出 - 这是完全可行的(通常通过配置您的队列)。 对不起,如果我不完全清楚,让我试着澄清一下。所以在每个队列中,消息都需要一个一个的处理,因为对当前消息需要做的操作依赖于前一个消息。因此,如果我启动了一个并发 = n 的工作线程,那么该工作线程将有 n 个进程,每个进程都会占用一条消息,并且无法识别每个进程的确切先前消息。 我想我开始了解您需要什么(任务之间存在依赖关系)。但是你能提供一些具体的例子吗?这些任务的性质是什么,为什么其中一些不能在其他任务之前执行? 刚刚意识到我犯的愚蠢错误。我可以使用相同的 python 脚本来实例化不同的工作人员,每个工作人员都在听不同的队列并使用不同的名称标签。不知道出于什么原因我认为每个脚本只能实例化一个工作人员。无论如何感谢您的帮助。 不客气。 :) 【参考方案1】:

map 和 starmap 是内置任务,它们为序列中的每个元素调用提供的调用任务。

例如,task.map([1, 2]) - 导致调用单个任务,将参数按顺序应用于任务函数,结果为: res = [任务(1),任务(2)] 执行任务(1),然后执行任务(2)。

https://docs.celeryproject.org/en/stable/userguide/canvas.html#map-starmap

如果任务(2)使用任务(1)的结果,应该使用链。 https://docs.celeryproject.org/en/stable/userguide/canvas.html#chains

【讨论】:

以上是关于芹菜工人并发的主要内容,如果未能解决你的问题,请参考以下文章

芹菜。运行单个芹菜节拍 + 多个芹菜工人规模

芹菜工人在 aws 弹性豆茎中失败 [退出:芹菜工人(退出状态 1;未预期)]

从芹菜任务中获取芹菜工人的名字?

芹菜多里面码头工人容器

如何使用芹菜工人将 django 项目部署到谷歌云?

码头工人群或牧场牛中的芹菜工人