Celery/redis 任务并不总是完成 - 不知道为啥或如何适应它

Posted

技术标签:

【中文标题】Celery/redis 任务并不总是完成 - 不知道为啥或如何适应它【英文标题】:Celery/redis tasks don't always complete - not sure why or how to fit itCelery/redis 任务并不总是完成 - 不知道为什么或如何适应它 【发布时间】:2022-01-01 13:59:42 【问题描述】:

我在 django v 3.0.1 应用程序(Python v 3.6.9)中运行 celery v 4.0.3/redis v 4.09。我还在芹菜任务find_faces 中使用face_recognition 在我上传到应用程序的图像中查找人脸,以及其他图像处理芹菜任务。处理五个或更少的图像文件没有问题,因为所有图像处理 celery 任务都成功完成。

当我有图像处理任务(包括find_faces)迭代超过 100 张图像时,有 10-30 张图像没有完成find_faces 任务。当我使用flower v0.9.7 查看 celery 任务时,我看到那些未完成的图像的find_faces 任务状态为“已启动”。所有其他图像的find_faces 任务状态为“成功”。这些“已启动”任务的状态永远不会改变,也不会报告错误或异常。然后,我可以在每个图像上单独运行图像处理任务,包括find_faces 任务,任务状态为“成功”。如果我将 celery 作为守护进程或本地运行,或者我使用 wsgi 和 apache 或 runserver 运行 django 应用程序,这些结果不会改变。 Flower 还报告说,我所有任务的重试次数 = 0。

我在 django 应用中全局设置了CELERYD_TASK_SOFT_TIME_LIMIT = 60,并为find_faces 任务设置了max_retries=5

@app.task(bind=True, max_retries=5)
def find_faces_task(self, document_id, use_cuda=settings.USE_CUDA):
    logger.debug("find_faces_task START")
    try:
        temp_face = None
        from memorabilia.models import TaskStatus, Document      
        args = "document_id=%s, use_cuda=%s" % (document_id, use_cuda)
        ts = TaskStatus(document_id_id=document_id, task_id=self.request.id, task_name='find_faces_task', task_args=args, task_status=TaskStatus.PENDING)
        ts.save()
        import time
        time_start = time.time()
        # Check if we already have the faces for this document
        from biometric_identification.models import Face
        if len(Face.objects.filter(document_id=document_id)) != 0:
            # This document has already been scanned, so need to remove it and rescan
            # Have to manually delete each object per django docs to insure the 
            # model delete method is run to update the metadata.
            logger.debug("Document %s has already been scanned" % document_id)
            faces = Face.objects.filter(document_id=document_id)
            for face in faces:
                face.delete()
                logger.debug("Deleted face=%s" % face.tag_value.value)
        document = Document.objects.get(document_id=document_id)
        image_file = document.get_default_image_file(settings.DEFAULT_DISPLAY_IMAGE)
        image_path = image_file.path
        time_start_looking = time.time()
        temp_file = open(image_path, 'rb')
        temp_image = Image.open(temp_file)
        logger.debug("fred.mode=%s" % fred.mode)
        width, height = temp_image.size
        image = face_recognition.load_image_file(temp_file)
        # Get the coordinates of each face
        if use_cuda:
            # With CUDA installed
            logger.debug("Using CUDA for face recognition")
            face_locations = face_recognition.face_locations(image, model="cnn", number_of_times_to_upsample=0) 
        else:
            # without CUDA installed
            logger.debug("NOT using CUDA for face recognition")
            face_locations = face_recognition.face_locations(image, model="hog", number_of_times_to_upsample=2)
        time_find_faces = time.time()
        # Get the face encodings for each face in the picture    
        face_encodings = face_recognition.face_encodings(image, known_face_locations=face_locations) 
        logger.debug("Found %s face locations and %s encodings" % (len(face_locations), len(face_encodings)))
        time_face_encodings = time.time()
        # Save the faces found in the database
        for location, encoding in zip(face_locations, face_encodings):
            # Create the new Face object and load in the document, encoding, and location of a face found
            # Locations seem to be of the form (y,x)
            from memorabilia.models import MetaData, MetaDataValue
            tag_type_people = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Tag_types')[0].metadata_id, value='People')[0]
            tag_value_unknown = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Unknown')[0].metadata_id, value='Unknown')[0]
            new_face = Face(document=document, face_encoding=numpy_to_json(encoding), face_location=location, image_size='width': width, "height":height, tag_type=tag_type_people, tag_value=tag_value_unknown)         
            # save the newly found Face object
            new_face.save()
            logger.debug("Saved new_face %s" % new_face.face_file) 
        time_end = time.time()
        logger.debug("total time = ".format(time_end - time_start))
        logger.debug("time to find faces = ".format(time_find_faces - time_start_looking))
        logger.debug("time to find encodings = ".format(time_face_encodings - time_find_faces))
        ts.task_status = TaskStatus.SUCCESS
        ts.comment = "Found %s faces" % len(face_encodings)
        return document_id
    except Exception as e:
        logger.exception("Hit an exception in find_faces_task %s" % str(e))
        ts.task_status = TaskStatus.ERROR
        ts.comment = "An exception while finding faces: %s" % repr(e)
    finally:
        logger.debug("Finally clause in find-faces_task")
        if temp_image:
            temp_image.close()
        if temp_file:
            temp_file.close()
        ts.save(update_fields=['task_status', 'comment'])
        logger.debug("find_faces_task END")

find_faces 任务被称为操作图像的更大任务链的一部分。每个图像文件都经过这个链,其中 step_1 和 step_2 是不同图像处理步骤的和弦:

step_1 = chord( group( clean ), chordfinisher.si() ) # clean creates different image sizes
step_2 = chord( group( jobs ), chordfinisher.si() )  # jobs include find_faces
transaction.on_commit(lambda: chain(step_1, step_2, faces_2, ocr_job, change_state_task.si(document_id, 'ready')).delay())

@app.task
def chordfinisher( *args, **kwargs ):
    return "OK"

图像很大,因此find_faces 任务最多可能需要 30 秒才能完成。我认为CELERYD_TASK_SOFT_TIME_LIMIT = 60 会处理这么长的处理时间。

我绝不是 celery 专家,所以我假设我需要启用一个 celery 设置或选项,以确保 find_faces 任务始终完成。我只是不知道那会是什么。

【问题讨论】:

【参考方案1】:

经过更多研究,我可以在这篇文章"Beware the oom-killer, my son! The jaws that bite, the claws that catch!"、这篇文章Chaining Chords produces enormously big messages causing OOM on workers 和这篇文章WorkerLostError: Worker exited prematurely: exitcode 155 中接受刘易斯卡罗尔的这个建议。

看来我的 celery worker 可能内存不足,因为我确实在我的系统日志中找到了可怕的 oomkiller 的痕迹。我将我的任务重新配置为只是在一个链中(删除了所有组和和弦),因此每个任务都按顺序为每个图像单独运行,并且无论我处理多少图像,任务都成功完成。

【讨论】:

以上是关于Celery/redis 任务并不总是完成 - 不知道为啥或如何适应它的主要内容,如果未能解决你的问题,请参考以下文章

使用 Redis 作为 Celery 结果后端和消息代理 - 任务过期(对于存储在 redis 中的密钥)

django+celery+redis环境配置

Celery/Redis 相同的任务被并行执行多次

python 关于celery的异步任务队列的基本使用(celery+redis)采用配置文件设置

python3+celery+redis实现异步任务

Django、Celery、Redis、RabbitMQ:Fanout-On-Writes 的链式任务