Celery Worker 中的多线程

Posted

技术标签:

【中文标题】Celery Worker 中的多线程【英文标题】:Multithreading within a Celery Worker 【发布时间】:2017-04-02 15:21:54 【问题描述】:

我正在使用 Celery 和 RabbitMQ 来处理来自 API 请求的数据。流程如下:

请求 > API > RabbitMQ > Celery Worker > 返回

理想情况下,我会产生更多的芹菜工人,但我受到内存限制。

目前,我的流程中的瓶颈是从传递给 worker 的 URL 中获取和下载数据。粗略,流程是这样的:

def celery_gets_job(url):
    data = fetches_url(url)       # takes 0.1s to 1.0s (bottleneck)
    result = processes_data(data) # takes 0.1s
    return result

这是不可接受的,因为工作人员在获取 URL 时被锁定了一段时间。我正在考虑通过线程来改进这一点,但我不确定最佳实践是什么。

有没有办法让 celery worker 异步下载传入的数据,同时在不同的线程中处理数据?

我是否应该让单独的工作人员获取和处理,通过某种形式的消息传递,可能通过 RabbitMQ?

【问题讨论】:

你可以考虑通过创建两个多进程在 celery 任务中使用类似multiprocessing pipes 的东西。当然,您的多处理进程应该受到池的限制。如果我没记错的话,通过 rabbitmq/result 后端共享获取的 url 的大数据并不是一个好主意。 Celery 低级 api 也可以有一些类似的功能。 我不知道 RabbitMQ 但我认为多处理将比多线程更适合您,因为celery_gets_job 有多个非原子操作,这会在使用多线程时产生问题。您可以使用队列,其中数据由运行fetches_url(url) 的进程池填充,另一个进程执行processes_data(data) 这可能就是你要找的东西:***.com/questions/28315657/… Celery 的创建者news.ycombinator.com/item?id=11889549 的这篇文章可能就是你要找的。​​span> 【参考方案1】:

使用eventlet 库,您可以修补标准库以使其异步。

首先导入异步urllib2:

from eventlet.green import urllib2

所以你会得到 url 正文:

def fetch(url):
    body = urllib2.urlopen(url).read()
    return body

查看更多eventlet 示例here。

【讨论】:

另外,直接使用 eventlet 执行池 docs.celeryproject.org/en/latest/userguide/concurrency/… 应该会自动修补 io 调用。 但是processes_data(data) 不会仍然阻塞并使组合结果比以前慢吗?【参考方案2】:

我将创建两个任务,一个用于下载数据,另一个用于在下载数据后对其进行处理。这样您就可以独立扩展这两个任务。请参阅:Routing、Chains。

【讨论】:

看起来不像一个解决方案。工作人员仍然会卡住等待 io 完成。目标是让 1 名工作人员一次下载多个网址。

以上是关于Celery Worker 中的多线程的主要内容,如果未能解决你的问题,请参考以下文章

Web Worker——js的多线程,实现统计博客园总阅读量

JavaScript 中的多线程通信的方法

NodeJs多线程、多进程、定时任务

nodejs多线程

多线程的设计模式:FutureMaster-Worker

Celery分布式队列学习