将我的查询集 obj 作为参数传递时芹菜引发错误

Posted

技术标签:

【中文标题】将我的查询集 obj 作为参数传递时芹菜引发错误【英文标题】:Celery raise error while passing my queryset obj as parameter 【发布时间】:2016-04-18 08:15:15 【问题描述】:

我试图执行一个周期性任务,所以我使用 celery 和 Django 1.8 以及 Django Rest Framework 和 Postgres 作为数据库。当我尝试将我的 obj 发送到任务时,我得到TypeError: foreign_model_obj is not JSON serializable。如何将我的查询集对象传递给我的任务。

views.py:

class MyModelCreateApiView(generics.CreateAPIView):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
    authentication_classes = (TokenAuthentication,)

    def create(self, request, *args, **kwargs):
        data = dict()
        data['foreign_model_id'] = kwargs['pk']
        foreign_model_obj = MyForeignModel.objects.get(id=data['foreign_model_id'])

        obj = MyModel.objects.create(**data)
        result = serialize_query(MyModel, "id": obj.id)
        local_time = foreign_model_obj.time
        my_celery_task.apply_async([foreign_model_obj], eta=local_time)
        return Response(result)

tasks.py:

@celery_app.task(name="my_celery_task")
def my_first_celery_task(mymodel_obj):
    # ... updating obj attributes
    mymodel_obj.save()

【问题讨论】:

【参考方案1】:

实际上,恕我直言,最好的方法是获取查询集的可提取组件,然后在任务中重新生成查询集 (https://docs.djangoproject.com/en/1.9/ref/models/querysets/):

import pickle
query = pickle.loads(s)     # Assuming 's' is the pickled string.
qs = MyModel.objects.filter(a__in=[1,2,3]) # whatever you want here...
querystr = pickle.dumps(qs.query)      # pickle the queryset
my_celery_task.apply_async(querystr, eta=local_time) # send only the string...

任务:

@celery_app.task(name="my_celery_task")
def my_celery_task(querystr):
    my_model_objs = MyModel.objects.all()
    my_model_objs.query = pickle.loads(querystr) # Restore the queryset
    # ... updating obj attributes
    item=my_model_objs[0]

我认为这是最好的方法,因为查询将在任务中执行(可能是第一次),防止各种时间问题,它不需要在调用者中执行(所以查询不会加倍) .

【讨论】:

这个可以吗?我的意思是,每次我尝试从 tasks.py 导入模型时,我都会得到一个 ImportError 从函数中导入。您可能让模型模块导入您的任务模块,而您的任务模块导入模型模块。那是行不通的。但是,在任务模块中将模型模块导入函数中 - 那时模型已经加载并且您不应该遇到问题。【参考方案2】:

您只需发送实例的id 并检索任务中的对象。 传递实例是一种不好的做法,因为它可以同时更改,特别是你正在执行你的任务,因为它似乎是延迟。

views.py:

class MyModelCreateApiView(generics.CreateAPIView):
    queryset = MyModel.objects.all()
    serializer_class = MyModelSerializer
    authentication_classes = (TokenAuthentication,)

    def create(self, request, *args, **kwargs):
        data = dict()
        data['foreign_model_id'] = kwargs['pk']
        foreign_model_obj = MyForeignModel.objects.get(id=data['foreign_model_id'])

        obj = MyModel.objects.create(**data)
        result = serialize_query(MyModel, "id": obj.id)
        local_time = foreign_model_obj.time
        my_celery_task.apply_async([foreign_model_obj.id], eta=local_time) # send only the obj id
        return Response(result)

tasks.py:

@celery_app.task(name="my_celery_task")
def my_celery_task(mymodel_obj_id):
    my_model_obj = MyModel.objects.get(id=mymodel_obj_id) # retrieve your object here
    # ... updating obj attributes
    mymodel_obj.save()

【讨论】:

非常适合我。我明白了,所以最好避免将查询集实例作为参数发送。【参考方案3】:

您可以将序列化方法更改为pickle,但不建议将queryset作为参数传递。引用 Celery 文档:

另一个问题是 Django 模型对象。它们不应该作为参数传递给任务。在任务运行时从数据库中重新获取对象几乎总是更好,因为使用旧数据可能会导致竞争条件。

http://docs.celeryproject.org/en/latest/userguide/tasks.html

【讨论】:

以上是关于将我的查询集 obj 作为参数传递时芹菜引发错误的主要内容,如果未能解决你的问题,请参考以下文章

Postman 400 错误请求

如何在类中将函数作为参数传递?

临时变量作为非const的引用进行参数传递引发的编译错误

Delphi 11:常量对象不能作为 var 参数传递

将参数从 s-s-rS 数据集传递到雪花

编写一个c#程序,引发并捕获ArgumentnNullException异常