为啥工作人员从主线程创建对象的副本?
Posted
技术标签:
【中文标题】为啥工作人员从主线程创建对象的副本?【英文标题】:Why are workers creating copies of the objects from the main thread?为什么工作人员从主线程创建对象的副本? 【发布时间】:2020-02-05 07:38:53 【问题描述】:我正在尝试使用 Python 中的工作人员创建一个简单的池,目的是从主线程上的迭代器中获取值,从而更新该迭代器。 (目的是并行化迭代器,同时在主线程上消费其结果)
import multiprocessing as mp
pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))
#details are given below
但由于某种原因,Pool
似乎正在为每个线程创建迭代器的副本,而不是简单地在主线程中更新它。 (问题在文末)
迭代器
所以,这就是我要并行化的迭代器。我确保从其中并行获取项目是安全的,并且在获取项目时不使用更新的值:
class TrySeq(object):
def __init__(self):
print('created iterator')
self.gotten = 0 #a simple counter informing how many items were gotten
def __len__(self):
return 10
def __getitem__(self, i):
time.sleep(3) #simulate a heavy operation
#must update the gotten count but this value won't affect the values of the items
self.gotten += 1
print('Iterator: got item', i, ' - gotten total: ', self.gotten)
return (i, i)
并行化生成器
现在,这是一个生成器,它将包装该迭代器以便“不可见地”并行化它。
它工作得很好,除了更新gotten
值外,它完全符合我的预期。 (我知道它在每个epoch
中等待同步,这不是这个问题的问题)。
#A generator that wraps an iterator and loads items assynchronously
def ParallelIterator(iterator, epochs, shuffle, workers = 4, queue_size = 10):
sourceQueue = mp.Queue() #queue for getting batch indices
batchQueue = mp.Queue(maxsize = queue_size) #queue for getting actual batches
indices = np.arange(len(iterator)) #array of indices to be shuffled
#fills the batch indices queue (called when sourceQueue is empty -> a few batches before an epoch ends)
def fillSource():
#print("Iterator: fill source - source qsize = ", sourceQueue.qsize() )
if shuffle == True:
np.random.shuffle(indices)
#puts the indices in the indices queue
for i in indices:
sourceQueue.put(i)
#function that will load batches from the iterator
def worker(indicesQueue, destinationQueue, itera):
while True:
index = indicesQueue.get(block = True) #get index from the queue
item = itera[index] #get batch from the iterator
destinationQueue.put((index,item), block=True) #puts batch in the batch queue
#creates the thread pool that will work automatically as we get from the batch queue
pool = mp.Pool(workers, worker, (sourceQueue, batchQueue, iterator))
#generation loop
for epoch in range(epochs):
fillSource()
for batch in range(len(iterator)):
#yields batches for the outside loop that is using this generator
originalIndex, batchItems = batchQueue.get(block = True)
yield epoch, batch, originalIndex, batchItems
pool.close()
sourceQueue.close()
batchQueue.close()
del pool
del sourceQueue
del batchQueue
似乎Pool
只是为每个线程复制迭代器,但我希望所有线程在主线程中更新同一个生成器
使用生成器:
这个想法是非常简单地使用它,像这样:
#outside loop:
for e, b, oB, xAndY in ParallelIterator(TrySeq(), 3, True, workers = 3):
time.sleep(1)
#print('yield e:', e, " - b:", b, " origB: ", oB, "- data:", xAndY)
当前输出:
现在,当我运行它时,我看到 每个 的工人都有一个 gotten
值,而不是像预期的那样有一个主要的 gotten
值:
created iterator
Iterator: got item 8 - gotten total: 1
Iterator: got item 2 - gotten total: 1
Iterator: got item 0 - gotten total: 1
Iterator: got item 1 - gotten total: 2
Iterator: got item 7 - gotten total: 2
Iterator: got item 6 - gotten total: 2
Iterator: got item 9 - gotten total: 3
Iterator: got item 5 - gotten total: 3
Iterator: got item 3 - gotten total: 3
Iterator: got item 4 - gotten total: 4
Iterator: got item 4 - gotten total: 4
Iterator: got item 2 - gotten total: 5
Iterator: got item 3 - gotten total: 4
Iterator: got item 6 - gotten total: 5
Iterator: got item 7 - gotten total: 5
Iterator: got item 5 - gotten total: 6
Iterator: got item 1 - gotten total: 6
Iterator: got item 9 - gotten total: 7
Iterator: got item 0 - gotten total: 6
Iterator: got item 8 - gotten total: 7
Iterator: got item 7 - gotten total: 8
Iterator: got item 8 - gotten total: 7
Iterator: got item 2 - gotten total: 8
Iterator: got item 3 - gotten total: 8
Iterator: got item 9 - gotten total: 9
Iterator: got item 1 - gotten total: 9
Iterator: got item 6 - gotten total: 9
Iterator: got item 4 - gotten total: 10
Iterator: got item 0 - gotten total: 10
Iterator: got item 5 - gotten total: 10
finished
问题
为什么会发生这种情况? 如何更新ParallelIterator
,使其作用于主iterator
,而不是为每个线程创建一个副本?
【问题讨论】:
multiprocessing
不是线程。尽管 API 尽最大努力假装这无关紧要,但在使用 multiprocessing
时,这是最重要的事情之一。这是效果之一。
虽然这对于理解“线程”的人来说可能是一个合理的解释,但对于像我这样的“多处理/线程”新手来说,这真的没什么好说的。我不知道有什么区别,更不用说为什么这解释了副作用,或者如何解决它。
@DanielMöller 获得 47k4 声誉 应该意味着首先重新阅读文档 - 其中主要的 GIL 阻塞重新序列化为纯 [SERIAL] 代码执行(对于基于线程的后端(Linux、MacOs))以及(完整副本)进程实例化的成本都被明确记录并突出显示,并明确警告任何共享和/或锁定的附加成本。 这是基本的。使用任何形式的语法而不首先了解它在给定硬件和 O/S 生态系统上的工作原理会导致这样的意外。
【参考方案1】:
你没有显示你的导入,但我猜你有:
import multiprocessing as mp
在文件的顶部。 multiprocessing
不受线程支持,它由fork
ed 或衍生进程支持,每个进程都有独立的内存和独立的变量。您的初始化是腌制值(重要的是,iterator
),然后取消腌制每个值的新副本,并在每个工作进程中单独使用它们(注意:在fork
而不是产生工作人员的系统上,酸洗可能不涉及,但效果是一样的;原始数据在fork
时间被“快照”,每个工作进程继承自己独立的数据快照,与另一个数据没有任何关联进程)。
如果您打算使用线程,请将导入更改为:
import multiprocessing.dummy as mp
它将支持实现更改为基于线程的池,而不是基于进程的池。基于线程的池位于单个共享内存空间中;不涉及任何类型的酸洗/解酸或进程间通信。缺点是 CPython 参考解释器上的并行性将受到the GIL 的限制,更大的共享意味着需要更多的同步来防止竞争条件。
如果您想要进程,那将是一个非常痛苦的过程,因为您实际上被困在为 make it multiprocessing.Manager
compliant 的迭代器类型实现代理包装器上,这将是一个非常痛苦的过程。
【讨论】:
以上是关于为啥工作人员从主线程创建对象的副本?的主要内容,如果未能解决你的问题,请参考以下文章
在 Apple 的 Cocoa API 中,为啥从主线程调用 NSApplicationMain 很重要?
Java 并发编程线程简介 ( 原子操作 | volatile 关键字使用场景 )