What can multiprocessing and dill do together?
【Posted】: 2013-11-27 20:19:43
【Question】: I would like to use the multiprocessing library in Python. Sadly, multiprocessing uses pickle, which doesn't support functions with closures, lambdas, or functions in __main__. All three of these are important to me.
In [1]: import pickle
In [2]: pickle.dumps(lambda x: x)
PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
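For readers on Python 3, the lambda and closure cases can be checked with a small sketch like the one below. It is only an illustration: `make_adder` and `can_pickle` are hypothetical helper names that do not appear in the question, and the exact exception type raised for a closure varies between CPython versions, so several are caught.

```python
import pickle


def make_adder(n):
    # Hypothetical closure factory, used only for this illustration.
    def adder(x):
        return x + n
    return adder


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


print(can_pickle(len))            # built-in function, pickled by reference
print(can_pickle(lambda x: x))    # lambda: has no importable name
print(can_pickle(make_adder(1)))  # closure: defined inside another function
```

On CPython 3 the built-in is accepted and the other two are rejected.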
Fortunately, dill is a more robust pickle. Apparently, dill performs some magic on import to make pickle work:
In [3]: import dill
In [4]: pickle.dumps(lambda x: x)
Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
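This is very encouraging, particularly because I don't have access to the multiprocessing source code. Sadly, I still can't get this very basic example to work: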
import multiprocessing as mp
import dill
p = mp.Pool(4)
print p.map(lambda x: x**2, range(10))
Why is this? What am I missing? Exactly what are the limitations of the multiprocessing+dill combination?
Temporary Edit for J.F Sebastian
mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
    self.run()
  File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
^C
...lots of junk...
[DEBUG/MainProcess] cleaning up worker 3
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[DEBUG/MainProcess] cleaning up worker 0
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-5] child process calling self.run()
[INFO/PoolWorker-6] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-7] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-8] child process calling self.run()
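【Comments】:
Have you tried guarding the pool with if __name__ == "__main__": ?
@J.F.Sebastian Yes, no change. To be explicit, I placed that line both before and after p = mp.Pool(4), with no change in the result.
1. Add the actual code (with the guard). 2. Is there a traceback? 3. Enable logging: mp.log_to_stderr().setLevel(logging.DEBUG)
Try importing dill first.
@J.F.Sebastian See the edit with the traceback.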
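The debugging steps suggested in these comments (the guard and the debug logging) can be sketched as follows. This is a Python 3 sketch, not the asker's script: a module-level function `square` is swapped in for the lambda, an assumption made so that the pool itself runs and the logging output can be inspected; `main` is likewise a hypothetical name.

```python
import logging
import multiprocessing as mp


def square(x):
    # Module-level function: picklable by reference, unlike a lambda.
    return x ** 2


def main():
    # Send multiprocessing's internal log messages to stderr at DEBUG level.
    mp.log_to_stderr().setLevel(logging.DEBUG)
    with mp.Pool(4) as pool:
        print(pool.map(square, range(10)))


if __name__ == "__main__":
    # The guard keeps worker processes from re-running the pool setup
    # when the main module is imported in a child process.
    main()
```

Replacing `square` with a lambda in this sketch should reproduce a pickling error similar to the one in the traceback above, since the guard and logging do not change how the task is serialized.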
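【Answer 1】:
multiprocessing makes some bad choices about pickling. Don't get me wrong: it makes some good choices that enable it to pickle certain types so they can be used in a pool's map function. However, since we have dill, which can do the pickling, multiprocessing's own pickling becomes a bit limiting. Actually, if multiprocessing used pickle instead of cPickle, and also dropped some of its own pickling overrides, then dill could take over and provide much fuller serialization for multiprocessing.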
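As a rough Python 3 illustration of this point (the answer itself was written for Python 2.7 and cPickle), the pickler that multiprocessing uses can be inspected directly. The sketch assumes the standard-library class `multiprocessing.reduction.ForkingPickler`; `roundtrip` is a hypothetical helper name.

```python
import math
import pickle
from multiprocessing.reduction import ForkingPickler

# multiprocessing serializes through its own Pickler subclass, so rebinding
# the name `pickle` in user code does not change what a pool uses.
print(issubclass(ForkingPickler, pickle.Pickler))


def roundtrip(obj):
    """Serialize and restore obj with multiprocessing's own pickler."""
    return ForkingPickler.loads(ForkingPickler.dumps(obj))


# A function importable by name survives the round trip...
print(roundtrip(math.sqrt)(16.0))

# ...while a lambda is rejected, just as with plain pickle.
try:
    ForkingPickler.dumps(lambda x: x ** 2)
    lambda_ok = True
except (pickle.PicklingError, AttributeError):
    lambda_ok = False
print(lambda_ok)
```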
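Until that happens, there is a fork of multiprocessing called pathos (the release version is a bit stale, unfortunately) that removes the above limitations. Pathos also adds some nice features that multiprocessing doesn't have, such as multiple arguments in the map function. Pathos is due for a release after some mild updating, mostly conversion to python 3.x.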
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dill
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Just to show off a little of what pathos.multiprocessing can do...
>>> def busy_add(x,y, delay=0.01):
...     for n in range(x):
...         x += n
...     for n in range(y):
...         y -= n
...     import time
...     time.sleep(delay)
...     return x + y
...
>>> def busy_squared(x):
...     import time, random
...     time.sleep(2*random.random())
...     return x*x
...
>>> def squared(x):
...     return x*x
...
>>> def quad_factory(a=1, b=1, c=0):
...     def quad(x):
...         return a*x**2 + b*x + c
...     return quad
...
>>> square_plus_one = quad_factory(2,0,1)
>>>
>>> def test1(pool):
...     print pool
...     print "x: %s\n" % str(x)
...     print pool.map.__name__
...     start = time.time()
...     res = pool.map(squared, x)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...     print pool.imap.__name__
...     start = time.time()
...     res = pool.imap(squared, x)
...     print "time to queue:", time.time() - start
...     start = time.time()
...     res = list(res)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...     print pool.amap.__name__
...     start = time.time()
...     res = pool.amap(squared, x)
...     print "time to queue:", time.time() - start
...     start = time.time()
...     res = res.get()
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...
>>> def test2(pool, items=4, delay=0):
...     _x = range(-items/2,items/2,2)
...     _y = range(len(_x))
...     _d = [delay]*len(_x)
...     print map
...     res1 = map(busy_squared, _x)
...     res2 = map(busy_add, _x, _y, _d)
...     print pool.map
...     _res1 = pool.map(busy_squared, _x)
...     _res2 = pool.map(busy_add, _x, _y, _d)
...     assert _res1 == res1
...     assert _res2 == res2
...     print pool.imap
...     _res1 = pool.imap(busy_squared, _x)
...     _res2 = pool.imap(busy_add, _x, _y, _d)
...     assert list(_res1) == res1
...     assert list(_res2) == res2
...     print pool.amap
...     _res1 = pool.amap(busy_squared, _x)
...     _res2 = pool.amap(busy_add, _x, _y, _d)
...     assert _res1.get() == res1
...     assert _res2.get() == res2
...     print ""
...
>>> def test3(pool): # test against a function that should fail in pickle
...     print pool
...     print "x: %s\n" % str(x)
...     print pool.map.__name__
...     start = time.time()
...     res = pool.map(square_plus_one, x)
...     print "time to results:", time.time() - start
...     print "y: %s\n" % str(res)
...
>>> def test4(pool, maxtries, delay):
...     print pool
...     m = pool.amap(busy_add, x, x)
...     tries = 0
...     while not m.ready():
...         time.sleep(delay)
...         tries += 1
...         print "TRY: %s" % tries
...         if tries >= maxtries:
...             print "TIMEOUT"
...             break
...     print m.get()
...
>>> import time
>>> x = range(18)
>>> delay = 0.01
>>> items = 20
>>> maxtries = 20
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> pool = Pool(nodes=4)
>>> test1(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
map
time to results: 0.0553691387177
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
imap
time to queue: 7.91549682617e-05
time to results: 0.102381229401
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
amap
time to queue: 7.08103179932e-05
time to results: 0.0489699840546
y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]
>>> test2(pool, items, delay)
<built-in function map>
<bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
<bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>
>>> test3(pool)
<pool ProcessingPool(ncpus=4)>
x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]
map
time to results: 0.0523059368134
y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]
>>> test4(pool, maxtries, delay)
<pool ProcessingPool(ncpus=4)>
TRY: 1
TRY: 2
TRY: 3
TRY: 4
TRY: 5
TRY: 6
TRY: 7
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]
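【Comments】:
I'm glad this application is on the minds of the dill developers. The fragility of pickling really cripples multiprocessing. I suspect this issue has stopped many developers from exploring multiprocessing.
To install the latest version from GitHub, I used pip install git+https://github.com/uqfoundation/pathos
Oddly, this doesn't work in python 3 either, even though it uses pickle instead of cPickle. The same PicklingError exception is raised, so mp somehow manages to use the original _pickle module instead of dill, despite my import dill as pickle.
@max: That means pickle in 3.x is cPickle, where the name is now just pickle. I can see that. Sigh. Maybe I knew that and forgot. I hope to have a 3.x version of pathos.multiprocessing done soon.
@max: I have updated pathos to build and install for python 3.x, and a new release is imminent (i.e. pip install pathos will work as expected).
【Answer 2】:
You may want to try the multiprocessing_on_dill library, which is a fork of multiprocessing that implements dill on the backend.
For example, you can run: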
>>> import multiprocessing_on_dill as multiprocessing
>>> with multiprocessing.Pool() as pool:
...     pool.map(lambda x: x**2, range(10))
...
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
【Comments】:
【Answer 3】:
Override the multiprocessing module's Pickle class:
import dill, multiprocessing
dill.Pickler.dumps, dill.Pickler.loads = dill.dumps, dill.loads
multiprocessing.reduction.ForkingPickler = dill.Pickler
multiprocessing.reduction.dump = dill.dump
multiprocessing.queues._ForkingPickler = dill.Pickler
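【Comments】:
Your answer could be improved with additional supporting information. Please edit it to add further details, such as citations or documentation, so that others can confirm that your answer is correct. You can find more information on how to write good answers in the help center.
This worked for me, thanks. I only had to remove multiprocessing.queues._ForkingPickler = dill.Pickler, since it apparently no longer exists.
【Answer 4】:
I know this thread is old, but you don't necessarily have to use the pathos module, as Mike McKerns pointed out. I also find it annoying that multiprocessing uses pickle instead of dill, so you can do something like this: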
import multiprocessing as mp
import dill
def helperFunction(f, inp, *args, **kwargs):
    import dill # reimport, just in case this is not available on the new processes
    f = dill.loads(f) # converts bytes to (potentially lambda) function
    return f(inp, *args, **kwargs)

def mapStuff(f, inputs, *args, **kwargs):
    pool = mp.Pool(6) # create a 6-worker pool
    f = dill.dumps(f) # converts (potentially lambda) function to bytes
    futures = [pool.apply_async(helperFunction, [f, inp, *args], kwargs) for inp in inputs]
    return [f.get() for f in futures]
Then you can use it like this:
mapStuff(lambda x: x**2, [2, 3]) # returns [4, 9]
mapStuff(lambda x, b: x**2 + b, [2, 3], 1) # returns [5, 10]
mapStuff(lambda x, b: x**2 + b, [2, 3], b=1) # also returns [5, 10]
def f(x):
    return x**2
mapStuff(f, [4, 5]) # returns [16, 25]
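Basically, it works by converting the lambda function to a bytes object, passing it to the subprocess, and having the subprocess reconstruct the lambda function. In this code I only used dill to serialize the function, but you can also serialize the arguments if needed.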
【Comments】: