Django Celery IntegrityError
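【Posted】: 2021-12-25 11:18:13
【Question】:
I want to create a progress bar for my project. I have a class, and this class has some functions. One of them in particular takes a long time (def download_all), which is the main reason I want a progress bar.
I set up celery, celery-progress, etc. successfully, and they all work fine. My problem is that I want to integrate the progress bar into the download_all function. But it gives an error: IntegrityError at /o.../k... NOT NULL constraint failed: django_celery_results_taskresult.task_id
How can I solve this?
functions.py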
class myClass():
    def __init__(self, n_user, n_password, n_url, n_port, db_password, username):
        ...
        self.download_all(n_user, n_password, n_url, n_port, db_password)
        ...

    @shared_task(bind=True, name="my_add")
    def download_all(n_user, n_password, n_url, n_port, db_password):
        ...
        len_scans = len(scans)
        progress_recorder = ProgressRecorder(self)
        for s in scans:
            i = 0
            progress_recorder.set_progress(i + 1, len_scans)
            i += 1
views.py
def setup_wizard(request):
    ...
    functions.Zafiyet(setup.n_username, setup.n_password,
                      setup.n_url, setup.n_port, setup.db_password,
                      username=request.user.username)
Traceback
Environment:
Request Method: POST
Request URL: http://127.0.0.1:8000/operasyonmerkezi/konfigurasyon
Django Version: 3.2.7
Python Version: 3.9.6
Installed Applications:
['django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'dashboard',
'accounts',
'logs',
'crispy_forms',
'django_apscheduler',
'easy_timezones',
'django_celery_results',
'celery_progress']
Installed Middleware:
['django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware']
Traceback (most recent call last):
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 581, in get_or_create
return self.get(**kwargs), False
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 435, in get
raise self.model.DoesNotExist(
During handling of the above exception (TaskResult matching query does not exist.), another exception occurred:
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
return Database.Cursor.execute(self, query, params)
The above exception (NOT NULL constraint failed: django_celery_results_taskresult.task_id) was the direct cause of the following exception:
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\exception.py", line 47, in inner
response = get_response(request)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\core\handlers\base.py", line 181, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\contrib\auth\decorators.py", line 21, in _wrapped_view
return view_func(request, *args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\views.py", line 128, in setup_wizard
task = (functions.myClass(setup.n_username, setup.n_password,
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 44, in __init__
self.download_all(n_user, n_password, n_url, n_port, db_password)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\local.py", line 188, in __call__
return self._get_current_object()(*a, **kw)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 389, in __call__
return self.run(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\dashboard\functions.py", line 153, in download_all
progress_recorder.set_progress(i + 1, len_scans)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery_progress\backend.py", line 46, in set_progress
self.task.update_state(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\app\task.py", line 971, in update_state
self.backend.store_result(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\celery\backends\base.py", line 482, in store_result
self._store_result(task_id, result, state, traceback,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\backends\database.py", line 66, in _store_result
self.TaskModel._default_manager.store_result(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 46, in _inner
return fun(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django_celery_results\managers.py", line 168, in store_result
obj, created = self.using(using).get_or_create(task_id=task_id,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 588, in get_or_create
return self.create(**params), True
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 453, in create
obj.save(force_insert=True, using=self.db)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 726, in save
self.save_base(using=using, force_insert=force_insert,
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 763, in save_base
updated = self._save_table(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 868, in _save_table
results = self._do_insert(cls._base_manager, using, fields, returning_fields, raw)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\base.py", line 906, in _do_insert
return manager._insert(
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\manager.py", line 85, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\query.py", line 1270, in _insert
return query.get_compiler(using=using).execute_sql(returning_fields)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\models\sql\compiler.py", line 1416, in execute_sql
cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 98, in execute
return super().execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 66, in execute
return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 75, in _execute_with_wrappers
return executor(sql, params, many, context)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\utils.py", line 90, in __exit__
raise dj_exc_value.with_traceback(traceback) from exc_value
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\utils.py", line 84, in _execute
return self.cursor.execute(sql, params)
File "C:\Users\edeni\Desktop\hawkdragon\myvenv\lib\site-packages\django\db\backends\sqlite3\base.py", line 423, in execute
return Database.Cursor.execute(self, query, params)
Exception Type: IntegrityError at /operasyonmerkezi/konfigurasyon
Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id
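【Comments】:
I haven't used celery_progress, but Celery tasks inherit from Task, and your myClass definition does not. @shared_task(bind=True, ... normally means that the task instance is passed as the first argument to the call (as self). I don't know what the shared_task decorator would do to a class method in the first place, but you are not passing a self argument to download_all. I don't see how your code could work as written; if it does, I suggest you rewrite your question to include a minimal working subset of the code.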
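【Answer 1】:
First, you probably don't want to use the @shared_task decorator on a method like this.
Because of that, your method signature is a bit muddled. The bind=True argument causes celery to pass the task object into the call, which can have unexpected results depending on your signature.
To demonstrate what I mean: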
@shared_task(bind=True, name='mytask')
def my_task(the_task, a, b):
    print(the_task)  # a celery Task object
    return a + b

# OR

@shared_task(name='mytask')
def my_task(a, b):
    return a + b
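Note that these are plain functions, not methods on a class.
Now, combine that with the fact that methods on a class implicitly receive the instance of the class, self, as their first argument. That is also missing from your signature. Honestly, I'm surprised it executes at all as written; based on the code you provided, I would expect it to fail with a TypeError.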
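As an illustration of the implicit first argument, here is a minimal sketch using a plain method with no Celery decorator involved. The class name `Scanner` and the argument values are made up for the example. With `@shared_task` applied, the attribute is a task object rather than an ordinary function, so the behaviour in the question may differ from this.

```python
class Scanner:
    # No `self` in the signature, mirroring download_all in the question.
    def download_all(n_user, n_password):
        return n_user, n_password


message = None
try:
    # Python passes the instance as an extra first positional argument.
    Scanner().download_all("user", "secret")
except TypeError as exc:
    message = str(exc)

print(message)
```

The call supplies two arguments, but the method receives three because the instance is prepended, which is what raises the TypeError.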
Instead, you should either break the task out into its own function, or add the @staticmethod decorator on the method first:
class Foo:
    @shared_task(name='mytask')
    @staticmethod
    def bar(a, b):
        ...
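Going back to your stack trace, your error comes from an integrity error in the task results table:
Exception Value: NOT NULL constraint failed: django_celery_results_taskresult.task_id
This happens because your app is, for some reason, trying to reuse a task ID and store a result that has already been executed and stored in the database. This could be caused by the problem above, a faulty concurrency model in your application, or out-of-band changes to the database/queue that lead to unexpected duplicates. You will need to purge the offending task from the message queue or delete the offending result from the database, but unless you fix the underlying problem in your application code, there may be no guarantee that it won't happen again.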
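【Answer 2】:
If you want to check the progress of a task, you have 3 options:
- Store the current progress in the database as a model field
- Store it as a key-value pair in a Redis db
- Compute it at runtime by filtering and counting the elements in the db whose status has changed
The frontend can get this result through a plain REST API or a websocket.
A minimal configuration example for a REST API
models.py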
from django.db import models

class MyLongProcess(models.Model):
    active_uuid = models.UUIDField('Active process', null=True, blank=True)
    name = models.CharField('Name', max_length=255)
    current_step = models.IntegerField('Current step', default=0)
    total = models.IntegerField('Total', default=0)

    @property
    def percentage_sending(self):
        # or it can be computed by filtering elements processed in celery with complete status
        if not self.total:
            return 0  # total defaults to 0, so avoid dividing by zero
        return int((self.current_step / self.total) * 100)
views.py
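import uuid
from django.http import HttpResponse

def initiate_execute_long_task(request, name):
    process = MyLongProcess.objects.create(active_uuid=uuid.uuid4(), name=name, total=BIG_COUNTED_VALUE)
    # pass the primary key: a model instance cannot be sent with Celery's default JSON serializer
    async_execute_long_task.delay(process.pk)
    return HttpResponse()

from app.celery import app

@app.task
def async_execute_long_task(process_id):
    process = MyLongProcess.objects.get(pk=process_id)
    for i in range(process.total):
        do_some_staff()
        process.current_step += 1
        process.save()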
detail.html
<div class="progress-bar" role="progressbar"
     style="width: {{ my_model_object.percentage_sending }}%;"
     aria-valuenow="{{ my_model_object.percentage_sending }}"
     aria-valuemin="0" aria-valuemax="100">{{ my_model_object.percentage_sending }}%
</div>
【Answer 3】:
Don't use self.download_all(n_user, n_password, n_url, n_port, db_password)
but use self.download_all.delay(n_user, n_password, n_url, n_port, db_password)
to run the function in a celery task.
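The distinction matters for the error in the question. In Celery, calling a task object like a function runs its body inline in the current process and no task id is assigned, whereas `.delay()` sends the task to a worker under a generated id. The traceback shows the first path (`Task.__call__`, then `update_state`, then `store_result`), so the results table is asked to insert a row with an empty `task_id`. The following is only a toy model of that mechanism using the standard library, not Celery code: `ToyTask`, `ToyBackend`, `store` and `request_id` are hypothetical names, and the toy `delay()` runs the body in-process where real Celery would hand it to a worker.

```python
import uuid


class ToyBackend:
    """Stands in for a results table whose task_id column is NOT NULL."""

    def __init__(self):
        self.rows = {}

    def store(self, task_id, state, meta):
        if task_id is None:
            raise ValueError("NOT NULL constraint failed: task_id")
        self.rows[task_id] = (state, meta)


class ToyTask:
    """Toy bound task: an id exists only while dispatched through delay()."""

    def __init__(self, func, backend):
        self.func = func
        self.backend = backend
        self.request_id = None

    def __call__(self, *args):
        # Plain call: the body runs inline and no id is ever assigned.
        return self.func(self, *args)

    def delay(self, *args):
        # Dispatch: an id is generated before the body runs.
        self.request_id = str(uuid.uuid4())
        try:
            self.func(self, *args)
        finally:
            task_id, self.request_id = self.request_id, None
        return task_id

    def update_state(self, state, meta):
        self.backend.store(self.request_id, state, meta)


def download_all(task, scans):
    for done, _scan in enumerate(scans, start=1):
        task.update_state("PROGRESS", {"current": done, "total": len(scans)})


backend = ToyBackend()
task = ToyTask(download_all, backend)

# Dispatched: progress is stored under the generated id.
task_id = task.delay(["scan-1", "scan-2"])

try:
    task(["scan-1"])  # plain call, as in the question
    direct_call_error = None
except ValueError as exc:
    direct_call_error = str(exc)
```

In the toy model the dispatched run leaves a progress row under `task_id`, while the plain call fails as soon as it tries to record progress, which mirrors the shape of the traceback above.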
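You can use a websocket to send the progress data to the html, and use javascript to update the progress bar.
The progress bar moves every time the progress is updated.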
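For the bar to move, the reported count has to grow on each iteration. In the question's loop, `i = 0` sits inside the `for` body, so every call reports step 1. Below is a minimal sketch of the counting alone, with a hypothetical `report` callback standing in for `progress_recorder.set_progress`; the function name `run_with_progress` is also an assumption.

```python
def run_with_progress(scans, report):
    """Process each scan and report (current, total) after every step."""
    total = len(scans)
    # enumerate keeps the counter outside the loop body, starting at 1
    for done, scan in enumerate(scans, start=1):
        # ... process `scan` here ...
        report(done, total)


calls = []
run_with_progress(["s1", "s2", "s3"], lambda current, total: calls.append((current, total)))
print(calls)
```

Inside a real task, `report` could simply be the `set_progress` method of a `ProgressRecorder` instance.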
【Answer 4】:
This seems to come from the Django model: "self.model.DoesNotExist".
It is not from celery, it comes from Model.objects.
You'd better check your ORM.