Python NotImplementedError:无法在进程之间传递池对象
Posted
技术标签:
【中文标题】Python NotImplementedError:无法在进程之间传递池对象【英文标题】:Python NotImplementedError: pool objects cannot be passed between processes 【发布时间】:2014-10-12 11:55:28 【问题描述】:当页面附加到页面列表时,我正在尝试交付工作,但我的代码输出返回 NotImplementedError。这是我正在尝试做的代码:
代码:
from multiprocessing import Pool, current_process
import time
import random
import copy_reg
import types
import threading
class PageControler(object):
def __init__(self):
self.nProcess = 3
self.pages = [1,2,3,4,5,6,7,8,9,10]
self.manageWork()
def manageWork(self):
self.pool = Pool(processes=self.nProcess)
time.sleep(2)
work_queue = threading.Thread(target=self.modifyQueue)
work_queue.start()
#pool.close()
#pool.join()
def deliverWork(self):
if self.pages != []:
pag = self.pages.pop()
self.pool.apply_async(self.myFun)
def modifyQueue(self):
t = time.time()
while (time.time()-t) < 10:
time.sleep(1)
self.pages.append(99)
print self.pages
self.deliverWork()
def myFun(self):
time.sleep(2)
if __name__ == '__main__':
def _pickle_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _pickle_method)
PageControler()
输出:
NotImplementedError: pool objects cannot be passed between processes or pickled
有什么方法可以在进程之间传递池对象?
编辑:
我正在使用 Python 2.6
【问题讨论】:
python 2.7,我想? Python 2.6,但是我一直在阅读和python 2.7有同样的问题。 【参考方案1】:为了腌制您尝试传递给Pool
的实例方法,Python 需要腌制整个PageControler
对象,包括其实例变量。这些实例变量之一是Pool
对象本身,而Pool
对象不能被腌制,因此出现错误。您可以通过在对象上实现__getstate__
来解决此问题,并在酸洗之前使用它从实例中删除pool
对象:
class PageControler(object):
def __init__(self):
self.nProcess = 3
self.pages = [1,2,3,4,5,6,7,8,9,10]
self.manageWork()
def manageWork(self):
self.pool = Pool(processes=self.nProcess)
time.sleep(2)
work_queue = threading.Thread(target=self.modifyQueue)
work_queue.start()
#pool.close()
#pool.join()
def deliverWork(self):
if self.pages != []:
pag = self.pages.pop()
self.pool.apply_async(self.myFun)
def modifyQueue(self):
t = time.time()
while (time.time()-t) < 10:
time.sleep(1)
self.pages.append(99)
print self.pages
self.deliverWork()
def myFun(self):
time.sleep(2)
def __getstate__(self):
self_dict = self.__dict__.copy()
del self_dict['pool']
return self_dict
def __setstate__(self, state):
self.__dict__.update(state)
__getstate__
总是在腌制对象之前调用,并允许您准确指定对象状态的哪些部分实际上应该被腌制。然后在 unpickling 时,如果 __setstate__(state)
已实现(在我们的例子中)将被调用,否则,__getstate__
返回的 dict
将用作 unpickled 实例的 __dict__
。在上面的示例中,我们将 __dict__
显式设置为我们在 __getstate__
中返回的 dict
,但我们可能没有实现 __setstate__
并获得相同的效果。
【讨论】:
再次感谢您的帮助! 这是一个非常好的答案,帮助解决了我的问题。它使我在 Python3 中做到了这一点,这可能对其他人有用:docs.python.org/3/library/pickle.html#object.__getstate__ 这太棒了!你摇滚@dano!谢谢 哇,真是个好答案。我现在明白过去两个小时发生了什么。谢谢@dano【参考方案2】:如果您必须将整个对象传递给进程,Dano 的答案是一个很好的方法。在您的情况下,您传递给 pool 的函数不需要引用类实例。因此,另一种方法可能是使用 @staticmethod
装饰器使函数成为静态方法。如果函数确实需要引用一个或两个类成员变量,这些变量可以作为只读变量的参数传入,如果还需要写入,则在回调中更新(当然,如果你想这样做,你需要这样做无论如何都要更新本地类实例)。
例如:
Class A(object):
def __init__(self):
self._pool = multiprocessing.Pool(1)
self.member_variable = 1
@staticmethod
def MyFunc(variable):
variable += 1
return variable
def Callback(self, return_val):
self.member_variable = return_val
def CallFuncAsync(self):
pool.apply_async(self.MyFunc, (self.member_variable,), callback=self.Callback)
【讨论】:
以上是关于Python NotImplementedError:无法在进程之间传递池对象的主要内容,如果未能解决你的问题,请参考以下文章