可以将 OrderedDict 作为 Celery 任务参数传递吗?

Posted

技术标签:

【中文标题】可以将 OrderedDict 作为 Celery 任务参数传递吗?【英文标题】:Is it ok to pass an OrderedDict as a Celery task argument? 【发布时间】:2019-04-10 05:51:27 【问题描述】:

我在 Django REST 框架的序列化程序中重写了 update 方法。 在这个update,由于用户可以发送很多孩子,我有一个异步芹菜任务process_children,来处理孩子。

class MyModelSerializer(serializers.ModelSerializer):
    ....

    @transaction.atomic
    def update(self, mymodel, validated_data):
        try:
            children_data = validated_data.pop('children')
            transaction.on_commit(lambda: process_children.apply_async(
                countdown=1,
                args=[mymodel.id, children_data]))
        except KeyError:
            pass
        ...

在 args 中,有一个参数不是json 对象而是OrderedDictchildren_data

任务如下:

@app.task
def process_children(mymodel_id, children_data):
    mymodel = MyModel.objects.get(pk=mymodel_id)
    children = mymodel.children.all()
    for child_data in children_data:
        try:
            child = children.get(start=child_data['start'])
            child = populate_child(child, child_data)
            child.save()
        except Child.DoesNotExist:
            create_child(mymodel, child_data)

我读到我们应该只发送 json(或 pickle、yaml 等)args。

但这个设置似乎有效 我什至可以发送 datetime 对象(即我在任务中使用的 start 属性,以将存储的子对象与通过 api 发送的新值进行匹配)。

那么这里发生了什么?

一切正常吗,celery 像老大一样序列化和反序列化 OrderedDict。 还是我疯了,应该在调用任务之前序列化并在任务内部反序列化?

[更新,添加 CELERY 设置]

CELERY_BROKER_URL = get_env_variable('REDIS_URL')
CELERY_BROKER_POOL_LIMIT = 0
CELERY_REDIS_MAX_CONNECTIONS = 10
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'

【问题讨论】:

您是否尝试过dict(children_data),以便在发送到任务之前将其转换为普通字典? 使用dict 会比使用OrderedDict 更好吗?现在有什么意义,一切似乎都在测试和生产中工作?我的问题更多是正确还是我应该在json 中对自己进行序列化和反序列化。 【参考方案1】:

您正在使用 pickle 序列化程序,它可以相对较好地处理对象,但存在一些问题。 Here's a blog post on the concept of serializing and celery.

【讨论】:

抱歉,我忘了显示我的 CELERY 设置,我会补充一下,但我使用的是json,并且从未收到该博客文章中解释的运行时错误。 很奇怪。有序的 dicts 可能会被很好地序列化,但日期时间应该会中断。 是的,我也有十进制字段:/ 在Kombu doc 中明确表示不接受DateDecimal。我会仔细检查是否没有使用泡菜。【参考方案2】:

是的,你做得对。

如doc 中所述。

客户端和worker之间传输的数据需要序列化,因此Celery中的每条消息都有一个content_type头,描述了用于对其进行编码的序列化方法。

此外,从 celery 4.0 开始,默认序列化程序是 JSON(之前是 pickle)。因此,每当您调用此任务时,celery 默认会对其进行序列化和反序列化。 如果您想使用任何其他序列化程序,那么在调用任务时您需要指定content-type(如果您使用的是.delay,那么默认情况下序列化程序将是json

process_children.apply_async((model_id, children_data), serializer='pickle')

【讨论】:

正确,只要 int / etc 键未在 dict 中使用。 json 的序列化将它们强制转换为字符串,因为 json 只有字符串作为键,这在第一次发生时可能会非常震惊。

以上是关于可以将 OrderedDict 作为 Celery 任务参数传递吗?的主要内容,如果未能解决你的问题,请参考以下文章

将异步协程作为 celery 任务运行

将 json 作为 celery 任务的一部分插入 postgres

为啥 Celery 守护进程看不到任务?

在 Elastic Beanstalk 上使用 Supervisor 和 Django 将 Celery 作为守护进程运行

使用本地计算机作为主机将 EC2 实例设置为 Celery Workers

使用类方法作为 celery 任务