python教程map多进程与进度条
Posted burlingame
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python教程map多进程与进度条相关的知识,希望对你有一定的参考价值。
转载:【python教程】map、多进程与进度条 - 知乎 (zhihu.com)
今天讲讲我在实习中学到的一点 python 知识,核心内容是多进程,也即我们常说的并行计算。
map
首先提个问题,给出一个列表,对列表中的每个元素都平方,代码怎么写?
最简单直观的方法自然就是 for 循环。
alist = [1,2,3,4,5,6,7,8]
def power_value(num):
return num**2
result_list = []
for num in alist:
result_list.append(power_value(num))
print(result_list)
>>> [1, 4, 9, 16, 25, 36, 49, 64]
注:当然你也可以不定义函数直接循环,这里主要是为了方便开展下文。
学过列表推导式的同学可以写出更加简洁的方案:
result_list = [power_value(x) for x in alist]
print(result_list)
>>> [1, 4, 9, 16, 25, 36, 49, 64]
但其实,除了上面两种方法外,python 还内置了一个函数 map
,专门解决这种不断往同一个函数传不同参数的问题。它的语法也很简单:
map(function, iterable, ...)
第一个参数为函数名,第二个参数和之后的参数均为序列(列表、元组、range()等等)。第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。有了这个函数,上面的代码就可以改写为:
result_list = list(map(power_value,alist))
print(result_list)
>>> [1, 4, 9, 16, 25, 36, 49, 64]
这里为什么要加一个 list()
呢?因为 map 返回的是一个迭代器,一般情况下只能循环遍历读取,所以加个 list 把它转换为列表。
上面的函数只有一个参数,那假如它有两个参数呢?我们分为两种情况,一是这两个参数是成对出现的,比如说,两个列表相加,就是列表对应的元素相加。这种情况我们也可以直接应用 map。
alist = [1,2,3,4,5,6,7,8]
blist = [2,3,4,5,6,7,8,9]
def add_value(a,b):
return a + b
result_list = list(map(add_value,alist,blist))
print(result_list)
>>> [3, 5, 7, 9, 11, 13, 15, 17]
第二种情况,这两个参数并不对应,常见情况是需要固定一个参数,比如上面的加法函数,我需要固定 b=5
,这时候,可以借助 python 提供的另外一个函数 partial
,这是 python 自带包 functools
里的函数,它的作用就是固定某些参数,从而构造出一个新函数。其语法为:
partial(func, param, ……)
这里有个容易忽略的点,如果不指定参数位置的话,那么默认从第一个参数开始固定,所以使用它的时候最好指定参数。但是被指定的参数必须在函数的最后部分,比如 func01(a,b,c)
,我们可以固定 b 和 c,可以固定 c,但是绝对不能只固定 b!否则会报错。
from functools import partial
alist = [1,2,3,4,5,6,7,8]
def add_value(a,b):
return (a + b) * a
func = partial(add_value,b = 5)
result_list = list(map(func,alist))
print(result_list)
>>> [6, 14, 24, 36, 50, 66, 84, 104]
多进程
常被和进程一起提起的是线程,不过我也没搞懂这两的具体原理。总之明白两个区别就好:
- 进程之前数据不共享,线程之间数据共享
- 进程之间不会相互影响,一个进程挂掉另外一个进程照常工作。而一个线程挂掉则所有线程都会挂掉。
我在工作中常用到的是多进程,所以主要讲讲它。
python 自带了一个多进程库 multiprocessing
,利用上面学到的 map 知识,我们可以很容易实现并行运算,有了它,上面的代码可以改写如下:
import multiprocessing
from functools import partial
alist = [1,2,3,4,5,6,7,8]
def add_value(a,b):
return (a + b) * a
func = partial(add_value,b = 5)
if __name__ == \'__main__\':
num_processes = multiprocessing.cpu_count()-2 # 使用核心数
pool = multiprocessing.Pool(processes=num_processes) # 实例化进程池
func = partial(add_value,b = 5)
print(pool.map(func,alist))
这样就实现了一个最简单的多进程,可以发现,和 map 相比就多了一个实例化进程池的过程。需要注意的是,这里开启多进程,肯定是要比单进程速度慢的,因为系统在进程间分配任务也是需要时间的,所以我们也不能无脑开多进程,大型任务才可能需要。
另外,pool.map
返回的是一个列表,如果想像 map 那样返回迭代器的话,可以使用 pool.imap
,这两个函数返回都是有序的,如果有特殊需求想返回无须结果,可以使用 imap_unordered
最重要的一点,多进程必须在 if __name__ == \'__main__\'
下写!否则会报错,当然了,你在函数里使用多进程,然后在 if __name__ == \'__main__\'
下调用这个函数也是允许的。
这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错。所以必须把创建子进程的部分用那个 if 判断保护起来, import 的时候 name 不是 main,就不会递归运行了。
@cholerae
进度条
在执行大型任务的时候,我一般是输出一个中间结果就 print 一条提示语句,这样可以让我知道程序仍然在正常运行,而且也能大致知道处理到哪了。不过,如果只是上面两点需求的话,其实可以使用进度条,这样更加方便直观。最常见的进度条库是 tqdm
,pip 安装好后就可以使用了,语法也很简单。
from tqdm import tqdm,trange
for i in trange(100):
time.sleep(0.1)
for i in tqdm(range(100)):
time.sleep(0.1)
不过问题在于,多进程下如何使用进度条?自然,可以为每个进程生成一个进度条,但是我想看总的任务进度,而不是展示 5、6 根进度,怎么做?针对这个需求,我借鉴 Lei Mao,重写了 run_imap_mp
函数,用法和 map 基本一样,其他参数注释里也有说明,这里就不细说了。
def run_imap_mp(func, argument_list, num_processes=\'\', is_tqdm=True):
\'\'\'
多进程与进度条结合
param:
------
func:function
函数
argument_list:list
参数列表
num_processes:int
进程数,不填默认为总核心-3
is_tqdm:bool
是否展示进度条,默认展示
\'\'\'
result_list_tqdm = []
try:
import multiprocessing
if num_processes == \'\':
num_processes = multiprocessing.cpu_count()-3
pool = multiprocessing.Pool(processes=num_processes)
if is_tqdm:
from tqdm import tqdm
for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
result_list_tqdm.append(result)
else:
for result in pool.imap(func=func, iterable=argument_list):
result_list_tqdm.append(result)
pool.close()
except:
result_list_tqdm = list(map(func,argument_list))
return result_list_tqdm
from tqdm import tqdm,trange import time import multiprocessing from functools import partial def run_imap_mp(func, argument_list, num_processes=\'\', is_tqdm=True): \'\'\' 多进程与进度条结合 param: ------ func:function 函数 argument_list:list 参数列表 num_processes:int 进程数,不填默认为总核心-3 is_tqdm:bool 是否展示进度条,默认展示 \'\'\' result_list_tqdm = [] try: import multiprocessing if num_processes == \'\': num_processes = multiprocessing.cpu_count()-3 # pool = multiprocessing.Pool(processes=num_processes) pool = multiprocessing.Pool(processes=2) if is_tqdm: from tqdm import tqdm for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)): result_list_tqdm.append(result) else: for result in pool.imap(func=func, iterable=argument_list): result_list_tqdm.append(result) pool.close() except: result_list_tqdm = list(map(func,argument_list)) return result_list_tqdm alist = [1,2,3,4,5,6,7,8] def add_value(a,b): return (a + b) * a func = partial(add_value,b = 5) if __name__ == \'__main__\': func = partial(add_value, b=5) run_imap_mp(func,range(100000)) # num_processes = multiprocessing.cpu_count()-2 # 使用核心数 # pool = multiprocessing.Pool(processes=2) # 实例化进程池 # func = partial(add_value,b = 5) # print(pool.map(func,alist))
运行结果如下所示:
Python Multiprocessing 多进程,使用多核CPU计算 并使用tqdm显示进度条
1.背景
在python运行一些,计算复杂度比较高的函数时,服务器端单核CPU的情况比较耗时,因此需要多CPU使用多进程加快速度
2.函数要求
笔者使用的是:pathos.multiprocessing 库,进度条显示用tqdm库,安装方法:
pip install pathos
安装完成后
from pathos.multiprocessing import ProcessingPool as Pool
from tqdm import tqdm
这边使用pathos的原因是因为,multiprocessing 库中的Pool 函数只支持单参数输入,例如 f(x) = x**2,而不能处理 f (x,y) = x+y 这类的函数
更不用说一些需要参数的函数 例如:F(x , alpha=0.5, gamma = 0.1) 这样。
3.代码
定义一个 函数 F [ X ] ,其中,输入X是可以在第一个维度上迭代的array, 大小:[ num_X, len ] , 在第一维度 num_X 上进行迭代。
def F(X,lamda=10,weight=0.05): res= res.update(F_1(X,lamda=lamda,weight=weight)) res.update(F_2(X,lamda=lamda,weight=weight)) return res
x 是 F 的输出,是一个dict (字典格式)
这里的两个函数超参数 lamda 和 weight 虽然每次调用的时候值是一样的,但是还是需要放一个数组每次用于迭代。
zip_lamda = [lamda for i in range(len(X)) ] zip_weight = [weight for i in range(len(X)) ] with tqdm(total=len(cold_sequences)) as t: for i, x in enumerate(pool.imap(F,X,zip_lamda,zip_weight)): X[i,:] = [x[key] for key in x.keys()] Y[i,] = 0 t.update() pool.close() pool.join()
4.结果
mutiprocess 加速前
mutiprocess 加速后
以上是关于python教程map多进程与进度条的主要内容,如果未能解决你的问题,请参考以下文章
Python Multiprocessing 多进程,使用多核CPU计算 并使用tqdm显示进度条
pqdm 是 tqdm 和 concurrent.futures 的 wrapper | 一个小而美的 Python 并行计算库 | 实现多进程显示进度条的优雅方案
pqdm 是 tqdm 和 concurrent.futures 的 wrapper | 一个小而美的 Python 并行计算库 | 实现多进程显示进度条的优雅方案