具有不同功能的多进程池
Posted
技术标签:
【中文标题】具有不同功能的多进程池【英文标题】:Mulitprocess Pools with different functions 【发布时间】:2011-10-22 00:38:25 【问题描述】:多进程工作池的大多数示例在不同进程中执行单个函数,例如
def foo(args):
pass
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=30)
res=pool.map_async(foo,args)
有没有办法在池中处理两个不同且独立的功能?这样你就可以分配 f.e. foo() 有 15 个进程,bar() 有 15 个进程,还是一个池绑定到单个函数?或者你必须手动为不同的功能创建不同的进程
p = Process(target=foo, args=(whatever,))
q = Process(target=bar, args=(whatever,))
q.start()
p.start()
忘记工作池?
【问题讨论】:
【参考方案1】:要传递不同的函数,你可以简单地多次调用map_async
。
这里有一个例子来说明,
from multiprocessing import Pool
from time import sleep
def square(x):
return x * x
def cube(y):
return y * y * y
pool = Pool(processes=20)
result_squares = pool.map_async(f, range(10))
result_cubes = pool.map_async(g, range(10))
结果将是:
>>> print result_squares.get(timeout=1)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print result_cubes.get(timeout=1)
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
【讨论】:
它们是并行执行还是“连续”执行?map_async
立即返回。只要池中有足够的空闲进程,新任务就会运行而无需等待。在上面的示例中,它们将并行运行。 @mad_scientist
谢谢!但是我猜没有办法分配特定数量的工人/进程?
multiprocessing Pool API 不提供在同一池中分配特定数量的工人的机制。如果您真的希望每个任务有特定数量的工人,请创建不同的池。虽然建议只有一个池。我认为 Pool 应该透明地为您管理而不用担心它是有道理的。
感谢您的回答,您是否积极添加map_async()
一个接一个将并行运行。我实际上已经尝试过了,正如@Sam 的回答所表明的那样,这些似乎是按顺序运行的。【参考方案2】:
它们将不会并行运行。 见以下代码:
def updater1(q,i):
print "UPDATER 1:", i
return
def updater2(q,i):
print "UPDATER2:", i
return
if __name__=='__main__':
a = range(10)
b=["abc","def","ghi","jkl","mno","pqr","vas","dqfq","grea","qfwqa","qwfsa","qdqs"]
pool = multiprocessing.Pool()
func1 = partial(updater1,q)
func2 = partial(updater2,q)
pool.map_async(func1, a)
pool.map_async(func2, b)
pool.close()
pool.join()
以上代码产生以下打印输出:
UPDATER 1: 1
UPDATER 1: 0
UPDATER 1: 2
UPDATER 1: 3
UPDATER 1: 4
UPDATER 1: 5
UPDATER 1: 6
UPDATER 1: 7
UPDATER 1: 8
UPDATER 1: 9
UPDATER2: abc
UPDATER2: def
UPDATER2: ghi
UPDATER2: jkl
UPDATER2: mno
UPDATER2: pqr
UPDATER2: vas
UPDATER2: dqfq
UPDATER2: grea
UPDATER2: qfwqa
UPDATER2: qwfsa
UPDATER2: qdqs
【讨论】:
【参考方案3】:您可以使用 map 或一些 lambda 函数(编辑:实际上您不能使用 lambda 函数)。您可以使用简单的地图功能:
def smap(f, *args):
return f(*args)
pool = multiprocessing.Pool(processes=30)
res=pool.map(smap, function_list, args_list1, args_list2,...)
普通的map函数将iterables作为输入,很不方便。
【讨论】:
这应该被接受为正确答案,因为接受的答案以准并行模式运行(使用糟糕的计划者)。【参考方案4】:这是@Rayamon 分享的想法的一个工作示例:
import functools
from multiprocessing import Pool
def a(param1, param2, param3):
return param1 + param2 + param3
def b(param1, param2):
return param1 + param2
def smap(f):
return f()
func1 = functools.partial(a, 1, 2, 3)
func2 = functools.partial(b, 1, 2)
pool = Pool(processes=2)
res = pool.map(smap, [func1, func2])
pool.close()
pool.join()
print(res)
【讨论】:
我如何将值列表作为参数传递并在线程中单独工作。如果是单个函数,它可以正常工作,但在多个函数的情况下就不行了..【参考方案5】:为了进一步解释上面的其他答案,这里有一个例子:
-
使用池(平方函数)并行运行具有多个输入的单个函数 有趣的边注意“5 981 25”行上的错位操作
使用不同的输入(args 和 kwargs)运行多个函数并使用池(pf1、pf2、pf3 函数)收集它们的结果
import datetime
import multiprocessing
import time
import random
from multiprocessing import Pool
def square(x):
# calculate the square of the value of x
print(x, x*x)
return x*x
def pf1(*args, **kwargs):
sleep_time = random.randint(3, 6)
print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf1", args, sleep_time, datetime.datetime.now()))
print("Keyword Args from pf1: %s" % kwargs)
time.sleep(sleep_time)
print(multiprocessing.current_process().name, "\tpf1 done at %s\n" % datetime.datetime.now())
return (sum(*args), kwargs)
def pf2(*args):
sleep_time = random.randint(7, 10)
print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf2", args, sleep_time, datetime.datetime.now()))
time.sleep(sleep_time)
print(multiprocessing.current_process().name, "\tpf2 done at %s\n" % datetime.datetime.now())
return sum(*args)
def pf3(*args):
sleep_time = random.randint(0, 3)
print("Process : %s\tFunction : %s\tArgs: %s\tsleeping for %d\tTime : %s\n" % (multiprocessing.current_process().name, "pf3", args, sleep_time, datetime.datetime.now()))
time.sleep(sleep_time)
print(multiprocessing.current_process().name, "\tpf3 done at %s\n" % datetime.datetime.now())
return sum(*args)
def smap(f, *arg):
if len(arg) == 2:
args, kwargs = arg
return f(list(args), **kwargs)
elif len(arg) == 1:
args = arg
return f(*args)
if __name__ == '__main__':
# Define the dataset
dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
# Output the dataset
print ('Dataset: ' + str(dataset))
# Run this with a pool of 5 agents having a chunksize of 3 until finished
agents = 5
chunksize = 3
with Pool(processes=agents) as pool:
result = pool.map(square, dataset)
print("Result of Squares : %s\n\n" % result)
with Pool(processes=3) as pool:
result = pool.starmap(smap, [(pf1, [1,2,3], 'a':123, 'b':456), (pf2, [11,22,33]), (pf3, [111,222,333])])
# Output the result
print ('Result: %s ' % result)
Output:
*******
Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
1 1
2 4
3 9
4 16
6 36
7 49
8 64
59 81
25
10 100
11 121
12 144
13 169
14 196
Result of Squares : [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]
Process : ForkPoolWorker-6 Function : pf1 Args: ([1, 2, 3],) sleeping for 3 Time : 2020-07-20 00:51:56.477299
Keyword Args from pf1: 'a': 123, 'b': 456
Process : ForkPoolWorker-7 Function : pf2 Args: ([11, 22, 33],) sleeping for 8 Time : 2020-07-20 00:51:56.477371
Process : ForkPoolWorker-8 Function : pf3 Args: ([111, 222, 333],) sleeping for 1 Time : 2020-07-20 00:51:56.477918
ForkPoolWorker-8 pf3 done at 2020-07-20 00:51:57.478808
ForkPoolWorker-6 pf1 done at 2020-07-20 00:51:59.478877
ForkPoolWorker-7 pf2 done at 2020-07-20 00:52:04.478016
Result: [(6, 'a': 123, 'b': 456), 66, 666]
Process finished with exit code 0
【讨论】:
【参考方案6】:多功能
以下示例显示了如何在池中运行多个函数。
from multiprocessing import Pool
import functools
def inc(x):
return x + 1
def dec(x):
return x - 1
def add(x, y):
return x + y
def smap(f):
return f()
def main():
f_inc = functools.partial(inc, 4)
f_dec = functools.partial(dec, 2)
f_add = functools.partial(add, 3, 4)
with Pool() as pool:
res = pool.map(smap, [f_inc, f_dec, f_add])
print(res)
if __name__ == '__main__':
main()
我们有三个函数,它们在一个池中独立运行。我们使用 functools.partial 在执行之前准备函数及其参数。
来源:https://zetcode.com/python/multiprocessing/
【讨论】:
以上是关于具有不同功能的多进程池的主要内容,如果未能解决你的问题,请参考以下文章