芹菜任务设置与视频帧的内存缓存作为python中的循环缓冲区策略

Posted

技术标签:

【中文标题】芹菜任务设置与视频帧的内存缓存作为python中的循环缓冲区策略【英文标题】:Celery task setup with memory cache for video frames as circular buffer strategy in python 【发布时间】:2018-10-30 15:05:14 【问题描述】:

我想在Celery 上构建一个多任务处理管道,并希望多个任务处理同一个视频文件。 任务需要共享视频数据。因此,并非每个任务都必须从视频文件中解码和提取帧。视频数据将是提取帧的列表(并非视频的每一帧都需要)。

是否有任何解决方案可以有效地共享这些帧?任务可以在不同的节点上处理。但我不想像 Memcached 或 Redis 那样通过网络共享数据。 任务应该在内存/缓存中查找视频数据,如果不存在,则任务应该发出另一个任务来加载视频并将帧提取到缓存。

(每个视频文件的生产者和多个消费者)

因此同一节点/机器上的任务能够共享缓存数据。不同节点上的两个任务通过缓存没有任何好处。

我不想缓存整个提取的视频,必须有一些循环缓冲区缓存。每个视频的缓存具有固定大小,比如说 100 帧。最快和最慢任务之间的差距不能超过 100 帧。内存/缓存中总共只有 100 帧。

出现了两个主要问题:

    任务设置

    任务 A:从视频中提取帧(生产者到内存/缓存)

    任务 B1:消耗帧并做实际工作(处理帧)

    。 .

    任务 Bn:消耗帧并做实际工作(处理帧)

    A, B1 - Bn 并行运行。 但是这些任务必须在同一个节点上运行。如果 B 任务分布在不同的节点上,则必须产生另一个 A 任务(每个节点上都有一个任务来解码和提取帧)。 你在这里推荐什么方法?最好的选择是什么?

    Python 缓存

    是否有任何缓存库/实现/解决方案最适合我的用例,通过一些循环缓冲区实现在本地机器上缓存大数据? 类似于DiskCache,但只能通过环形缓冲缓存 100 帧。

您推荐哪些方法和设计来实现我的用例?我想坚持 Celery 进行任务分配。

【问题讨论】:

您希望运行缓慢的进程丢帧,还是希望快速运行的进程等待运行缓慢的进程? 第二。在示例中,当循环缓冲区已满时,任务 A 将等待最慢的 B 任务(读取指针距离任务 A 的写入指针 100 帧) 【参考方案1】:

这可能是我的固执表现,但我总是发现像 celery 这样的项目在多处理(这已经很复杂)之上增加了一堆复杂性,这比它们的价值更麻烦。从速度和简单性的角度来看,也没有比使用 stdlib 共享内存和互斥锁更好的替代 imo。

对于您的情况,一个简单的解决方案是为每个进程使用一个先进先出队列,并将帧放入来自生产者的每个进程中。如果您为 n 个消费者制作每个帧的 n 个副本,这自然会产生大量内存使用,但是您可能很容易想出一种机制将帧本身放入 multiprocessing.sharedctypes.Array 并仅通过索引传递而是排队。只要限制队列的长度小于缓冲区的长度,就应该限制覆盖缓冲区中的帧,直到它被所有消费者使用。如果没有任何同步,这将是您的裤子座位,但一点点互斥魔法绝对可以使它成为一个非常强大的解决方案。

例如:

import numpy as np
from time import sleep
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.sharedctypes import Array
from ctypes import c_uint8
from functools import reduce

BUFSHAPE = (10,10,10) #10 10x10 images in buffer

class Worker(Process):
    def __init__(self, q_size, buffer, name=''):
        super().__init__()
        self.queue = Queue(q_size)
        self.buffer = buffer
        self.name = name

    def run(self,): #do work here
        #I hardcoded datatype here. you might need to communicate it to the child process
        buf_arr = np.frombuffer(self.buffer.get_obj(), dtype=c_uint8)
        buf_arr.shape = BUFSHAPE
        while True:
            item = self.queue.get()
            if item == 'done':
                print('child process:  completed all frames'.format(self.name))
                return
            with self.buffer.get_lock(): #prevent writing while we're reading
                #slice the frame from the array uning the index that was sent
                frame = buf_arr[item%BUFSHAPE[0]] #depending on your use, you may want to make a copy here
            #do some intense processing on `frame`
            sleep(np.random.rand())
            print('child process:  completed frame: '.format(self.name, item))

def main():
    #creating shared array
    buffer = Array(c_uint8, reduce(lambda a,b: a*b, BUFSHAPE))
    #make a numpy.array using that memory location to make it easy to stuff data into it
    buf_arr = np.frombuffer(buffer.get_obj(), dtype=c_uint8)
    buf_arr.shape = BUFSHAPE
    #create a list of workers
    workers = [Worker(BUFSHAPE[0]-2, #smaller queue than buffer to prevent overwriting frames not yet consumed
                      buffer, #pass in shared buffer array
                      str(i)) #numbered child processes
                      for i in range(5)] #5 workers

    for worker in workers: #start the workers
        worker.start()
    for i in range(100): #generate 100 random frames to send to workers
        #insert a frame into the buffer
        with buffer.get_lock(): #prevent reading while we're writing
            buf_arr[i%BUFSHAPE[0]] = np.random.randint(0,255, size=(10,10), dtype=c_uint8)
        #send the frame number to each worker for processing. If the input queue is full, this will block until there's space
        # this is what prevents `buf_arr[i%BUFSHAPE[0]] = np...` from overwriting a frame that hasn't been processed yet
        for worker in workers:
            worker.queue.put(i)
    #when we're done send the 'done' signal so the child processes exit gracefully (or you could make them daemons)
    for worker in workers:
        worker.queue.put('done')
        worker.join()


if __name__ == "__main__":
    freeze_support()
    main()

编辑

某种非一错误要求队列比缓冲区小 2 帧,而不是 1 帧,以防止在其时间之前覆盖帧。

EDIT2 - 第一次编辑的解释:

len(q) = len(buf)-2 的原因似乎是在我们从缓冲区获取帧之前调用了q.get(),并且在我们尝试将索引推送到队列之前写入了帧本身。如果长度差只有 1,worker 可能会从队列中拉出一个帧索引,然后生产者可能会看到它现在可以推送到队列并在 worker 有机会读取帧之前继续移动到下一帧本身。您可以通过多种不同的方式来处理这个问题,这可能会减少一直等待对方的进程,也许使用mp.Event

【讨论】:

以上是关于芹菜任务设置与视频帧的内存缓存作为python中的循环缓冲区策略的主要内容,如果未能解决你的问题,请参考以下文章

芹菜任务不会在 django 中执行

Flask- celery (芹菜)

Django 学习之Celery(芹菜)

检索芹菜队列中的任务列表

芹菜中的预取任务是不是被确认?

从芹菜任务中获取芹菜工人的名字?