Django Celery IntegrityError


Posted: 2021-12-25 11:18:13

Question:

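I want to create a progress bar for my project. I have a class with several functions. One of them in particular (def download_all) takes a long time, and that is the main reason I want a progress bar.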

I have successfully set up celery, celery-progress, etc., and they all work fine. My problem is that I want to integrate the progress bar into the download_all function.

It gives an error: IntegrityError at /o.../k... NOT NULL constraint failed: django_celery_results_taskresult.task_id

How can I fix it?

functions.py

from celery import shared_task
from celery_progress.backend import ProgressRecorder


class myClass():

    def __init__(self, n_user, n_password, n_url, n_port, db_password, username):
        ...
        self.download_all(n_user, n_password, n_url, n_port, db_password)
        ...

    @shared_task(bind=True, name="my_add")
    def download_all(n_user, n_password, n_url, n_port, db_password):
        ...
        len_scans = len(scans)
        progress_recorder = ProgressRecorder(self)
        i = 0
        for s in scans:
            progress_recorder.set_progress(i + 1, len_scans)
            i += 1

views.py

def setup_wizard(request):
    ...
    functions.Zafiyet(setup.n_username, setup.n_password,
                      setup.n_url, setup.n_port, setup.db_password,
                      username=request.user.username)

Traceback

Environment:


Request Method: POST
Request URL: http://127.0.0.1:8000/operasyonmerkezi/konfigurasyon

Django Version: 3.2.7
Python Version: 3.9.6
Installed Applications:
['django.contrib.admin',
 'django.contrib.auth',
 'django.contrib.contenttypes',
 'django.contrib.sessions',
 'django.contrib.messages',
 'django.contrib.staticfiles',
 'dashboard',
 'accounts',
 'logs',
 'crispy_forms',
 'django_apscheduler',
 'easy_timezones',
 'django_celery_results',
 'celery_progress']
Installed Middleware:
['django.middleware.security.SecurityMiddleware',
 'django.contrib.sessions.middleware.SessionMiddleware',
 'django.middleware.common.CommonMiddleware',
 'django.middleware.csrf.CsrfViewMiddleware',
 'django.contrib.auth.middleware.AuthenticationMiddleware',
 'django.contrib.messages.middleware.MessageMiddleware',
 'django.middleware.clickjacking.XFrameOptionsMiddleware']



Traceback (most recent call last):
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 581, in get_or_create
    return self.get(**kwargs), False
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 435, in get
    raise self.model.DoesNotExist(

During handling of the above exception (TaskResult matching query does not exist.), another exception occurred:
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
    return self.cursor.execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
    return Database.Cursor.execute(self, query, params)

The above exception (NOT NULL constraint failed: django_celery_results_taskresult.task_id) was the direct cause of the following exception:
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\exception.py", line 47, in inner
    response = get_response(request)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\base.py", line 181, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\contrib\auth\decorators.py", line 21, in _wrapped_view
    return view_func(request, *args, **kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\dashboard\views.py", line 128, in setup_wizard
    task = (functions.myClass(setup.n_username, setup.n_password,
  File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 44, in __init__
    self.download_all(n_user, n_password, n_url, n_port, db_password)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\local.py", line 188, in __call__
    return self._get_current_object()(*a, **kw)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 389, in __call__
    return self.run(*args, **kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 153, in download_all
    progress_recorder.set_progress(i + 1, len_scans)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery_progress\backend.py", line 46, in set_progress
    self.task.update_state(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 971, in update_state
    self.backend.store_result(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\backends\base.py", line 482, in store_result
    self._store_result(task_id, result, state, traceback,
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\backends\database.py", line 66, in _store_result
    self.TaskModel._default_manager.store_result(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 46, in _inner
    return fun(*args, **kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 168, in store_result
    obj, created = self.using(using).get_or_create(task_id=task_id,
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 588, in get_or_create
    return self.create(**params), True
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 453, in create
    obj.save(force_insert=True, using=self.db)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 726, in save
    self.save_base(using=using, force_insert=force_insert,
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 763, in save_base
    updated = self._save_table(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 868, in _save_table
    results = self._do_insert(cls._base_manager, using, fields, returning_fields, raw)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 906, in _do_insert
    return manager._insert(
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\manager.py", line 85, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 1270, in _insert
    return query.get_compiler(using=using).execute_sql(returning_fields)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\sql\compiler.py", line 1416, in execute_sql
    cursor.execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 98, in execute
    return super().execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 66, in execute
    return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 75, in _execute_with_wrappers
    return executor(sql, params, many, context)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
    return self.cursor.execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\utils.py", line 90, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
    return self.cursor.execute(sql, params)
  File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
    return Database.Cursor.execute(self, query, params)

Exception Type: IntegrityError at /operasyonmerkezi/konfigurasyon
Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id

Comments:

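I haven't used celery_progress, but Celery tasks inherit from Task, and your myClass definition doesn't. @shared_task(bind=True, ... usually means the task instance is passed to the call as the first argument (as self). I don't know what the shared_task decorator would do to a class method in the first place, but you are not passing a self argument to download_all. I don't see how your code works at all as written; if it does, I suggest rewriting your question to include a minimal working subset of the code.

Answer 1: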

First of all, you probably don't want to use the @shared_task decorator on a method like this.

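As it stands, your method signature is a bit muddled. The bind=True argument makes celery pass the task object into the call, which can produce unexpected results depending on your signature.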

To demonstrate what I mean:

from celery import shared_task


@shared_task(bind=True, name='mytask')
def my_task(the_task, a, b):
    print(the_task)  # a celery Task object
    return a + b
# OR
@shared_task(name='mytask')
def my_task(a, b):
    return a + b

Note that these are plain functions, not methods in a class.

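Now combine that with the fact that a method on a class implicitly receives the class instance, self, as its first argument. That is missing from your signature as well. Honestly, I'm surprised it runs at all as written; based on the code you provided, I would expect it to fail with a TypeError.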
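To see how the signature gets mixed up, here is a plain-Python sketch. FakeTask and fake_task are hypothetical stand-ins invented for illustration, not Celery's classes: FakeTask is a callable object with no __get__ method, so Python never binds it to instances of the class that holds it, and with bind=True it injects itself as the first argument. Whether Celery's own task proxy behaves exactly like this is an assumption; the sketch only shows the general Python rule.

```python
class FakeTask:
    """Hypothetical stand-in for a task object; NOT Celery's Task class."""

    def __init__(self, fun, bind=False):
        self.fun = fun
        self.bind = bind

    def __call__(self, *args, **kwargs):
        # With bind=True the task object itself is injected as the first argument.
        if self.bind:
            return self.fun(self, *args, **kwargs)
        return self.fun(*args, **kwargs)
    # No __get__ is defined, so an instance of the class holding a FakeTask
    # attribute is never bound to it (unlike a plain function).


def fake_task(bind=False):
    """Hypothetical decorator imitating the shape of @shared_task(bind=...)."""
    def decorator(fun):
        return FakeTask(fun, bind=bind)
    return decorator


class Downloader:
    def plain_method(self, a, b):
        # A plain function on a class is a descriptor: self is bound automatically.
        return (type(self).__name__, a + b)

    @fake_task(bind=True)
    def task_method(first, a, b):
        # No instance is bound; the first slot receives the FakeTask object.
        return (type(first).__name__, a + b)

    @fake_task(bind=True)
    def no_slot(a, b):
        # Like download_all in the question: no parameter for the injected object.
        return a + b


d = Downloader()
print(d.plain_method(1, 2))  # ('Downloader', 3)
print(d.task_method(1, 2))   # ('FakeTask', 3)

no_slot_error = None
try:
    d.no_slot(1, 2)  # 2 arguments + the injected task object = 3
except TypeError as exc:
    no_slot_error = str(exc)
print(no_slot_error)
```

The last call fails because the injected object takes a positional slot the signature never declared, which is the TypeError the answer expects.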

Instead, you should either break the task out into its own function, or at least add the @staticmethod decorator to the method:

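from celery import shared_task


class Foo:
    # @staticmethod goes outermost so it wraps the task object; placed under
    # @shared_task it can fail on Python < 3.10, where staticmethod objects
    # have no __name__ and are not callable.
    @staticmethod
    @shared_task(name='mytask')
    def bar(a, b):
        ...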

Going back through your stack trace, your error comes from an integrity error in the task result table:

Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id
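The exception value is SQLite's standard wording for a NULL written into a NOT NULL column. A minimal reproduction with Python's built-in sqlite3 module follows; the table only borrows its name from the traceback, and its schema is a simplified assumption, not the real django_celery_results migration. It also shows that storing the same task_id twice produces a different message (UNIQUE constraint failed), which can help tell the two situations apart.

```python
import sqlite3

# Simplified schema (an assumption); only the table name mirrors the traceback.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE django_celery_results_taskresult ("
    " id INTEGER PRIMARY KEY,"
    " task_id VARCHAR(255) NOT NULL UNIQUE,"
    " status VARCHAR(50) NOT NULL)"
)
insert = "INSERT INTO django_celery_results_taskresult (task_id, status) VALUES (?, ?)"

# A row with a real task id is accepted.
conn.execute(insert, ("example-task-id", "PROGRESS"))


def try_insert(task_id):
    """Return the IntegrityError message, or None if the insert succeeded."""
    try:
        conn.execute(insert, (task_id, "PROGRESS"))
    except sqlite3.IntegrityError as exc:
        return str(exc)
    return None


null_error = try_insert(None)  # Python None is sent to SQLite as NULL
duplicate_error = try_insert("example-task-id")  # the same id stored twice

print(null_error)       # NOT NULL constraint failed: django_celery_results_taskresult.task_id
print(duplicate_error)  # UNIQUE constraint failed: django_celery_results_taskresult.task_id
```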

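This happens because your app is, for some reason, trying to reuse a task ID and store a result that has already been executed and stored in the database. It could be caused by the problems above, a bad concurrency model in your application, or out-of-band changes to the database or queue that create unexpected duplicates. You would need to purge the offending task from the message queue or delete the offending result from the database, but unless you fix the underlying problem in your application code, there is no guarantee it won't happen again.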


Answer 2:

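If you want to check the progress of a task, you have 3 options:

1. Store the current progress in the database as a model field.
2. Store it as a key-value pair in Redis.
3. Compute it at runtime by filtering and counting the elements in the database whose status has changed.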

The frontend can fetch this value through a plain REST API or a websocket.
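A sketch of the key-value option polled by the frontend, using only the standard library. A dict guarded by a lock stands in for Redis (an assumption made so the example runs anywhere; in a real deployment the Celery worker and Django run in separate processes, so the store must be shared). The helper names set_progress, get_progress and long_job are hypothetical. The worker writes current/total under a per-job key, and get_progress builds the JSON-style payload a REST endpoint would return.

```python
import threading
import time

# Stand-in for a shared key-value store such as Redis (assumption).
_progress_store = {}
_store_lock = threading.Lock()


def set_progress(job_id, current, total):
    """Called by the worker after each unit of work."""
    with _store_lock:
        _progress_store[job_id] = {"current": current, "total": total}


def get_progress(job_id):
    """What a REST endpoint would serialize to JSON for the progress bar."""
    with _store_lock:
        entry = _progress_store.get(job_id, {"current": 0, "total": 0})
    total = entry["total"]
    percent = int(entry["current"] * 100 / total) if total else 0  # no div by zero
    return {"current": entry["current"], "total": total, "percent": percent}


def long_job(job_id, items):
    # Hypothetical long-running work, reporting progress after each item.
    for done, _item in enumerate(items, start=1):
        time.sleep(0.01)  # placeholder for real work
        set_progress(job_id, done, len(items))


worker = threading.Thread(target=long_job, args=("job-1", list(range(20))))
worker.start()
while worker.is_alive():
    print(get_progress("job-1"))  # what the frontend would poll
    time.sleep(0.05)
worker.join()
print(get_progress("job-1"))  # {'current': 20, 'total': 20, 'percent': 100}
```

Polling keeps the server side simple; a websocket avoids repeated requests but needs extra infrastructure.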

Minimal example for the REST API approach

models.py

from django.db import models


class MyLongProcess(models.Model):
    active_uuid = models.UUIDField('Active process', null=True, blank=True)
    name = models.CharField('Name', max_length=255)
    current_step = models.IntegerField('Current step', default=0)
    total = models.IntegerField('Total', default=0)

    @property
    def percentage_sending(self):
        # or it can be computed by filtering elements processed in celery with complete status
        if not self.total:
            return 0  # avoid ZeroDivisionError before the total is known
        return int((self.current_step / self.total) * 100)

views.py

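import uuid

from django.http import HttpResponse

from app.celery import app
from .models import MyLongProcess


def initiate_execute_long_task(request, name):
    # BIG_COUNTED_VALUE is a placeholder for the total number of steps
    process = MyLongProcess.objects.create(active_uuid=uuid.uuid4(), name=name, total=BIG_COUNTED_VALUE)
    # pass the primary key: a model instance cannot be JSON-serialized (Celery's default serializer)
    async_execute_long_task.delay(process.pk)
    return HttpResponse()


@app.task
def async_execute_long_task(process_id):
    process = MyLongProcess.objects.get(pk=process_id)
    for i in range(process.total):
        do_some_staff()  # placeholder for the real unit of work
        process.current_step += 1
        process.save(update_fields=['current_step'])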

detail.html

<div class="progress-bar" role="progressbar"
  style="width: {{ my_model_object.percentage_sending }}%;"
  aria-valuenow="{{ my_model_object.percentage_sending }}"
  aria-valuemin="0" aria-valuemax="100">{{ my_model_object.percentage_sending }}%
</div>


Answer 3:

Don't use self.download_all(n_user, n_password, n_url, n_port, db_password)

Use self.download_all.delay(n_user, n_password, n_url, n_port, db_password) instead, so the function runs as a celery task.
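Why the direct call matters, as a heavily simplified model. In Celery, calling a task object directly runs it in the current process without a task id, and update_state() falls back to the current request's id when none is given, so the result backend is asked to store a row whose task_id is None. .delay() instead generates an id and hands the task to a worker. MiniTask, ResultStore and request_id below are hypothetical names that only imitate this flow; they are not Celery's implementation, and the "worker" here runs synchronously.

```python
import uuid


class ResultStore:
    """Stand-in for the django_celery_results table: task_id may not be None."""

    def __init__(self):
        self.rows = {}

    def store_result(self, task_id, meta, state):
        if task_id is None:
            raise ValueError("NOT NULL constraint failed: task_id")
        self.rows[task_id] = (state, meta)


class MiniTask:
    """Hypothetical, heavily simplified model of a bound task (not Celery code)."""

    def __init__(self, fun, backend):
        self.fun = fun
        self.backend = backend
        self.request_id = None  # plays the role of self.request.id

    def update_state(self, state, meta):
        # No explicit id: fall back to the id of the current request.
        self.backend.store_result(self.request_id, meta, state)

    def __call__(self, *args):
        # Direct call: runs in-process with no task id.
        self.request_id = None
        return self.fun(self, *args)

    def delay(self, *args):
        # Real Celery enqueues a message for a worker; here we just run it
        # immediately under a freshly generated id.
        self.request_id = str(uuid.uuid4())
        self.fun(self, *args)
        return self.request_id


def download_all(task, scans):
    for i, _scan in enumerate(scans, start=1):
        task.update_state("PROGRESS", {"current": i, "total": len(scans)})
    return len(scans)


store = ResultStore()
task = MiniTask(download_all, store)

direct_error = None
try:
    task(["a", "b", "c"])  # like self.download_all(...)
except ValueError as exc:
    direct_error = str(exc)
print("direct call failed:", direct_error)

task_id = task.delay(["a", "b", "c"])  # like self.download_all.delay(...)
print(store.rows[task_id])  # ('PROGRESS', {'current': 3, 'total': 3})
```

With the real library, arguments passed to .delay() must be serializable (JSON by default), and the call returns immediately, so the view no longer blocks while the download runs.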

You can use a websocket to send the progress data to the HTML page and update the progress bar with JavaScript.

The progress bar then moves with every progress update.


Answer 4:

This seems to come from the Django model: "self.model.DoesNotExist".

It's not from celery; it's from Model.objects.

You'd better check your ORM code.

