如何在 Django 中使用 Celery 上传和处理大型 excel 文件?

Posted

技术标签:

【中文标题】如何在 Django 中使用 Celery 上传和处理大型 excel 文件?【英文标题】:How to upload and process large excel files using Celery in Django? 【发布时间】:2021-10-21 00:08:07 【问题描述】:

我正在尝试使用带有 Celery 的 Django 和 DRF 上传和处理 excel 文件。 当我尝试将文件传递给我的 Celery 任务以在后台处理时出现问题,我收到以下错误:

kombu.exceptions.EncodeError: Object of type InMemoryUploadedFile is not JSON serializable

这是我的视图发布请求处理程序:

class FileUploadView(generics.CreateAPIView):
    """
    POST: upload file to save data in the database
    """
    parser_classes = [MultiPartParser]
    serializer_class = FileSerializerXLSX

    def post(self, request, format=None):
        """
        Allows to upload file and lets it be handled by pandas
        """

        serialized = FileSerializerXLSX(data=request.data)
        if serialized.is_valid():
            file_obj = request.data['file']
            # file_bytes = file_obj.read()
            print(file_obj)
            import_excel_task.delay(file_obj)
            print("its working")
            return Response(status=204)

        return Response(serialized._errors, status=status.HTTP_400_BAD_REQUEST)

还有我的芹菜任务:

def import_excel_helper(file_obj):
    df = extract_excel_to_dataframe(file_obj)
    transform_df_to_clientmodel(df)
    transform_df_to_productmodel(df)
    transform_df_to_salesmodel(df)


@shared_task(name="import_excel_task")
def import_excel_task(file_obj):
    """Save excel file in the background"""
    logger.info("Importing excel file")
    import_excel_helper(file_obj)

知道如何处理将 Excel 文件导入 celery 任务以便后台其他函数处理它吗?

【问题讨论】:

设置CELERY_TASK_SERIALIZER(和CELERY_ACCEPT_CONTENT)可能有助于解决这个问题。 @JPG 该文件是 excel 文件,因此在将其加载到数据框之前,我不知道如何将其转换为 json。 【参考方案1】:

与错误一样,调用 celery 任务的请求正文必须是 JSON 可序列化的,因为它是 default configuration。然后如kombu 中所述:

JSON 的主要缺点是它限制您使用以下数据类型:字符串、Unicode、浮点数、布尔值、字典和列表。小数和日期明显丢失。

假设这是我的 excel 文件。

file.xlsx

Some Value
Here :)

解决方案 1

在调用任务之前将 excel 的原始字节转换为 Base64 字符串,以便它可以进行 JSON 序列化(由于字符串是 JSON 文档中的有效数据类型,原始字节不是)。然后,Celery 配置中的其他所有内容都是相同的默认值。

tasks.py

import base64

import pandas

from celery import Celery

app = Celery('tasks')


@app.task
def add(excel_file_base64):
    excel_file = base64.b64decode(excel_file_base64)
    df = pandas.read_excel(excel_file)
    print("Contents of excel file:", df)

views.py

import base64

from tasks import add


with open("file.xlsx", 'rb') as file:  # Change this to be your <request.data['file']>
    excel_raw_bytes = file.read()
    excel_base64 = base64.b64encode(excel_raw_bytes).decode()
    add.apply_async((excel_base64,))

输出

[2021-08-19 20:40:28,904: INFO/MainProcess] Task tasks.add[d5373444-485d-4c50-8695-be2e68ef1c67] received
[2021-08-19 20:40:29,094: WARNING/ForkPoolWorker-4] Contents of excel file:
[2021-08-19 20:40:29,094: WARNING/ForkPoolWorker-4]  
[2021-08-19 20:40:29,099: WARNING/ForkPoolWorker-4]    Some Value
0  Here    :)
[2021-08-19 20:40:29,099: WARNING/ForkPoolWorker-4] 

[2021-08-19 20:40:29,099: INFO/ForkPoolWorker-4] Task tasks.add[d5373444-485d-4c50-8695-be2e68ef1c67] succeeded in 0.19386404199940444s: None

解决方案 2:

这是更难的方法。实现将处理 excel 文件的自定义序列化程序。

tasks.py

import ast
import base64

import pandas

from celery import Celery
from kombu.serialization import register


def my_custom_excel_encoder(obj):
    """Uncomment this block if you intend to pass it as a Base64 string:
    file_base64 = base64.b64encode(obj[0][0]).decode()
    obj = list(obj)
    obj[0] = [file_base64]
    """
    return str(obj)


def my_custom_excel_decoder(obj):
    obj = ast.literal_eval(obj)
    """Uncomment this block if you passed it as a Base64 string (as commented above in the encoder):
    obj[0][0] = base64.b64decode(obj[0][0])
    """
    return obj


register(
    'my_custom_excel',
    my_custom_excel_encoder,
    my_custom_excel_decoder,
    content_type='application/x-my-custom-excel',
    content_encoding='utf-8',
)


app = Celery('tasks')

app.conf.update(
    accept_content=['json', 'my_custom_excel'],
)


@app.task
def add(excel_file):
    df = pandas.read_excel(excel_file)
    print("Contents of excel file:", df)

views.py

from tasks import add


with open("file.xlsx", 'rb') as excel_file:  # Change this to be your <request.data['file']>
    excel_raw_bytes = excel_file.read()
    add.apply_async((excel_raw_bytes,), serializer='my_custom_excel')

输出

与解决方案 1 相同

解决方案 3

你可能对Sending raw data without Serialization的这个文档感兴趣

【讨论】:

以上是关于如何在 Django 中使用 Celery 上传和处理大型 excel 文件?的主要内容,如果未能解决你的问题,请参考以下文章

在 django 中使用 celery 和 ffmpeg 对视频进行转码

如何限制 django 网站的 redis/celery 任务?

如何在 Django 和 Celery 中配置多个代理?

如何在任务中获取芹菜结果模型(使用 django-celery-results)

Django - 如何在 celery 和 redis 中使用异步任务队列

如何使用 Django 配置 Celery 守护进程