从多处理开始芹菜工人

Posted

技术标签:

【中文标题】从多处理开始芹菜工人【英文标题】:Starting celery worker from multiprocessing 【发布时间】:2015-07-04 15:34:27 【问题描述】:

我是芹菜新手。我见过的所有示例都从命令行启动 celery worker。例如:

$ celery -A proj worker -l info

我正在启动一个关于弹性 beanstalk 的项目,并认为让 worker 成为我的 web 应用程序的子进程会很好。我尝试使用多处理,它似乎工作。我想知道这是否是一个好主意,或者是否有一些缺点。

import celery
import multiprocessing


class WorkerProcess(multiprocessing.Process):
    def __init__(self):
        super().__init__(name='celery_worker_process')

    def run(self):
        argv = [
            'worker',
            '--loglevel=WARNING',
            '--hostname=local',
        ]
        app.worker_main(argv)


def start_celery():
    global worker_process
    worker_process = WorkerProcess()
    worker_process.start()


def stop_celery():
    global worker_process
    if worker_process:
        worker_process.terminate()
        worker_process = None


worker_name = 'celery@local'
worker_process = None

app = celery.Celery()
app.config_from_object('celery_app.celeryconfig')

【问题讨论】:

有趣的是,这段代码正在使用 same Celery 实例,用于应用程序和工作程序。否则创建一个工人,例如命令行,似乎总是创建一个新的 Celery 实例。我不知道这是否是一个问题...... 【参考方案1】:

似乎是一个不错的选择,绝对不是唯一的选择,而是一个不错的选择:)

您可能想要研究的一件事(您可能已经在这样做)是将自动缩放与您的 Celery 队列的大小相关联。因此,您只有在队列增长时才扩大规模。

Effectively Celery 当然在内部做了类似的事情,所以没有太大区别。我能想到的唯一障碍是处理外部资源(例如数据库连接),这可能是个问题,但完全取决于您使用 Celery 所做的事情。

【讨论】:

【参考方案2】:

如果有人感兴趣,我确实可以在 Elastic Beanstalk 上使用运行 Python 3.4 的预配置 AMI 服务器来完成这项工作。我在运行 Debian Jessie 的基于 Docker 的服务器上遇到了很多问题。也许与端口重新映射有关。 Docker 有点像一个黑盒子,我发现它很难使用和调试。幸运的是,AWS 的好人刚刚在 2015 年 4 月 8 日添加了一个非 docker Python 3.4 选项。

我进行了很多搜索以使其部署和工作。我看到很多问题没有答案。所以这是我部署的非常简单的 python 3.4/flask/celery 进程。

Celery 你可以直接 pip 安装。您需要使用配置命令或 container_command 从配置文件安装 rabbitmq。我在上传的项目 zip 中使用了一个脚本,因此需要一个 container_command 才能使用该脚本(常规的 eb config 命令在项目安装之前执行)。

[yourapproot]/.ebextensions/05_install_rabbitmq.config:

container_commands:
  01RunScript:
    command: bash ./init_scripts/app_setup.sh

[yourapproot]/init_scripts/app_setup.sh:

#!/usr/bin/env bash

# Download and install Erlang
yum install erlang

# Download the latest RabbitMQ package using wget:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1-1.noarch.rpm

# Install rabbit
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum -y install rabbitmq-server-3.5.1-1.noarch.rpm

# Start server
/sbin/service rabbitmq-server start

我正在做一个烧瓶应用程序,所以我在第一个请求之前启动了工作人员:

@app.before_first_request
def before_first_request():
    task_mgr.start_celery()

task_mgr 创建 celery app 对象(我称之为 celery,因为烧瓶 app 对象是 app)。 -Ofair 在这里非常关键,对于一个简单的任务管理器。任务预取有各种奇怪的行为。这应该是默认的吧?

task_mgr/task_mgr.py:

import celery as celery_module
import multiprocessing


class WorkerProcess(multiprocessing.Process):
    def __init__(self):
        super().__init__(name='celery_worker_process')

    def run(self):
        argv = [
            'worker',
            '--loglevel=WARNING',
            '--hostname=local',
            '-Ofair',
        ]
        celery.worker_main(argv)


def start_celery():
    global worker_process
    multiprocessing.set_start_method('fork')  # 'spawn' seems to work also
    worker_process = WorkerProcess()
    worker_process.start()


def stop_celery():
    global worker_process
    if worker_process:
        worker_process.terminate()
        worker_process = None


worker_name = 'celery@local'
worker_process = None

celery = celery_module.Celery()
celery.config_from_object('task_mgr.celery_config')

到目前为止,我的配置非常简单:

task_mgr/celery_config.py:

BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'

CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'  # 'pickle' warning: can't use datetime in json
CELERY_RESULT_SERIALIZER = 'json'  # 'pickle' warning: can't use datetime in json
CELERY_TASK_RESULT_EXPIRES = 18000  # Results hang around for 5 hours

CELERYD_CONCURRENCY = 4

然后你可以把任务放在你需要的地方:

from task_mgr.task_mgr import celery
import time


@celery.task(bind=True)
def error_task(self):
    self.update_state(state='RUNNING')
    time.sleep(10)
    raise KeyError('im an error')


@celery.task(bind=True)
def long_task(self):
    self.update_state(state='RUNNING')
    time.sleep(20)
    return 'long task finished'


@celery.task(bind=True)
def task_with_status(self, wait):
    self.update_state(state='RUNNING')
    for i in range(5):
        time.sleep(wait)
        self.update_state(
            state='PROGRESS',
            meta=
                'current': i + 1,
                'total': 5,
                'status': 'progress',
                'host': self.request.hostname,
            
        )
    time.sleep(wait)
    return 'finished with wait = ' + str(wait)

我还保留了一个任务队列来保存异步结果,以便我可以监控任务:

task_queue = []


def queue_task(task, *args):
    async_result = task.apply_async(args)
    task_queue.append(
        
            'task_name':task.__name__,
            'task_args':args,
            'async_result':async_result
        
    )
    return async_result


def get_tasks_info():
    tasks = []

    for task in task_queue:
        task_name = task['task_name']
        task_args = task['task_args']
        async_result = task['async_result']
        task_id = async_result.id
        task_state = async_result.state
        task_result_info = async_result.info
        task_result = async_result.result
        tasks.append(
            
                'task_name': task_name,
                'task_args': task_args,
                'task_id': task_id,
                'task_state': task_state,
                'task_result.info': task_result_info,
                'task_result': task_result,
            
        )

    return tasks

当然,在你需要的地方开始任务:

from webapp.app import app
from flask import url_for, render_template, redirect
from webapp import tasks
from task_mgr import task_mgr


@app.route('/start_all_tasks')
def start_all_tasks():
    task_mgr.queue_task(tasks.long_task)
    task_mgr.queue_task(tasks.error_task)
    for i in range(1, 9):
        task_mgr.queue_task(tasks.task_with_status, i * 2)

    return redirect(url_for('task_status'))


@app.route('/task_status')
def task_status():
    current_tasks = task_mgr.get_tasks_info()
    return render_template(
        'parse/task_status.html',
        tasks=current_tasks
    )

就是这样。如果您需要任何帮助,请告诉我,尽管我对芹菜的了解仍然相当有限。

【讨论】:

以上是关于从多处理开始芹菜工人的主要内容,如果未能解决你的问题,请参考以下文章

芹菜工人并发

从芹菜任务中获取芹菜工人的名字?

VSCode调试芹菜工人

开始生产芹菜工人。在 Azure/linux 应用服务上使用 Django/Python

暂时禁用芹菜中的分布式处理

如何使用芹菜工人将 django 项目部署到谷歌云?