为啥工作人员从主线程创建对象的副本?

Posted

技术标签:

【中文标题】为啥工作人员从主线程创建对象的副本?【英文标题】:Why are workers creating copies of the objects from the main thread?为什么工作人员从主线程创建对象的副本? 【发布时间】:2020-02-05 07:38:53 【问题描述】:

我正在尝试使用 Python 中的工作人员创建一个简单的池,目的是从主线程上的迭代器中获取值,从而更新该迭代器。 (目的是并行化迭代器,同时在主线程上消费其结果)

 import multiprocessing as mp

 pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))
 #details are given below

但由于某种原因,Pool 似乎正在为每个线程创建迭代器的副本,而不是简单地在主线程中更新它。 (问题在文末)

迭代器

所以,这就是我要并行化的迭代器。我确保从其中并行获取项目是安全的,并且在获取项目时不使用更新的值:

class TrySeq(object):
    def __init__(self):
        print('created iterator')
        self.gotten = 0 #a simple counter informing how many items were gotten
    def __len__(self):
        return 10
    def __getitem__(self, i):
        time.sleep(3) #simulate a heavy operation

        #must update the gotten count but this value won't affect the values of the items
        self.gotten += 1 
        print('Iterator: got item', i, ' - gotten total: ', self.gotten)
        return (i, i)

并行化生成器

现在,这是一个生成器,它将包装该迭代器以便“不可见地”并行化它。

它工作得很好,除了更新gotten 值外,它完全符合我的预期。 (我知道它在每个epoch 中等待同步,这不是这个问题的问题)。

#A generator that wraps an iterator and loads items assynchronously   
def ParallelIterator(iterator, epochs, shuffle, workers = 4, queue_size = 10):

    sourceQueue = mp.Queue()                     #queue for getting batch indices
    batchQueue = mp.Queue(maxsize = queue_size)  #queue for getting actual batches 
    indices = np.arange(len(iterator))     #array of indices to be shuffled

    #fills the batch indices queue (called when sourceQueue is empty -> a few batches before an epoch ends)
    def fillSource():
        #print("Iterator: fill source - source qsize = ", sourceQueue.qsize() )
        if shuffle == True:
            np.random.shuffle(indices)

        #puts the indices in the indices queue
        for i in indices:
            sourceQueue.put(i)

    #function that will load batches from the iterator
    def worker(indicesQueue, destinationQueue, itera):
        while True:
            index = indicesQueue.get(block = True) #get index from the queue
            item = itera[index] #get batch from the iterator
            destinationQueue.put((index,item), block=True) #puts batch in the batch queue


    #creates the thread pool that will work automatically as we get from the batch queue
    pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))

    #generation loop
    for epoch in range(epochs):
        fillSource()
        for batch in range(len(iterator)):

            #yields batches for the outside loop that is using this generator
            originalIndex, batchItems = batchQueue.get(block = True)
            yield epoch, batch, originalIndex, batchItems

    pool.close()
    sourceQueue.close()
    batchQueue.close()
    del pool
    del sourceQueue
    del batchQueue

似乎Pool 只是为每个线程复制迭代器,但我希望所有线程在主线程中更新同一个生成器

使用生成器:

这个想法是非常简单地使用它,像这样:

#outside loop: 
for e, b, oB, xAndY in ParallelIterator(TrySeq(), 3, True, workers = 3):
    time.sleep(1)
    #print('yield e:', e, " - b:", b, " origB: ", oB, "- data:", xAndY)

当前输出:

现在,当我运行它时,我看到 每个 的工人都有一个 gotten 值,而不是像预期的那样有一个主要的 gotten 值:

created iterator
Iterator: got item 8  - gotten total:  1
Iterator: got item 2  - gotten total:  1
Iterator: got item 0  - gotten total:  1
Iterator: got item 1  - gotten total:  2
Iterator: got item 7  - gotten total:  2
Iterator: got item 6  - gotten total:  2
Iterator: got item 9  - gotten total:  3
Iterator: got item 5  - gotten total:  3
Iterator: got item 3  - gotten total:  3
Iterator: got item 4  - gotten total:  4
Iterator: got item 4  - gotten total:  4
Iterator: got item 2  - gotten total:  5
Iterator: got item 3  - gotten total:  4
Iterator: got item 6  - gotten total:  5
Iterator: got item 7  - gotten total:  5
Iterator: got item 5  - gotten total:  6
Iterator: got item 1  - gotten total:  6
Iterator: got item 9  - gotten total:  7
Iterator: got item 0  - gotten total:  6
Iterator: got item 8  - gotten total:  7
Iterator: got item 7  - gotten total:  8
Iterator: got item 8  - gotten total:  7
Iterator: got item 2  - gotten total:  8
Iterator: got item 3  - gotten total:  8
Iterator: got item 9  - gotten total:  9
Iterator: got item 1  - gotten total:  9
Iterator: got item 6  - gotten total:  9
Iterator: got item 4  - gotten total:  10
Iterator: got item 0  - gotten total:  10
Iterator: got item 5  - gotten total:  10
finished

问题

为什么会发生这种情况? 如何更新ParallelIterator,使其作用于主iterator,而不是为每个线程创建一个副本?

【问题讨论】:

multiprocessing 不是线程。尽管 API 尽最大努力假装这无关紧要,但在使用 multiprocessing 时,这是最重要的事情之一。这是效果之一。 虽然这对于理解“线程”的人来说可能是一个合理的解释,但对于像我这样的“多处理/线程”新手来说,这真的没什么好说的。我不知道有什么区别,更不用说为什么这解释了副作用,或者如何解决它。 @DanielMöller 获得 47k4 声誉 应该意味着首先重新阅读文档 - 其中主要的 GIL 阻塞重新序列化为纯 [SERIAL] 代码执行(对于基于线程的后端(Linux、MacOs))以及(完整副本)进程实例化的成本都被明确记录并突出显示,并明确警告任何共享和/或锁定的附加成本。 这是基本的。使用任何形式的语法而不首先了解它在给定硬件和 O/S 生态系统上的工作原理会导致这样的意外。 【参考方案1】:

你没有显示你的导入,但我猜你有:

import multiprocessing as mp

在文件的顶部。 multiprocessing 不受线程支持,它由forked 或衍生进程支持,每个进程都有独立的内存和独立的变量。您的初始化是腌制值(重要的是,iterator),然后取消腌制每个值的新副本,并在每个工作进程中单独使用它们(注意:在fork 而不是产生工作人员的系统上,酸洗可能不涉及,但效果是一样的;原始数据在fork时间被“快照”,每个工作进程继承自己独立的数据快照,与另一个数据没有任何关联进程)。

如果您打算使用线程,请将导入更改为:

import multiprocessing.dummy as mp

它将支持实现更改为基于线程的池,而不是基于进程的池。基于线程的池位于单个共享内存空间中;不涉及任何类型的酸洗/解酸或进程间通信。缺点是 CPython 参考解释器上的并行性将受到the GIL 的限制,更大的共享意味着需要更多的同步来防止竞争条件。

如果您想要进程,那将是一个非常痛苦的过程,因为您实际上被困在为 make it multiprocessing.Manager compliant 的迭代器类型实现代理包装器上,这将是一个非常痛苦的过程。

【讨论】:

以上是关于为啥工作人员从主线程创建对象的副本?的主要内容,如果未能解决你的问题,请参考以下文章

在 Apple 的 Cocoa API 中,为啥从主线程调用 NSApplicationMain 很重要?

Java 并发编程线程简介 ( 原子操作 | volatile 关键字使用场景 )

如何在工作线程中重用主线程创建的OMP线程池?

Volatile理解

为啥从主线程调用时,`std::promise::set_value` 会抛出错误?

为啥win32线程不自动退出?