使用 django、celery 和 redis 的一项一项任务
Posted
技术标签:
【中文标题】使用 django、celery 和 redis 的一项一项任务【英文标题】:One by one task using django, celery and redis 【发布时间】:2020-08-30 19:36:09 【问题描述】:我使用 django、celery 和 redis 异步启动任务。
# tasks.py
@shared_task
def my_task():
# Do stuff of task 1
return True
# Somewhere in test1.py
my_task.delay()
# Few milli seconds later in test2.py
my_task.delay()
使用该配置,my_task 在 2 个不同的文件上启动 2 次。所以它们几乎同时在不同的线程上执行。
我需要将这 2 个任务一一执行。如果 my_task #1 正在执行并且另一个 my_task #2 已启动,我需要 my_task #2 等待 #1 结束后再执行。
我不想只使用一个线程将参数传递给 celery celery worker --concurrency=1
我的 settings.py 中的 celery 配置是基本的:
# settings.py
CELERY_BROKER_URL = 'redis://localhost:6379/0'
我发现很多资源都在谈论这个主题,但我真的不明白如何实现我的目标
https://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#ensuring-a-task-is-only-executed-one-at-a-time Running "unique" tasks with celery http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html【问题讨论】:
【参考方案1】:http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html 上的解决方案几乎奏效了。这是一些改编:
import redis
REDIS_CLIENT = redis.Redis()
def only_one(function=None, key="", timeout=None):
"""Enforce only one celery task at a time."""
def _dec(run_func):
"""Decorator."""
def _caller(*args, **kwargs):
"""Caller."""
ret_value = None
have_lock = False
lock = REDIS_CLIENT.lock(key, timeout=timeout)
try:
have_lock = lock.acquire(blocking=True)
if have_lock:
ret_value = run_func(*args, **kwargs)
finally:
if have_lock:
lock.release()
return ret_value
return _caller
return _dec(function) if function is not None else _dec
@task(name='my_app.sample.tasks.single_task')
@only_one(key="SingleTask", timeout=60 * 5)
def single_task(self, **kwargs):
"""Run task."""
print("test")
问题是,我没有在我的 settings.py 中的任何地方配置 Redis,所以我不明白它是如何找到正确的 redis 数据库的。我猜它来自 celery 的配置。
【讨论】:
以上是关于使用 django、celery 和 redis 的一项一项任务的主要内容,如果未能解决你的问题,请参考以下文章
使用 django、celery 和 redis 的一项一项任务
Django - 如何在 celery 和 redis 中使用异步任务队列
django redis celery 和 celery beats 的正确设置
Celery 与 Django 中的 Redis 代理:任务成功执行,但仍有太多持久的 Redis 键和连接
在使用 django_celery_beat 设置的 Django 视图中使用 Celery 定期任务输出,并使用 Redis 设置缓存