如何更新 Celery Task ETA?

Posted

技术标签:

【中文标题】如何更新 Celery Task ETA?【英文标题】:How to update Celery Task ETA? 【发布时间】:2018-01-29 16:56:19 【问题描述】:

我正在使用 Celery 4.1.0 在 Django 1.10.3 中构建简单的等候名单应用程序。

我有以下基本任务:

@shared_task
def start_user_counter():
    logging.info('Task executed @ '.format(datetime.datetime.utcnow()))
    # This task is executed when user reaches the Top of the queue.
    # Send email, perform other stuff in here ...

@shared_task
def update_queue():
    curr_time = datetime.datetime.utcnow()
    logging.info('Task called @ '.format(curr_time))
    time_to_exec = curr_time + datetime.timedelta(seconds=10)
    # Here, perform checks if task already exists in Redis
    # if it does not exist - create a new one and store it to Redis
    # if it does exist - update task's ETA.
    task_id = start_user_counter.apply_async(eta=time_to_exec)
    logging.info('Task ID: '.format(task_id))
    # ...

update_queue.delay()

每个任务代表等待列表中的一个用户。当新用户被从等待列表中删除时(他到达 ETA 的顶部),他将被分配 ETA。但是,每个用户也有可能加快他到达等候名单顶部的时间。

问题:如何更新现有任务的 ETA,使其比最初预期的更早执行?

【问题讨论】:

如果队列中没有人,你知道这个任务对于单个用户需要多少时间吗? @ArpitSolanki 是的。添加用户时,默认 ETA 设置为 todays_date + 14 天。当用户加入队列(加快等待时间)时,我也会知道新的 ETA 将保持什么值。编辑:time_to_exec 只是此代码中的一个测试值。它显然会运行超过 10 秒。 【参考方案1】:

我已经设法解决了这个问题。我的解决方案是使用 Redis 创建sorted set。对于与该集合中的每个用户条目关联的score 值,我使用timestamp 表示用户被添加到等待列表中的时间。这有助于我将用户保留在正确订购者的等候名单中。

我还使用 Redis hash 来存储我在使用 notify_user.apply_async((self.id,), eta=eta).id 创建 celery 任务后立即收到的 celery.result.AsyncResult.id(见下文)。

然后,每当我需要更新任务的 ETA 时,我必须通过调用 AsyncResult.revoke() 像这样 AsyncResult(self.get_task_id()).revoke() 让工作人员忽略该任务。 AsyncResult(self.get_task_id()) 将返回与我通过调用 self.get_task_id() 获得的 id 关联的查询任务状态。在此 AsyncResult 实例上调用 .revoke() 将使任何接收任务或保留任务的工作人员忽略它。

这将允许我使用新的 ETA 创建全新的任务,我会将其 id 再次存储在 Redis 的同一用户记录中,从而覆盖旧的 id 值。

我的代码示例是针对我的情况,但底线是:

创建一个全新的任务并将其celery.result.AsyncResult.id 存储在某处(即self.task_id = T.apply_async((args,), eta=eta).id)。 如果您的新 ETA 取决于之前 ETA 的值,则将该值也存储在某处(即self.eta = eta) 使用AsyncResult(task_id) 创建查询任务状态的实例,并忽略此任务调用.revoke() 方法。 (即AsyncResult(self.task_id).revoke() 计算新的 ETA 并使用新的 ETA 创建新任务(即self.task_id = T.apply_async((args,), eta=new_eta).id
#utils.py
import datetime as dt
import redis
from django.conf import settings
from celery.result import AsyncResult
from .tasks import notify_candidate


KEY_DATA = 'user:data'
KEY_QUEUE = 'user:queue'
TIME_DELTA = 'time_delta'
TASK_ID = 'task_id'
WAITING_TIME = 14 * 24 * 60 * 60  # 14 days by default

r = redis.StrictRedis(host=settings.REDIS_HOST,
                      port=settings.REDIS_PORT,
                      db=settings.REDIS_DB)


class UserEntry(object):
    def __init__(self, user_id):
        self.id = user_id
        # dynamically creates string for each user that will be later used
        # as a key for hash in our Redis storage
        self.user_key = ':'.format(KEY_DATA, user_id)
        self.create_or_update()

    def create_or_update(self, data=None):
        """
        Set up new user entry.
        :return: None
        """
        if self.exist():
            # data exist for user with user_id - update it
            r.hmset(self.user_key, data)
        else:
            # this is a new user - create new entry for this user
            self.add_user()
            eta = dt.datetime.utcfromtimestamp(self.get_score())
            task_id = notify_user.apply_async((self.id,), eta=eta).id
            r.hmset(self.user_key, TASK_ID: task_id)

    def add_user(self):
        """
        Appends user's ID to the end of the queue.

        :return: None
        """
        if self.get_index():
            # if user entry exits simulate NX option of zadd command - 
            # Don't update already existing elements. Always add new elements.
            return

        # use UTC timestamp as score
        utc_time = dt.datetime.utcnow()
        score = int(utc_time.timestamp()) + WAITING_TIME

        r.zadd(KEY_QUEUE, score, self.id)

    def get_score(self):
        """
        Gets user's score (current ETA).

        :return: timestamp representing value of user's ETA
        """
        return r.zscore(KEY_QUEUE, self.id)

    def get_index(self):
        """
        Gets user's position in the queue.

        :return: 0-based index value representing user's position in the queue
        """
        return r.zrank(KEY_QUEUE, self.id)

    def get_task_id(self):
        """
        Helper method to get task ID for the user
        :return: value of user task's ID
        """
        return r.hget(self.user_key, TASK_ID).decode('ascii')

    def set_score(self, score_delta):
        """
        Move user up in the queue by score value.

        :param score_delta: number of seconds by which user's 
            score (curernt ETA) will be decremented
        :return: timestamp representing user's new score (ETA)
        """
        r.zincrby(KEY_QUEUE, self.id, score_delta)

    def exist(self):
        """
        Helper method used to define whether user exists in queue
        :return: dict of the hash’s name/value pairs if data entry exist
        """
        return r.hgetall(self.user_key)

    def bump(self):
        """
        Move user up in the queue
        :return: None
        """
        if not self.exist():
            return

        # remove current task associated with the user
        AsyncResult(self.get_task_id()).revoke()

        # we need to decrement ETA, thus *(-1)
        # here I make time_delta equal to 1 day or 1 * 24 * 60 * 60 seconds
        time_delta = WAITING_TIME / 14 * -1
        self.set_score(time_delta)
        new_eta = dt.datetime.utcfromtimestamp(time_delta)
        task_id = notify_user.apply_async((self.id,), eta=new_eta).id
        self.create_or_update(TASK_ID: task_id)

#tasks.py
import datetime
import logging

from celery import shared_task


@shared_task
def notify_user(user_id):
    logging.info('Task executed @ '.format(datetime.datetime.utcnow()))
    loging.info('UserID: '.format(user_id))
    # This task is executed when user reaches the Top of the queue.
    # Send email, perform other stuff in here ...


#models.py
from django.db.models.signals import post_save
from django.dispatch import receiver

from .utils import UserEntry


@receiver(post_save, sender=MyUser)
def create_user_entry_in_waiting_list(sender, instance=None, created=False, **kwargs):
    if created:
        # create user entry in the waiting_list
        user_id = instance.id
        UserEntry(user_id)

【讨论】:

以上是关于如何更新 Celery Task ETA?的主要内容,如果未能解决你的问题,请参考以下文章

芹菜:启动时启动任务

达到 eta 时,长 eta(8 小时以上)的 celery 任务会连续执行多次

2021-11-09

使用 Mandrill send_at 或 Celery countdown/eta 延迟发送电子邮件

当我将 Django Celery apply_async 与 eta 一起使用时,它会立即完成工作

celery使用group或者chord如何实时更新状态进度?