使用 gevent 执行池的 celery 任务的 SynchronousOnlyOperation
Posted
技术标签:
【中文标题】使用 gevent 执行池的 celery 任务的 SynchronousOnlyOperation【英文标题】:SynchronousOnlyOperation from celery task using gevent execution pool 【发布时间】:2020-12-16 16:06:46 【问题描述】:鉴于 celery 使用这些选项运行:
celery -A openwisp2 worker -l info --pool=gevent --concurrency=15 -Ofair
鉴于此celery task from openwisp-monitoring:
@shared_task
def perform_check(uuid):
"""
Retrieves check according to the passed UUID
and calls ``check.perform_check()``
"""
try:
check = get_check_model().objects.get(pk=uuid)
except ObjectDoesNotExist:
logger.warning(f'The check with uuid uuid has been deleted')
return
result = check.perform_check()
if settings.DEBUG: # pragma: nocover
print(json.dumps(result, indent=4, sort_keys=True))
大部分时间任务都有效,但有时(通常是突发)会产生以下异常:
SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
完整的堆栈跟踪:
SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
File "celery/app/trace.py", line 412, in trace_task
R = retval = fun(*args, **kwargs)
File "celery/app/trace.py", line 704, in __protected_call__
return self.run(*args, **kwargs)
File "openwisp_monitoring/check/tasks.py", line 44, in perform_check
check = get_check_model().objects.get(pk=uuid)
File "django/db/models/manager.py", line 85, in manager_method
return getattr(self.get_queryset(), name)(*args, **kwargs)
File "django/db/models/query.py", line 425, in get
num = len(clone)
File "django/db/models/query.py", line 269, in __len__
self._fetch_all()
File "django/db/models/query.py", line 1308, in _fetch_all
self._result_cache = list(self._iterable_class(self))
File "django/db/models/query.py", line 53, in __iter__
results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
File "django/db/models/sql/compiler.py", line 1154, in execute_sql
cursor = self.connection.cursor()
File "django/utils/asyncio.py", line 24, in inner
raise SynchronousOnlyOperation(message)
我不完全理解为什么会这样。
让我们回顾一下,如果我错了,请纠正我:
-
使用这种配置,celery 可以并行执行任务,这种并行化是由 gevent 使用 asyncio 事件循环执行的
gevent 然后使用同一个线程调用每个任务
这些任务并非设计为异步的,它们使用纯同步代码,这些任务执行数据库查询和网络请求
django 有一个
async_unsafe
装饰器,用它装饰数据库操作,这个装饰器检查事件循环是否正在运行,在这种情况下它会引发一个 SynchronousOnlyOperation
但为什么不是在 100% 的情况下提出这个例外,而只是在少数情况下提出呢?
这些任务确实在工作,我可以看到它,因为它们产生了正常显示的图表数据集合,或者它们产生了设备模型中的状态变化(例如:ok to critical)。
这是 OpenWISP Monitoring 中的错误、配置错误还是 Django 中的错误?
看起来 Django 中没有使用事件循环,但 Django 引发了这个异常,尽管它并不关心它。这可能是一个错误,但希望在提交错误报告之前听取专家对此主题的意见。
我认为一个可能的快速解决方案可能是设置环境变量DJANGO_ALLOW_ASYNC_UNSAFE
,但仅限于芹菜进程。
提前致谢。
【问题讨论】:
我敢打赌,这只是偶尔会花费太多时间,这就是在阻塞调用中使用await
的全部意义所在,db 调用就是其中之一。
【参考方案1】:
事实证明我的假设是错误的,我使用的代码在 gevent 中不是 greenlet 安全的。
由于暂时无法将代码重写为 greenlet 安全,因此我切换回 prefork。
【讨论】:
好吧,我正在尝试为 I/O 绑定任务配置gevent
。你能举一个“不是greenlet安全”的代码或情况的例子吗?谢谢。
@Paolo 这取决于您使用的库,在我的情况下,我使用的是 Django,它依赖于许多与 greenlet 不兼容的库。
我也在使用 Django,如何确定我的库是否与 greenlet 不兼容?
@bearcat,我上面给 Paolo 的回复同样适用于你。
@nemesisdesign 谢谢。我尝试了 gevent monkey 补丁,到目前为止它似乎对我有用。我可能没有使用我的 django 配置使用的所有库,所以可能仍然存在一些风险。我一直在寻找某种方法来确定一个库是否与在 django 配置中运行之外的 greenlet 兼容。以上是关于使用 gevent 执行池的 celery 任务的 SynchronousOnlyOperation的主要内容,如果未能解决你的问题,请参考以下文章
将 gevent.evnet 与 celery.task 一起使用
我应该在 celery 中使用 prefork、eventlet 或 gevent 哪个池类?