可以将 OrderedDict 作为 Celery 任务参数传递吗?
Posted
技术标签:
【中文标题】可以将 OrderedDict 作为 Celery 任务参数传递吗?【英文标题】:Is it ok to pass an OrderedDict as a Celery task argument? 【发布时间】:2019-04-10 05:51:27 【问题描述】:我在 Django REST 框架的序列化程序中重写了 update
方法。
在这个update
,由于用户可以发送很多孩子,我有一个异步芹菜任务process_children
,来处理孩子。
class MyModelSerializer(serializers.ModelSerializer):
....
@transaction.atomic
def update(self, mymodel, validated_data):
try:
children_data = validated_data.pop('children')
transaction.on_commit(lambda: process_children.apply_async(
countdown=1,
args=[mymodel.id, children_data]))
except KeyError:
pass
...
在 args 中,有一个参数不是json
对象而是OrderedDict
:children_data
。
任务如下:
@app.task
def process_children(mymodel_id, children_data):
mymodel = MyModel.objects.get(pk=mymodel_id)
children = mymodel.children.all()
for child_data in children_data:
try:
child = children.get(start=child_data['start'])
child = populate_child(child, child_data)
child.save()
except Child.DoesNotExist:
create_child(mymodel, child_data)
我读到我们应该只发送 json
(或 pickle、yaml 等)args。
但这个设置似乎有效
我什至可以发送 datetime
对象(即我在任务中使用的 start
属性,以将存储的子对象与通过 api 发送的新值进行匹配)。
那么这里发生了什么?
一切正常吗,celery 像老大一样序列化和反序列化 OrderedDict。 还是我疯了,应该在调用任务之前序列化并在任务内部反序列化?[更新,添加 CELERY 设置]
CELERY_BROKER_URL = get_env_variable('REDIS_URL')
CELERY_BROKER_POOL_LIMIT = 0
CELERY_REDIS_MAX_CONNECTIONS = 10
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
【问题讨论】:
您是否尝试过dict(children_data)
,以便在发送到任务之前将其转换为普通字典?
使用dict
会比使用OrderedDict
更好吗?现在有什么意义,一切似乎都在测试和生产中工作?我的问题更多是正确还是我应该在json
中对自己进行序列化和反序列化。
【参考方案1】:
您正在使用 pickle 序列化程序,它可以相对较好地处理对象,但存在一些问题。 Here's a blog post on the concept of serializing and celery.
【讨论】:
抱歉,我忘了显示我的 CELERY 设置,我会补充一下,但我使用的是json
,并且从未收到该博客文章中解释的运行时错误。
很奇怪。有序的 dicts 可能会被很好地序列化,但日期时间应该会中断。
是的,我也有十进制字段:/
在Kombu doc 中明确表示不接受Date
和Decimal
。我会仔细检查是否没有使用泡菜。【参考方案2】:
是的,你做得对。
如doc 中所述。
客户端和worker之间传输的数据需要序列化,因此Celery中的每条消息都有一个content_type头,描述了用于对其进行编码的序列化方法。
此外,从 celery 4.0 开始,默认序列化程序是 JSON(之前是 pickle
)。因此,每当您调用此任务时,celery 默认会对其进行序列化和反序列化。
如果您想使用任何其他序列化程序,那么在调用任务时您需要指定content-type
(如果您使用的是.delay
,那么默认情况下序列化程序将是json
。
process_children.apply_async((model_id, children_data), serializer='pickle')
【讨论】:
正确,只要 int / etc 键未在 dict 中使用。 json 的序列化将它们强制转换为字符串,因为 json 只有字符串作为键,这在第一次发生时可能会非常震惊。以上是关于可以将 OrderedDict 作为 Celery 任务参数传递吗?的主要内容,如果未能解决你的问题,请参考以下文章
将 json 作为 celery 任务的一部分插入 postgres
在 Elastic Beanstalk 上使用 Supervisor 和 Django 将 Celery 作为守护进程运行