如何理解celery中的worker并发和多worker

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何理解celery中的worker并发和多worker相关的知识,希望对你有一定的参考价值。

参考技术A 我把在v2上发的帖子下面的评论摘抄下来
原帖: https://www.v2ex.com/t/593748#r_7793688

两个说的比较详细的回答:
回答1:
一个工厂(worker)有一个员工(单进(线)程),为了提高效率,请多了几个员工一起工作(单 worker 多进程)
为了防止这个工厂断电无法工作,那么多建了几个工厂,每个工厂有多个员工(多 worker 多进程)

实际上都是在实现并发

单 worker 可以开 n 个进程进行工作,一个 worker 挂了往往所有进程都会挂掉
多 worker 假设为 m 个,可以理解为分布式,为了防止一个 worker 挂了(或者性能不足等原因),导致无法工作
那么能够并发处理的任务数量理论上为 m * n

一般 worker(不仅 celery,很多设计都是这样)指一个调度主进程 + 多个子工作进程

一个 worker 有什么缺点:

比较常见的是在不同机器部署多个 worker
在不考虑机器和进程挂掉但情况,其实一个 worker 开 8 个进程和 2 个 worker 每个开 4 个进程的效率是接近的

回答2:
celery 里面的-c 参数指定的是并发度,而-P 参数指定并发的实现方式,有 prefork (default)、eventlet、gevent 等,prefork 就是多进程的方式去实现并发。
你理解的多 worker 对应到多个进程,每个 worker (进程)自己内部还能并发是 gunicorn 的方式。gunicorn 的-w 参数指定有几个 worker (即几个进程),-k 参数指定每个 worker 的并发方式,可以是多线程或者多协程,也可以指定为 sync,表示 worker 是同步的,即不能并发。
比如 gunicorn 的-w 10 -k sync 和 celery 的-c 10 -P prefork 是等价的,都是创建 10 个进程去做并发,并发度最高就是 10。

再例如 celery 的-c 10 -P gevent 表示创建 10 个 gevent 协程去做并发,最高并发度也是 10。而 gunicorn 的-w 10 -k gevent,表示的是创建 10 个进程,且每个进程都是 gevent 异步的,这个并发度就很高了。

如何在远程系统上配置和运行 celery worker

【中文标题】如何在远程系统上配置和运行 celery worker【英文标题】:how to configure and run celery worker on remote system 【发布时间】:2015-01-15 04:40:12 【问题描述】:

我正在研究 celery 并使用 rabbitmq 服务器,并在服务器中的 django 项目中创建了一个项目(其中存在消息队列、数据库),它工作正常,我也创建了多个工人

from kombu import Exchange, Queue
CELERY_CONCURRENCY = 8

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

CELERY_RESULT_BACKEND = 'amqp'
CELERYD_HIJACK_ROOT_LOGGER = True
CELERY_HIJACK_ROOT_LOGGER = True
BROKER_URL = 'amqp://guest:guest@localhost:5672//'

CELERY_QUEUES = (
  Queue('default', Exchange('default'), routing_key='default'),
  Queue('q1', Exchange('A'), routing_key='routingKey1'),
  Queue('q2', Exchange('B'), routing_key='routingKey2'),
)
CELERY_ROUTES = 
 'my_taskA': 'queue': 'q1', 'routing_key': 'routingKey1',
 'my_taskB': 'queue': 'q2', 'routing_key': 'routingKey2',



AMQP_SERVER = "127.0.0.1"
AMQP_PORT = 5672
AMQP_USER = "guest"
AMQP_PASSWORD = "guest"
AMQP_VHOST = "/"`


CELERY_INCLUDE = ('functions')

`

但是我想从另一台服务器运行工作人员。所以当我提到几个站点时,我需要一些关于如何在另一个系统中运行工作人员的信息,它说我们需要在远程系统上运行 django 项目也是如此有必要吗?

【问题讨论】:

【参考方案1】:

首先,想想芹菜的真正作用是什么?

Celery 生产者将任务添加到队列中,其中包含名称和其他重要标题,以标识任务的位置。

Celery 没有向 MQ 添加完整的可执行函数。

所以,当您查看工人(消费者)方面。

Celery 从 MQ 获取任务详细信息并尝试运行它。 要运行这个任务,应该有可用的模块/文件/环境/代码库来执行这个任务。

现在让我们来回答你的问题...

您尝试将 worker 设置在单独的机器上,以便执行任务所指向的函数,您需要完整的任务代码环境,并且您应该连接(否则您将如何从 MQ 获取任务?)与任务所在的 MQ .

【讨论】:

【参考方案2】:

基本上我会接受 ChillarAnand 的回答。我想对他的回答添加评论,但我不能因为我没有 50 声望。

所以...

你的问题的答案...

首先您想阅读"how to send tasks to remote machine?",如 提到了ChillarAnand。

这真是一篇好文章,有一个小缺陷,比如“在函数 def add() 上没有 '@app.task',在内容 remote.py 中”,它引起了问题,让我感到困惑芹菜的新手。

“[Errno 113] No route to host.”部分的答案,

我猜...我猜你的 rabbitmq 服务器中运行了防火墙, 你可能想要一张支票。大多数时候,它是 iptables,但也可能是其他东西。关掉它,或者改变规则。那你可以再试一次。

【讨论】:

【参考方案3】:

您可以在您的 django 项目中使用app.send_task() 和以下内容:

from celery import Celery
import my_client_config_module

app = Celery()
app.config_from_object(my_client_config_module)

app.send_task('dotted.path.to.function.on.remote.server.relative.to.worker',
              args=(1, 2))

【讨论】:

如果是subtasks,如何使用send_taskapp.send_task('myapp.send_push_notification', (json.dumps(payload1), ), link=app.send_task('differentapp.save_pn_response', (json.dumps(payload2), ))) 侯赛因,你不能远程传输代码。您要运行的代码必须在您远程通话的 celery 实例上设置。您只需发送要运行的代码的名称和参数。由您决定数据如何传递(如果它不仅仅是一些简单的参数。我建议构建一个您的远程服务器可以查询的休息端点。或者可能在 AWS 上设置一些 lambdas 或其他东西。当它出现时没有捷径可走集群!【参考方案4】:

这里是这个想法的要点:

在机器 A 上:

    安装 Celery 和 RabbitMQ。 配置 rabbitmq 以便机器 B 可以连接到它。 创建包含一些任务的 my_tasks.py 并将一些任务放入队列中。

在机器 B 上:

    安装 Celery。 将 my_tasks.py 文件从机器 A 复制到这台机器。 运行一个工作线程来使用任务

我也有同样的要求,用芹菜做实验。这样做要容易得多。几天前我写了一篇详细的博客文章。查看how to send tasks to remote machine?

【讨论】:

我已经按照文档创建了如图所示的文件,但是在运行 celery worker 时出现以下错误------------- 消费者:无法连接到 amqp: //krish:**@123.456.78.9:5672/321.654.​​5.111: [Errno 113] 没有到主机的路由。 6.00 秒后重试... 看来rabbitmq有连接问题。尝试在具有相同配置的另一台机器上运行工作程序,看看它是否有效。 我在另一个系统上尝试过同样的方法,但它也引发了同样的问题 尝试使用新系统时出现另一个错误----无法连接到 amqp://krish:**@123.456.78.9:5672/321.654.​​5.111:timed out @sattva_venu 芹菜信号可以用于此。

以上是关于如何理解celery中的worker并发和多worker的主要内容,如果未能解决你的问题,请参考以下文章

Celery在Django中的使用介绍

如何优化 nginx 中的 worker_processes 和 worker_connections?

如何在远程系统上配置和运行 celery worker

1.10 Celery

celery相关问题

python celery多worker多队列定时任务