python教程map多进程与进度条

Posted burlingame

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python教程map多进程与进度条相关的知识,希望对你有一定的参考价值。

转载:【python教程】map、多进程与进度条 - 知乎 (zhihu.com)

今天讲讲我在实习中学到的一点 python 知识,核心内容是多进程,也即我们常说的并行计算。

map

首先提个问题,给出一个列表,对列表中的每个元素都平方,代码怎么写?

最简单直观的方法自然就是 for 循环。

alist = [1,2,3,4,5,6,7,8]

def power_value(num):
    return num**2

result_list = []
for num in alist:
    result_list.append(power_value(num))

print(result_list)

>>> [1, 4, 9, 16, 25, 36, 49, 64]
注:当然你也可以不定义函数直接循环,这里主要是为了方便开展下文。

学过列表推导式的同学可以写出更加简洁的方案:

result_list = [power_value(x) for x in alist]

print(result_list)

>>> [1, 4, 9, 16, 25, 36, 49, 64]

但其实,除了上面两种方法外,python 还内置了一个函数 map,专门解决这种不断往同一个函数传不同参数的问题。它的语法也很简单:

map(function, iterable, ...)

第一个参数为函数名,第二个参数和之后的参数均为序列(列表、元组、range()等等)。第一个参数 function 以参数序列中的每一个元素调用 function 函数,返回包含每次 function 函数返回值的新列表。有了这个函数,上面的代码就可以改写为:

result_list = list(map(power_value,alist))

print(result_list)

>>> [1, 4, 9, 16, 25, 36, 49, 64]

这里为什么要加一个 list() 呢?因为 map 返回的是一个迭代器,一般情况下只能循环遍历读取,所以加个 list 把它转换为列表。

上面的函数只有一个参数,那假如它有两个参数呢?我们分为两种情况,一是这两个参数是成对出现的,比如说,两个列表相加,就是列表对应的元素相加。这种情况我们也可以直接应用 map。

alist = [1,2,3,4,5,6,7,8]
blist = [2,3,4,5,6,7,8,9]

def add_value(a,b):
    return a + b

result_list = list(map(add_value,alist,blist))

print(result_list)

>>> [3, 5, 7, 9, 11, 13, 15, 17]

第二种情况,这两个参数并不对应,常见情况是需要固定一个参数,比如上面的加法函数,我需要固定 b=5,这时候,可以借助 python 提供的另外一个函数 partial,这是 python 自带包 functools 里的函数,它的作用就是固定某些参数,从而构造出一个新函数。其语法为:

partial(func, param, ……)

这里有个容易忽略的点,如果不指定参数位置的话,那么默认从第一个参数开始固定,所以使用它的时候最好指定参数。但是被指定的参数必须在函数的最后部分,比如 func01(a,b,c),我们可以固定 b 和 c,可以固定 c,但是绝对不能只固定 b!否则会报错。

from functools import partial

alist = [1,2,3,4,5,6,7,8]

def add_value(a,b):
    return (a + b) * a

func = partial(add_value,b = 5)
result_list = list(map(func,alist))

print(result_list)

>>> [6, 14, 24, 36, 50, 66, 84, 104]

多进程

常被和进程一起提起的是线程,不过我也没搞懂这两的具体原理。总之明白两个区别就好:

  1. 进程之前数据不共享,线程之间数据共享
  2. 进程之间不会相互影响,一个进程挂掉另外一个进程照常工作。而一个线程挂掉则所有线程都会挂掉。

我在工作中常用到的是多进程,所以主要讲讲它。

python 自带了一个多进程库 multiprocessing,利用上面学到的 map 知识,我们可以很容易实现并行运算,有了它,上面的代码可以改写如下:

import multiprocessing
from functools import partial

alist = [1,2,3,4,5,6,7,8]

def add_value(a,b):
    return (a + b) * a

func = partial(add_value,b = 5)

if __name__ == \'__main__\':
    num_processes = multiprocessing.cpu_count()-2 # 使用核心数
    pool = multiprocessing.Pool(processes=num_processes) # 实例化进程池
    func = partial(add_value,b = 5)
    print(pool.map(func,alist))

这样就实现了一个最简单的多进程,可以发现,和 map 相比就多了一个实例化进程池的过程。需要注意的是,这里开启多进程,肯定是要比单进程速度慢的,因为系统在进程间分配任务也是需要时间的,所以我们也不能无脑开多进程,大型任务才可能需要。

另外,pool.map 返回的是一个列表,如果想像 map 那样返回迭代器的话,可以使用 pool.imap,这两个函数返回都是有序的,如果有特殊需求想返回无须结果,可以使用 imap_unordered

最重要的一点,多进程必须在 if __name__ == \'__main__\' 下写!否则会报错,当然了,你在函数里使用多进程,然后在 if __name__ == \'__main__\' 下调用这个函数也是允许的。

这是 Windows 上多进程的实现问题。在 Windows 上,子进程会自动 import 启动它的这个文件,而在 import 的时候是会执行这些语句的。如果你这么写的话就会无限递归创建子进程报错。所以必须把创建子进程的部分用那个 if 判断保护起来, import 的时候 name 不是 main,就不会递归运行了。
@cholerae

进度条

在执行大型任务的时候,我一般是输出一个中间结果就 print 一条提示语句,这样可以让我知道程序仍然在正常运行,而且也能大致知道处理到哪了。不过,如果只是上面两点需求的话,其实可以使用进度条,这样更加方便直观。最常见的进度条库是 tqdm,pip 安装好后就可以使用了,语法也很简单。

from tqdm import tqdm,trange 
for i in trange(100):
    time.sleep(0.1)

for i in tqdm(range(100)):
    time.sleep(0.1)

不过问题在于,多进程下如何使用进度条?自然,可以为每个进程生成一个进度条,但是我想看总的任务进度,而不是展示 5、6 根进度,怎么做?针对这个需求,我借鉴 Lei Mao,重写了 run_imap_mp 函数,用法和 map 基本一样,其他参数注释里也有说明,这里就不细说了。

def run_imap_mp(func, argument_list, num_processes=\'\', is_tqdm=True):
    \'\'\'
    多进程与进度条结合

    param:
    ------
    func:function
        函数
    argument_list:list
        参数列表
    num_processes:int
        进程数,不填默认为总核心-3
    is_tqdm:bool
        是否展示进度条,默认展示
    \'\'\' 
    result_list_tqdm = []
    try:
        import multiprocessing
        if num_processes == \'\':
            num_processes = multiprocessing.cpu_count()-3
        pool = multiprocessing.Pool(processes=num_processes)
        if is_tqdm:
            from tqdm import tqdm
            for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
                result_list_tqdm.append(result)
        else:
            for result in pool.imap(func=func, iterable=argument_list):
                result_list_tqdm.append(result)
        pool.close()
    except:
        result_list_tqdm = list(map(func,argument_list))   
    return result_list_tqdm
from tqdm import tqdm,trange
import time
import multiprocessing
from functools import partial

def run_imap_mp(func, argument_list, num_processes=\'\', is_tqdm=True):
    \'\'\'
    多进程与进度条结合

    param:
    ------
    func:function
        函数
    argument_list:list
        参数列表
    num_processes:int
        进程数,不填默认为总核心-3
    is_tqdm:bool
        是否展示进度条,默认展示
    \'\'\'
    result_list_tqdm = []
    try:
        import multiprocessing
        if num_processes == \'\':
            num_processes = multiprocessing.cpu_count()-3
        # pool = multiprocessing.Pool(processes=num_processes)
        pool = multiprocessing.Pool(processes=2)
        if is_tqdm:
            from tqdm import tqdm
            for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
                result_list_tqdm.append(result)
        else:
            for result in pool.imap(func=func, iterable=argument_list):
                result_list_tqdm.append(result)
        pool.close()
    except:
        result_list_tqdm = list(map(func,argument_list))
    return result_list_tqdm

alist = [1,2,3,4,5,6,7,8]

def add_value(a,b):
    return (a + b) * a

func = partial(add_value,b = 5)

if __name__ == \'__main__\':
    func = partial(add_value, b=5)
    run_imap_mp(func,range(100000))
    # num_processes = multiprocessing.cpu_count()-2 # 使用核心数
    # pool = multiprocessing.Pool(processes=2) # 实例化进程池
    # func = partial(add_value,b = 5)
    # print(pool.map(func,alist))

运行结果如下所示:

 

Python Multiprocessing 多进程,使用多核CPU计算 并使用tqdm显示进度条

1.背景

    在python运行一些,计算复杂度比较高的函数时,服务器端单核CPU的情况比较耗时,因此需要多CPU使用多进程加快速度

2.函数要求

  笔者使用的是:pathos.multiprocessing 库,进度条显示用tqdm库,安装方法:

pip install pathos

  安装完成后

from pathos.multiprocessing import ProcessingPool as Pool
from tqdm import tqdm
 这边使用pathos的原因是因为,multiprocessing 库中的Pool 函数只支持单参数输入,例如 f(x) = x**2,而不能处理 f (x,y) = x+y 这类的函数
 更不用说一些需要参数的函数 例如:F(x , alpha=0.5, gamma = 0.1) 这样。

3.代码

  定义一个 函数 F [ X ] ,其中,输入X是可以在第一个维度上迭代的array, 大小:[ num_X, len ] , 在第一维度 num_X 上进行迭代。

def F(X,lamda=10,weight=0.05):

    res=

    res.update(F_1(X,lamda=lamda,weight=weight))
    res.update(F_2(X,lamda=lamda,weight=weight))
    return res

x 是 F 的输出,是一个dict (字典格式)

这里的两个函数超参数 lamda 和 weight 虽然每次调用的时候值是一样的,但是还是需要放一个数组每次用于迭代。

zip_lamda = [lamda for i in range(len(X)) ]
zip_weight = [weight for i in range(len(X)) ]

with tqdm(total=len(cold_sequences)) as t:
        for i, x in enumerate(pool.imap(F,X,zip_lamda,zip_weight)):
            X[i,:] = [x[key] for key in x.keys()]
            Y[i,] = 0
            t.update()

    pool.close()
    pool.join()
 

4.结果

mutiprocess 加速前

技术图片

 

mutiprocess 加速后

技术图片

 

以上是关于python教程map多进程与进度条的主要内容,如果未能解决你的问题,请参考以下文章

Python Multiprocessing 多进程,使用多核CPU计算 并使用tqdm显示进度条

python之路-----多线程与多进度

pqdm 是 tqdm 和 concurrent.futures 的 wrapper | 一个小而美的 Python 并行计算库 | 实现多进程显示进度条的优雅方案

pqdm 是 tqdm 和 concurrent.futures 的 wrapper | 一个小而美的 Python 并行计算库 | 实现多进程显示进度条的优雅方案

python进程相关 - 多线程threading库

多线程与多进程的比较