并发数据加载器的干净,pythonic方式?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发数据加载器的干净,pythonic方式?相关的知识,希望对你有一定的参考价值。

Python 3

我想知道一个非常干净,pythonic并发数据加载器应该是什么样子。我需要这种方法来处理我的一个项目,该项目对数据进行大量计算,这些数据太大而不能完全适合内存。因此,我实现了应该并发运行并将数据存储在队列中的数据加载器,以便主进程可以在(同时)下载和准备下一个数据的同时工作。当然,队列应该在它为空时阻塞(主进程尝试消耗更多项 - >队列应该等待新数据)或完全(工作进程应该等到主进程将数据从队列中消耗掉以防止超出 - 记忆错误)。

我用Python的multiprocessing模块(multiprocessing.Queuemultiprocessing.Process)写了一个类来满足这个需求。该类的关键部分实现如下:

import multiprocessing as mp
from itertools import cycle    

class ConcurrentLoader:
    def __init__(path_to_data, queue_size, batch_size):
        self._batch_size
        self._path = path_to_data
        filenames = ... # filenames for path 'path_to_data',
                        # get loaded using glob
        self._files = cycle()
        self._q = mp.Queue(queue_size)
        ...
        self._worker = mp.Process(target=self._worker_func, daemon=True)
        self._worker.start() # only started, never stopped

    def _worker_func(self):
        while True:
            buffer = list()
            for i in range(batch_size):
                f = next(self._files)
                ... # load f and do some pre-processing with NumPy
                ... # add it to buffer
            self._q.put(np.array(buffer).astype(np.float32))

    def get_batch_data(self):
        self._q.get()

该类有更多的方法,但它们都是为了“方便功能”。例如,它在dict中计算每个文件的加载频率,加载整个数据集的频率等等,但这些在Python中很容易实现,并且不会浪费太多的计算时间(集合,dicts,.. )。

另一方面,由于I / O和预处理,数据部分本身甚至可能需要几秒钟。这就是为什么我希望这种情况同时发生的原因。

ConcurrentLoader应该:

  • 阻止主进程:如果调用get_batch_data,但队列为空
  • 阻止工作进程:如果队列已满,则防止出现内存不足错误并阻止while True浪费资源
  • 对任何使用ConcurrentLoader的类都是“透明的”:它们应该提供数据的路径并使用get_batch_data而不会注意到这实际上同时工作(“无忧使用”)
  • 当主进程死亡以再次释放资源时终止其worker

考虑到这些目标(我忘了什么吗?)我应该怎么做才能增强当前的实施?线程/死锁是否安全?是否有更“pythonic”的实施方式?我可以把它弄干净吗?浪费资源不知何故?

任何使用ConcurrentLoader的类都会大致遵循以下设置:

class Foo:
    ...

    def do_something(self):
        ...
        data1 = ConcurrentLoader("path/to/data1", 64, 8)
        data2 = ConcurrentLoader("path/to/data2", 256, 16)
        ...
        sample1 = data1.get_batch_data()
        sample2 = data2.get_batch_data()
        ... # heavy computations with data contained in 'sample1' & 'sample2'
            # go *here*

请指出任何类型的错误,以改善我的方法或提供一个自己的,更清洁,更pythonic的方法。

答案
  • 阻止当multiprocessing.Queue为空/满并且在其上调用get() / put()时会自动发生。
  • 此行为对调用函数是透明的。
  • self._worker.daemon = True之前使用self._worker.start(),这样当主进程退出时,工作人员将自动被杀死

以上是关于并发数据加载器的干净,pythonic方式?的主要内容,如果未能解决你的问题,请参考以下文章

并发编程的艺术

WebView中打开相机,文件选择器的问题和解决方法

如何以我只训练一次分类器的方式调整 NLTK Python 代码

React拓展 - setState - 路由组件懒加载 - Hooks - Fragment - Context - PureComponent - 插槽 - 错误边界 - 组件通信方式总结(代码片

Python 多线程爬虫

良好的资源加载系统