如何使用 celery-beat 启动任务?

Posted

技术标签:

【中文标题】如何使用 celery-beat 启动任务?【英文标题】:How can to start tasks with celery-beat? 【发布时间】:2020-02-27 16:47:39 【问题描述】:

为什么我不能运行周期性任务?

proj/settings.py

REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
CELERY_BROKER_URL = 'redis://localhost:6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT

CELERY_BEAT_SCHEDULE = 
    'task-first': 
        'task': 'app.tasks.one',
        'schedule': timedelta(seconds=1)
    ,
    'task-second': 
        'task': 'app.tasks.two',
        'schedule': crontab(minute=0, hour='*/3,10-19')
    

proj/celery.py

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')
app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

proj/__init.py__

from .celery import app as celery_app

__all__ = ['celery_app']

celery -A proj worker -l info

[2019-10-31 16:57:57,906: INFO/MainProcess] Connected to redis://localhost:6379//
[2019-10-31 16:57:57,912: INFO/MainProcess] mingle: searching for neighbors
[2019-10-31 16:57:58,927: INFO/MainProcess] mingle: all alone
[2019-10-31 16:57:58,951: INFO/MainProcess] celery@lexvel-MS-7A72 ready.

找到任务

celery -A proj beat -l info

Configuration ->
    . broker -> redis://localhost:6379//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[2019-10-31 16:58:02,851: INFO/MainProcess] beat: Starting...

celerybeat-shedule 文件已创建。但除了这些行之外,什么也没有显示。

任务

@task()
def one():
    print('start 1', datetime.now())
    driver = init_driver()
    parse(driver)
    driver.close()
    driver.quit()
​
​
@task()
def two():
    print('start 2', datetime.now())
    driver = init_driver()
    parse2(driver)
    driver.close()
    driver.quit()
    print('end 2', datetime.now())

【问题讨论】:

【参考方案1】:

芹菜节拍命令

celery -A proj worker -l info -B --scheduler django_celery_beat.schedulers:DatabaseScheduler

此命令用于启动 celery beat。

首先在设置文件的已安装应用中添加django_celery_beat模块。

然后应用 django migrate 命令,这将在管理面板中创建表格。

完成 celery 文件中的所有过程后,并在 tasks.py 中创建任务。

您将应用上面提到的 beat 命令。


proj/settings.py

INSTALLED_APPS = [

    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    'django_celery_beat',
]

REDIS_URL = "redis://localhost:6379/1"

CELERY_BROKER_URL=REDIS_URL

CELERY_RESULT_BACKEND=REDIS_URL

CELERY_ACCEPT_CONTENT = ['application/json']

CELERY_RESULT_SERIALIZER = 'json'

CELERY_TASK_SERIALIZER = 'json'

CELERY_BEAT_SCHEDULE = 

        'task-first': 
        'task': 'app.tasks.one',
        'schedule': timedelta(seconds=1)
       ,
      'task-second': 
        'task': 'app.tasks.two',
        'schedule': crontab(minute=0, hour='*/3,10-19')
      

proj/celery.py

from celery import Celery

from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

app.config_from_object('django.conf:settings')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

proj/__init.py__

from .celery import app as celery_app

**__all__** = ['celery_app']

【讨论】:

如果我不写这行 BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0' 我得到错误 (403) ACCESS_REFUSED - Login was refused using authentication mechanism AMQPLAIN. For details see the broker logfile. 这令人困惑...您是使用 Redis 作为代理还是 RabbitMQ ??? 这里我使用了redis作为代理,首先你需要安装redis-server,然后你就可以轻松使用redis了。但是你可以使用 RabbitMq 作为代理。 @DejanLekic 我使用redis 作为代理,我不明白如果我删除BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0',为什么会出现与RabbitMQ 相关的错误 启动beat的命令也会启动一个worker。我建议你不要这样做,除非你在开发环境中。【参考方案2】:

你运行你的worker时,有没有说app.tasks.oneapp.tasks.two是注册任务?如果它们没有作为注册任务出现在那里,那么您的节拍是安排无法执行的任务。 - 他们只会在队列中等待,并最终过期。检查您是否已注册它们的另一种方法是通过celery -A proj.celeryapp inspect registered(将proj.celeryapp 更改为您的Celery 应用程序的位置)。

【讨论】:

当我运行worker 时,我会得到我的任务列表[tasks] (app.tasks.one; app.tasks.two) 如果我写 celery -A proj.celery inspect registered 我得到错误 Error: No nodes replied within time constraint. 并且工作人员崩溃并出现错误 Cannot route message for exchange 'reply.celery.pidbox': Table empty or key no longer exists. Probably the key ('_kombu.binding.reply.celery.pidbox') has been removed from the Redis database. 您是否在至少一名工作人员正在运行时收到此错误? 我将kombu 降级到4.6.3 版本并运行此命令celery -A proj.celery inspect registered。在输出我得到-> celery@user: OK ( * app.tasks.one; * app.tasks.two)

以上是关于如何使用 celery-beat 启动任务?的主要内容,如果未能解决你的问题,请参考以下文章

Django celery 和 celery-beat 守护进程脚本错误

Celery-Beat:ACCESS_REFUSED - 使用身份验证机制 AMQPLAIN 拒绝登录

SBT - 如何使 InputTask 依赖于一个任务并最终完成另一个

如何使Delphi做的程序不显示在任务栏中

如何使 vscode 不等待完成 preLaunchTask?

如何使参数可用于所有 Luigi 任务?