如何将具有多个参数的函数传递给 python concurrent.futures.ProcessPoolExecutor.map()?
Posted
技术标签:
【中文标题】如何将具有多个参数的函数传递给 python concurrent.futures.ProcessPoolExecutor.map()?【英文标题】:How to pass a function with more than one argument to python concurrent.futures.ProcessPoolExecutor.map()? 【发布时间】:2017-06-22 17:49:32 【问题描述】:我希望concurrent.futures.ProcessPoolExecutor.map()
调用包含 2 个或更多参数的函数。在下面的示例中,我使用了lambda
函数并将ref
定义为与numberlist
大小相等且值相同的数组。
第一个问题:有更好的方法吗?在 numberlist 的大小可以是百万到十亿个元素的情况下,因此 ref 大小必须遵循 numberlist,这种方法不必要地占用宝贵的内存,我想避免这种情况。我这样做是因为我读到了map
函数将终止其映射,直到到达最短的数组末端。
import concurrent.futures as cf
nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3
def _findmatch(listnumber, ref):
print('def _findmatch(listnumber, ref):')
x=''
listnumber=str(listnumber)
ref = str(ref)
print('listnumber = 0 and ref = 1'.format(listnumber, ref))
if ref in listnumber:
x = listnumber
print('x = 0'.format(x))
return x
a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
print(n)
if str(ref[0]) in n:
print('match')
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
#for n in executor.map(_findmatch, numberlist):
for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
print(type(n))
print(n)
if str(ref[0]) in n:
print('match')
运行上面的代码,我发现map
函数能够达到我想要的结果。但是,当我将相同的条款转移到 concurrent.futures.ProcessPoolExecutor.map() 时,python3.5 失败并出现此错误:
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
obj = ForkingPickler.dumps(obj)
File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed
问题 2:为什么会出现这个错误,如何让 concurrent.futures.ProcessPoolExecutor.map() 调用具有多个参数的函数?
【问题讨论】:
【参考方案1】:首先要回答您的第二个问题,您会遇到一个异常,因为您正在使用的 lambda
函数是不可提取的。由于 Python 使用pickle
协议来序列化主进程和ProcessPoolExecutor
的工作进程之间传递的数据,所以这是一个问题。目前尚不清楚您为什么要使用lambda
。您拥有的 lambda 有两个参数,就像原始函数一样。您可以直接使用_findmatch
而不是lambda
,它应该可以工作。
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(_findmatch, numberlist, ref):
...
关于传递第二个常量参数而不创建巨大列表的第一个问题,您可以通过多种方式解决这个问题。一种方法可能是使用itertools.repeat
创建一个可迭代对象,该对象在迭代时永远重复相同的值。
但更好的方法可能是编写一个额外的函数来为您传递常量参数。 (也许这就是您尝试使用 lambda
函数的原因?)如果您使用的函数可以在模块的***命名空间中访问,它应该可以工作:
def _helper(x):
return _findmatch(x, 5)
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(_helper, numberlist):
...
【讨论】:
你说得对,我尝试了lambda
,因为当ref
是一个常量时,我最初在将带有2 个参数的函数传递给executor
时遇到问题。在将ref
转换为与numberlist
大小相同的列表后,我才意识到我忘了删除 lambda。我真正想要的是一个解决方案,其中ref
是一个常量或类似的。所以你提到的辅助函数和itertools.repeat
起作用了。谢谢。
我想邀请你回答我的follow-up question,在那里我用Executor.submit
对Executor.map
的性能进行了基准测试,发现前者明显慢得多,我想知道为什么?【参考方案2】:
(1) 无需列出。您可以使用itertools.repeat
创建一个只重复某个值的迭代器。
(2) 您需要将命名函数传递给map
,因为它将传递给子进程执行。 map
使用 pickle 协议发送东西,lambdas 不能被腌制,因此它们不能成为地图的一部分。但它完全没有必要。您的 lambda 所做的只是调用带有 2 个参数的 2 参数函数。完全删除它。
工作代码是
import concurrent.futures as cf
import itertools
nmax = 10
numberlist = range(nmax)
workers = 3
def _findmatch(listnumber, ref):
print('def _findmatch(listnumber, ref):')
x=''
listnumber=str(listnumber)
ref = str(ref)
print('listnumber = 0 and ref = 1'.format(listnumber, ref))
if ref in listnumber:
x = listnumber
print('x = 0'.format(x))
return x
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
#for n in executor.map(_findmatch, numberlist):
for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
print(type(n))
print(n)
#if str(ref[0]) in n:
# print('match')
【讨论】:
感谢您的解释和解决方案。 :) 我想邀请你回答我的follow-up question,在那里我用Executor.submit
对Executor.map
的性能进行了基准测试,发现前者明显变慢了,我想知道为什么?【参考方案3】:
关于您的第一个问题,我是否正确理解您想要传递一个参数,其值仅在您调用 map
时确定,但对于映射函数的所有实例都是常量?如果是这样,我将使用从“模板函数”派生的函数执行map
,并使用functools.partial
将第二个参数(在您的示例中为ref
)烘焙到其中:
from functools import partial
refval = 5
def _findmatch(ref, listnumber): # arguments swapped
...
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
for n in executor.map(partial(_findmatch, refval), numberlist):
...
回复。问题 2,第一部分:我还没有找到尝试腌制(序列化)然后应该并行执行的函数的确切代码段,但这听起来很自然——不仅是参数,还有该函数必须以某种方式转移给 workers,并且可能必须为这种转移进行序列化。 partial
函数可以腌制而 lambda
s 不能腌制这一事实在其他地方被提及,例如这里:https://***.com/a/19279016/6356764。
回复。问题 2,第二部分:如果您想在 ProcessPoolExecutor.map
中调用具有多个参数的函数,您可以将函数作为第一个参数传递,然后是函数的第一个参数的可迭代,然后是函数的可迭代它的第二个参数等。在你的情况下:
for n in executor.map(_findmatch, numberlist, ref):
...
【讨论】:
感谢分享。 :) 您的解决方案有效。这也是我第一次了解 partial。 我想邀请你回答我的follow-up question,在那里我用Executor.submit
对Executor.map
的性能进行了基准测试,发现前者明显变慢了,我想知道为什么?
@mkorvas 我为我的问题使用了你的解决方案***.com/questions/56492876/…以上是关于如何将具有多个参数的函数传递给 python concurrent.futures.ProcessPoolExecutor.map()?的主要内容,如果未能解决你的问题,请参考以下文章