celery 任务无法使用 python 遍历 postgresql 数据库中的多行

Posted

技术标签:

【中文标题】celery 任务无法使用 python 遍历 postgresql 数据库中的多行【英文标题】:celery task unable to iterate over multiple rows from postgresql database with python 【发布时间】:2021-08-03 01:06:00 【问题描述】:

我正在使用databases python 包 (https://pypi.org/project/databases/) 来管理与我的 postgresql 数据库的连接

来自文档 (https://www.encode.io/databases/database_queries/#queries) 它说我可以使用

# Fetch multiple rows without loading them all into memory at once
query = notes.select()
async for row in database.iterate(query=query):
    ...

# Fetch multiple rows
query = notes.select()
rows = await database.fetch_all(query=query)

这是我尝试过的:

def check_all_orders():
    query = "SELECT * FROM orders WHERE shipped=True"
    return database.fetch_all(query)

...
...
...

@app.task
async def check_orders():

    query = await check_all_orders()
    
    today = datetime.utcnow()

    for q in query:
        if q.last_notification is not None:
            if (today - q.last_notification).total_seconds() < q.cooldown:
                continue

@app.task
async def check_orders():

    query = "SELECT * FROM orders WHERE shipped=True"

    today = datetime.utcnow()

    async for q in database.iterate(query=query):
        if q.last_notification is not None:
            if (today - q.last_notification).total_seconds() < q.cooldown:
                continue

我都使用过,但出现以下错误:

raise TypeError(f'Object of type o.class.name ' kombu.exceptions.EncodeError:协程类型的对象不是 JSON 可序列化的

以下完全错误

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 472, in trace_task
    mark_as_done(
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 154, in mark_as_done
    self.store_result(task_id, result, state, request=request)
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 434, in store_result
    self._store_result(task_id, result, state, traceback,
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 856, in _store_result
    self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 324, in encode
    _, _, payload = self._encode(data)
  File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 328, in _encode
    return dumps(data, serializer=self.serializer)
  File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py", line 220, in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.9/contextlib.py", line 135, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py", line 53, in _reraise_errors
    reraise(wrapper, wrapper(exc), sys.exc_info()[2])
  File "/usr/local/lib/python3.9/site-packages/kombu/exceptions.py", line 21, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py", line 49, in _reraise_errors
    yield
  File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py", line 220, in dumps
    payload = encoder(data)
  File "/usr/local/lib/python3.9/site-packages/kombu/utils/json.py", line 65, in dumps
    return _dumps(s, cls=cls or _default_encoder,
  File "/usr/local/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/usr/local/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/local/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/usr/local/lib/python3.9/site-packages/kombu/utils/json.py", line 55, in default
    return super().default(o)
  File "/usr/local/lib/python3.9/json/encoder.py", line 179, in default
    raise TypeError(f'Object of type o.__class__.__name__ '
kombu.exceptions.EncodeError: Object of type coroutine is not JSON serializable

【问题讨论】:

【参考方案1】:

Schema.jsonify 方法——如 Flask 中的 jsonify——返回一个 Response 对象,kombu 将无法序列化(IIRC kombu 默认序列化为 JSON)。您可能应该使用 dump 而不是 jsonify 来返回字典。

【讨论】:

这不是flask ..使用fastapi ......但是有没有一种通用的方法来解决这个与框架无关的问题?也没有使用jsonify,所以不知道你为什么说instead of using jsonify 我刚刚尝试json.dumps(query) 序列化查询结果,但仍然出现相同的错误...您介意分享如何使用上述代码中的 json 转储吗?

以上是关于celery 任务无法使用 python 遍历 postgresql 数据库中的多行的主要内容,如果未能解决你的问题,请参考以下文章

python测试开发django-161.Celery 定时任务保存到数据库 (djcelery)

Celery介绍

celery原理与组件

python任务调度模块celery

python:利用celery分布任务

python 关于celery的异步任务队列的基本使用(celery+redis)采用配置文件设置