Celery/redis 任务并不总是完成 - 不知道为啥或如何适应它
Posted
技术标签:
【中文标题】Celery/redis 任务并不总是完成 - 不知道为啥或如何适应它【英文标题】:Celery/redis tasks don't always complete - not sure why or how to fit itCelery/redis 任务并不总是完成 - 不知道为什么或如何适应它 【发布时间】:2022-01-01 13:59:42 【问题描述】:我在 django v 3.0.1 应用程序(Python v 3.6.9)中运行 celery v 4.0.3/redis v 4.09。我还在芹菜任务find_faces
中使用face_recognition 在我上传到应用程序的图像中查找人脸,以及其他图像处理芹菜任务。处理五个或更少的图像文件没有问题,因为所有图像处理 celery 任务都成功完成。
当我有图像处理任务(包括find_faces
)迭代超过 100 张图像时,有 10-30 张图像没有完成find_faces
任务。当我使用flower v0.9.7 查看 celery 任务时,我看到那些未完成的图像的find_faces
任务状态为“已启动”。所有其他图像的find_faces
任务状态为“成功”。这些“已启动”任务的状态永远不会改变,也不会报告错误或异常。然后,我可以在每个图像上单独运行图像处理任务,包括find_faces
任务,任务状态为“成功”。如果我将 celery 作为守护进程或本地运行,或者我使用 wsgi 和 apache 或 runserver 运行 django 应用程序,这些结果不会改变。 Flower 还报告说,我所有任务的重试次数 = 0。
我在 django 应用中全局设置了CELERYD_TASK_SOFT_TIME_LIMIT = 60
,并为find_faces
任务设置了max_retries=5
。
@app.task(bind=True, max_retries=5)
def find_faces_task(self, document_id, use_cuda=settings.USE_CUDA):
logger.debug("find_faces_task START")
try:
temp_face = None
from memorabilia.models import TaskStatus, Document
args = "document_id=%s, use_cuda=%s" % (document_id, use_cuda)
ts = TaskStatus(document_id_id=document_id, task_id=self.request.id, task_name='find_faces_task', task_args=args, task_status=TaskStatus.PENDING)
ts.save()
import time
time_start = time.time()
# Check if we already have the faces for this document
from biometric_identification.models import Face
if len(Face.objects.filter(document_id=document_id)) != 0:
# This document has already been scanned, so need to remove it and rescan
# Have to manually delete each object per django docs to insure the
# model delete method is run to update the metadata.
logger.debug("Document %s has already been scanned" % document_id)
faces = Face.objects.filter(document_id=document_id)
for face in faces:
face.delete()
logger.debug("Deleted face=%s" % face.tag_value.value)
document = Document.objects.get(document_id=document_id)
image_file = document.get_default_image_file(settings.DEFAULT_DISPLAY_IMAGE)
image_path = image_file.path
time_start_looking = time.time()
temp_file = open(image_path, 'rb')
temp_image = Image.open(temp_file)
logger.debug("fred.mode=%s" % fred.mode)
width, height = temp_image.size
image = face_recognition.load_image_file(temp_file)
# Get the coordinates of each face
if use_cuda:
# With CUDA installed
logger.debug("Using CUDA for face recognition")
face_locations = face_recognition.face_locations(image, model="cnn", number_of_times_to_upsample=0)
else:
# without CUDA installed
logger.debug("NOT using CUDA for face recognition")
face_locations = face_recognition.face_locations(image, model="hog", number_of_times_to_upsample=2)
time_find_faces = time.time()
# Get the face encodings for each face in the picture
face_encodings = face_recognition.face_encodings(image, known_face_locations=face_locations)
logger.debug("Found %s face locations and %s encodings" % (len(face_locations), len(face_encodings)))
time_face_encodings = time.time()
# Save the faces found in the database
for location, encoding in zip(face_locations, face_encodings):
# Create the new Face object and load in the document, encoding, and location of a face found
# Locations seem to be of the form (y,x)
from memorabilia.models import MetaData, MetaDataValue
tag_type_people = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Tag_types')[0].metadata_id, value='People')[0]
tag_value_unknown = MetaDataValue.objects.filter(metadata_id=MetaData.objects.filter(name='Unknown')[0].metadata_id, value='Unknown')[0]
new_face = Face(document=document, face_encoding=numpy_to_json(encoding), face_location=location, image_size='width': width, "height":height, tag_type=tag_type_people, tag_value=tag_value_unknown)
# save the newly found Face object
new_face.save()
logger.debug("Saved new_face %s" % new_face.face_file)
time_end = time.time()
logger.debug("total time = ".format(time_end - time_start))
logger.debug("time to find faces = ".format(time_find_faces - time_start_looking))
logger.debug("time to find encodings = ".format(time_face_encodings - time_find_faces))
ts.task_status = TaskStatus.SUCCESS
ts.comment = "Found %s faces" % len(face_encodings)
return document_id
except Exception as e:
logger.exception("Hit an exception in find_faces_task %s" % str(e))
ts.task_status = TaskStatus.ERROR
ts.comment = "An exception while finding faces: %s" % repr(e)
finally:
logger.debug("Finally clause in find-faces_task")
if temp_image:
temp_image.close()
if temp_file:
temp_file.close()
ts.save(update_fields=['task_status', 'comment'])
logger.debug("find_faces_task END")
find_faces
任务被称为操作图像的更大任务链的一部分。每个图像文件都经过这个链,其中 step_1 和 step_2 是不同图像处理步骤的和弦:
step_1 = chord( group( clean ), chordfinisher.si() ) # clean creates different image sizes
step_2 = chord( group( jobs ), chordfinisher.si() ) # jobs include find_faces
transaction.on_commit(lambda: chain(step_1, step_2, faces_2, ocr_job, change_state_task.si(document_id, 'ready')).delay())
@app.task
def chordfinisher( *args, **kwargs ):
return "OK"
图像很大,因此find_faces
任务最多可能需要 30 秒才能完成。我认为CELERYD_TASK_SOFT_TIME_LIMIT = 60
会处理这么长的处理时间。
我绝不是 celery 专家,所以我假设我需要启用一个 celery 设置或选项,以确保 find_faces
任务始终完成。我只是不知道那会是什么。
【问题讨论】:
【参考方案1】:经过更多研究,我可以在这篇文章"Beware the oom-killer, my son! The jaws that bite, the claws that catch!"、这篇文章Chaining Chords produces enormously big messages causing OOM on workers 和这篇文章WorkerLostError: Worker exited prematurely: exitcode 155 中接受刘易斯卡罗尔的这个建议。
看来我的 celery worker 可能内存不足,因为我确实在我的系统日志中找到了可怕的 oomkiller 的痕迹。我将我的任务重新配置为只是在一个链中(删除了所有组和和弦),因此每个任务都按顺序为每个图像单独运行,并且无论我处理多少图像,任务都成功完成。
【讨论】:
以上是关于Celery/redis 任务并不总是完成 - 不知道为啥或如何适应它的主要内容,如果未能解决你的问题,请参考以下文章
使用 Redis 作为 Celery 结果后端和消息代理 - 任务过期(对于存储在 redis 中的密钥)