Python进程线程和协程实战指归
Posted 天元浪子
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python进程线程和协程实战指归相关的知识,希望对你有一定的参考价值。
文章目录
1. 前言
前些日子写过几篇关于线程和进程的文章,概要介绍了Python内置的线程模块(threading)和进程模块(multiprocessing)的使用方法,侧重点是线程间同步和进程间同步。随后,陆续收到了不少读者的私信,咨询进程、线程和协程的使用方法,进程、线程和协程分别适用于何种应用场景,以及混合使用进程、线程和协程的技巧。归纳起来,核心的问题大致有以下几个:
- 使用线程是为了并行还是加速?
- 为什么我使用多线程之后,处理速度并没有预期的快,甚至更慢了?
- 我应该选择多进程处理还是多线程处理?
- 协程和线程有什么不同?
- 什么情况下使用协程?
在进程、线程和协程的使用上,初学者之所以感到困惑,最主要的原因是对任务的理解不到位。任务是由一个进程、或者线程、或者协程独立完成的、相对独立的一系列工作组合。通常,我们会把任务写成一个函数。任务有3种类型:
- 计算密集型任务:任务包含大量计算,CPU占用率高
- IO密集型任务:任务包含频繁的、持续的网络IO和磁盘IO
- 混合型任务:既有计算也有IO
也有观点认为还有一种数据密集型任务,但我认为数据密集型任务一般出现在分布式系统或异构系统上,必定伴随着计算密集和IO密集,因此,仍然可以归类到混合型任务。
下面,我们就以几个实例来讲解演示进程、线程和协程的适用场景、使用方法,以及如何优化我们的代码。
2. 线程
2.1 线程的最大意义在于并行
通常,代码是单线程顺序执行的,这个线程就是主线程。仅有主线程的话,在同一时刻就只能做一件事情;如果有多件事情要做,那也只能做完一件再去做另一件。这有点类似于过去的说书艺人,情节人物复杂时,只能“花开两朵,各表一枝”。下面这个题目,就是一个需要同时做两件事情的例子。
请写一段代码,提示用户从键盘输入任意字符,然后等待用户输入。如果用户在10秒钟完成输入(按回车键),则显示输入内容并结束程序;否则,不再等待用户输入,而是直接提示超时并结束程序。
我们知道,input()函数用于从键盘接收输入,time.sleep()函数可以令程序停止运行指定的时长。不过,在等待键盘输入的时候,sleep()函数就无法计时,而在休眠的时候,input()函数就无法接收键盘输入。不借助于线程,我们无法同时做这两件事情。如果使用线程技术的话,我们可以在主线程中接收键盘输入,在子线程中启动sleep()函数,一旦休眠结束,子线程就杀掉主线程,结束程序。
import os, time
import threading
def monitor():
time.sleep(10)
print('\\n超时退出!')
os._exit(0)
m = threading.Thread(target=monitor)
m.setDaemon(True)
m.start()
s = input('请输入>>>')
print('接收到键盘输入:%s'%s)
print('程序正常结束。')
2.2 使用线程处理IO密集型任务
假如从100个网站抓取数据,使用单线程的话,就需要逐一请求这100个站点并处理应答结果,所花费时间就是每个站点花费时间的总和。如果使用多个线程来实现的话,结果会怎样呢?
import time
import requests
import threading
urls = ['https://www.baidu.com', 'https://cn.bing.com']
def get_html(n):
for i in range(n):
url = urls[i%2]
resp = requests.get(url)
#print(resp.ok, url)
t0 = time.time()
get_html(100) # 请求100次
t1 = time.time()
print('1个线程请求100次,耗时%0.3f秒钟'%(t1-t0))
for n_thread in (2,5,10,20,50):
t0 = time.time()
ths = list()
for i in range(n_thread):
ths.append(threading.Thread(target=get_html, args=(100//n_thread,)))
ths[-1].setDaemon(True)
ths[-1].start()
for i in range(n_thread):
ths[i].join()
t1 = time.time()
print('%d个线程请求100次,耗时%0.3f秒钟'%(n_thread, t1-t0))
上面的代码用百度和必应两个网站来模拟100个站点,运行结果如下所示。单线程处理大约需要30秒钟。分别使用2、5、10个线程来处理的话,所耗时间与线程数量基本上保持反比关系。当线程数量继续增加20个时,速度不再有显著提升。若将线程数量增至50个,时间消耗反倒略有增加。
1个线程请求100次,耗时30.089秒钟
2个线程请求100次,耗时15.087秒钟
5个线程请求100次,耗时7.803秒钟
10个线程请求100次,耗时4.112秒钟
20个线程请求100次,耗时3.160秒钟
50个线程请求100次,耗时3.564秒钟
这个结果表明,对于IO密集型(本例仅测试网络IO,没有磁盘IO)的任务,适量的线程可以在一定程度上提高处理速度。随着线程数量的增加,速度的提升不再明显。
2.3 使用线程处理计算密集型任务
对于曝光不足或明暗变化剧烈的照片可以通过算法来修正。下图左是一张落日图,因为太阳光线较强导致暗区细节无法辨识,通过低端增强算法可以还原为下图右的样子。
低端增强算法(也有人叫做伽马矫正)其实很简单:对于
[
0
,
255
]
[0,255]
[0,255]区间内的灰度值
v
0
v_0
v0,指定矫正系数
γ
\\gamma
γ,使用下面的公式,即可得到矫正后的灰度值
v
1
v_1
v1,其中
γ
\\gamma
γ一般选择2或者3,上图右就是
γ
\\gamma
γ为3的效果。
v
1
=
255
×
(
v
0
255
)
1
γ
v_1 = 255\\times(\\fracv_0255)^\\frac1\\gamma
v1=255×(255v0)γ1
下面的代码,对于一张分辨率为4088x2752的照片实施低端增强算法,这是一项计算密集型的任务。代码中分别使用了广播和矢量计算、单线程逐像素计算、多线程逐像素计算等三种方法,以验证多线程对于计算密集型任务是否有提速效果。
import time
import cv2
import numpy as np
import threading
def gamma_adjust_np(im, gamma, out_file):
"""伽马增强函数:使用广播和矢量化计算"""
out = (np.power(im.astype(np.float32)/255, 1/gamma)*255).astype(np.uint8)
cv2.imwrite(out_file, out)
def gamma_adjust_py(im, gamma, out_file):
"""伽马增强函数:使用循环逐像素计算"""
rows, cols = im.shape
out = im.astype(np.float32)
for i in range(rows):
for j in range(cols):
out[i,j] = pow(out[i,j]/255, 1/3)*255
cv2.imwrite(out_file, out.astype(np.uint8))
im = cv2.imread('river.jpg', cv2.IMREAD_GRAYSCALE)
rows, cols = im.shape
print('照片分辨率为%dx%d'%(cols, rows))
t0 = time.time()
gamma_adjust_np(im, 3, 'river_3.jpg')
t1 = time.time()
print('借助NumPy广播特性,耗时%0.3f秒钟'%(t1-t0))
t0 = time.time()
im_3 = gamma_adjust_py(im, 3, 'river_3_cycle.jpg')
t1 = time.time()
print('单线程逐像素处理,耗时%0.3f秒钟'%(t1-t0))
t0 = time.time()
th_1 = threading.Thread(target=gamma_adjust_py, args=(im[:rows//2], 3, 'river_3_1.jpg'))
th_1.setDaemon(True)
th_1.start()
th_2 = threading.Thread(target=gamma_adjust_py, args=(im[rows//2:], 3, 'river_3_2.jpg'))
th_2.setDaemon(True)
th_2.start()
th_1.join()
th_2.join()
t1 = time.time()
print('启用两个线程逐像素处理,耗时%0.3f秒钟'%(t1-t0))
运行结果如下:
照片分辨率为4088x2752
借助NumPy广播特性,耗时0.381秒钟
单线程逐像素处理,耗时34.228秒钟
启用两个线程逐像素处理,耗时36.087秒钟
结果显示,对一张千万级像素的照片做低端增强,借助于NumPy的广播和矢量化计算,耗时0.38秒钟;单线程逐像素处理的话,耗时相当于NumPy的100倍;启用多线程的话,速度不仅没有加快,反倒是比单线程更慢。这说明,对于计算密集型的任务来说,多线程并不能提高处理速度,相反,因为要创建和管理线程,处理速度会更慢一些。
2.4 线程池
尽管多线程可以并行处理多个任务,但开启线程不仅花费时间,也需要占用系统资源。因此,线程数量不是越多越快,而是要保持在合理的水平上。线程池可以让我们用固定数量的线程完成比线程数量多得多的任务。下面的代码演示了使用 Python 的标准模块创建线程池,计算多个数值的平方。
>>> from concurrent.futures import ThreadPoolExecutor
>>> def pow2(x):
return x*x
>>> with ThreadPoolExecutor(max_workers=4) as pool: # 4个线程的线程池
result = pool.map(pow2, range(10)) # 使用4个线程分别计算0~9的平方
>>> list(result) # result是一个生成器,转成列表才可以直观地看到计算结果
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
如果每个线程的任务各不相同,使用不同的线程函数,任务结束后的结果处理也不一样,同样可以使用这个线程池。下面的代码对多个数值中的奇数做平方运算,偶数做立方运算,线程任务结束后,打印各自的计算结果。
>>> from concurrent.futures import ThreadPoolExecutor
>>> def pow2(x):
return x*x
>>> def pow3(x):
return x*x*x
>>> def save_result(task): # 保存线程计算结果
global result
result.append(task.result())
>>> result = list()
>>> with ThreadPoolExecutor(max_workers=3) as pool:
for i in range(10):
if i%2: # 奇数做平方运算
task = pool.submit(pow2, i)
else: # 偶数做立方运算
task = pool.submit(pow3, i)
task.add_done_callback(save_result) # 为每个线程指定结束后的回调函数
>>> result
[0, 1, 8, 9, 64, 25, 216, 49, 512, 81]
3. 进程
3.1 使用进程处理计算密集型任务
和线程相比,进程的最大优势是可以充分例用计算资源——这一点不难理解,因为不同的进程可以运行在不同CPU的不同的核上。假如一台计算机的CPU共有16核,则可以启动16个或更多个进程来并行处理任务。对于上面的例子,我们改用进程来处理,效果会怎样呢?
import time
import cv2
import numpy as np
import multiprocessing as mp
def gamma_adjust_py(im, gamma, out_file):
"""伽马增强函数:使用循环逐像素计算"""
rows, cols = im.shape
out = im.astype(np.float32)
for i in range(rows):
for j in range(cols):
out[i,j] = pow(out[i,j]/255, 1/3)*255
cv2.imwrite(out_file, out.astype(np.uint8))
if __name__ == '__main__':
mp.freeze_support()
im_fn = 'river.jpg'
im = cv2.imread(im_fn, cv2.IMREAD_GRAYSCALE)
rows, cols = im.shape
print('照片分辨率为%dx%d'%(cols, rows))
t0 = time.time()
pro_1 = mp.Process(target=gamma_adjust_py, args=(im[:rows//2], 3, 'river_3_1.jpg'))
pro_1.daemon = True
pro_1.start()
pro_2 = mp.Process(target=gamma_adjust_py, args=(im[rows//2:], 3, 'river_3_2.jpg'))
pro_2.daemon = True
pro_2.start()
pro_1.join()
pro_2.join()
t1 = time.time()
print('启用两个进程逐像素处理,耗时%0.3f秒钟'%(t1-t0))
运行结果如下:
照片分辨率为4088x2752
启用两个进程逐像素处理,耗时17.786秒钟
使用单个线程或两个线程的时候,耗时大约30+秒,改用两个进程后,耗时17.786秒,差不多快了一倍。如果使用4个进程(前提是运行代码的计算机至少有4个CPU核)的话,速度还能提高一倍,有兴趣的朋友可以试一下。这个测试表明,对于计算密集型的任务,使用多进程并行处理是有效的提速手段。通常,进程数量选择CPU核数的整倍数。
3.2 进程间通信示例
多进程并行弥补了多线程技术的不足,我们可以在每一颗 CPU 上,或多核 CPU 的每一个核上启动一个进程。如果有必要,还可以在每个进程内再创建适量的线程,最大限度地使用计算资源来解决问题。不过,进程技术也有很大的局限性,因为进程不在同一块内存区域内,所以和线程相比,进程间的资源共享、通信、同步等都要麻烦得多,受到的限制也更多。
我们知道,线程间通信可以使用队列、互斥锁、信号量、事件和条件等多种同步方式,同样的,这些手段也可以应用在进程间。此外,multiprocessing 模块还提供了管道和共享内存等进程间通信的手段。下面仅演示一个进程间使用队列通信,更多的通信方式请参考由人民邮电出版社出版的拙著《Python高手修炼之道》。
这段代码演示了典型的生产者—消费者模式。进程 A 负责随机地往地上“撒钱”(写队列),进程 B 负责从地上“捡钱”(读队列)。
import os, time, random
import multiprocessing as mp
def sub_process_A(q):
"""A进程函数:生成数据"""
while True:
time.sleep(5*random.random()) # 在0-5秒之间随机延时
q.put(random.randint(10,100)) # 随机生成[10,100]之间的整数
def sub_process_B(q):
"""B进程函数:使用数据"""
words = ['哈哈,', '天哪!', 'My God!', '咦,天上掉馅饼了?']
while True:
print('%s捡到了%d块钱!'%(words[random.randint(0,3)], q.get()))
if __name__ == '__main__':
print('主进程(%s)开始,按回车键结束本程序'%os.getpid())
q = mp.Queue(10)
p_a = mp.Process(target=sub_process_A, args=(q,))
p_a.daemon = True
p_a.start()
p_b = mp.Process(target=sub_process_B, args=(q,))
p_b.daemon = True
p_b.start()
input()
3.3 进程池
使用多进程并行处理任务时,处理效率和进程数量并不总是成正比。当进程数量超过一定限度后,完成任务所需时间反而会延长。进程池提供了一个保持合理进程数量的方案,但合理进程数量需要根据硬件状况及运行状况来确定,通常设置为 CPU 的核数。
multiprocessing.Pool(n) 可创建 n 个进程的进程池供用户调用。如果进程池任务不满,则新的进程请求会被立即执行;如果进程池任务已满,则新的请求将等待至有可用进程时才被执行。向进程池提交任务有以下两种方式。
- apply_async(func[, args[, kwds[, callback]]]) :非阻塞式提交。即使进程池已满,也会接
受新的任务,不会阻塞主进程。新任务将处于等待状态。 - apply(func[, args[, kwds]]) :阻塞式提交。若进程池已满,则主进程阻塞,直至有空闲
进程可以使用。
下面的代码演示了进程池的典型用法。读者可自行尝试阻塞式提交和非阻塞式提交两种方法的差异。
import time
import multiprocessing as mp
def power(x, a=2):
"""进程函数:幂函数"""
time.sleep(1)
print('%d的%d次方等于%d'%(x, a, pow(x, a)))
def demo():
mpp = mp.Pool(processes=4)
for item in [2,3,4,5,6,7,8,9]:
mpp.apply_async(power, (item, )) # 非阻塞提交新任务
#mpp.apply(power, (item, )) # 阻塞提交新任务
mpp.close() # 关闭进程池,意味着不再接受新的任务
print('主进程走到这里,正在等待子进程结束')
mpp.join() # 等待所有子进程结束
print('程序结束')
if __name__ == '__main__':
demo()
4. 协程
4.1 协程和线程的区别
如前文所述,线程常用于多任务并行。对于可以切分的IO密集型任务,将切分的每一小块任务分配给一个线程,可以显著提高处理速度。而协程,无论有多少个,都被限定在一个线程内执行,因此,协程又被称为微线程。
从宏观上看,线程任务和协程任务都是并行的。从微观上看,线程任务是分时切片轮流执行的,这种切换是系统自动完成的,无需程序员干预;而协程则是根据任务特点,在任务阻塞时将控制权交给其他协程,这个权力交接的时机和位置,由程序员指定。由此可以看出,参与协程管理的每一个任务,必须存在阻塞的可能,且阻塞条件会被其它任务破坏,从而得以在阻塞解除后继续执行。
尽管协程难以驾驭,但是由于是在一个线程内运行,免除了线程或进程的切换开销,因而协程的运行效率高,在特定场合下仍然被广泛使用。
4.2 协程演进史
Py2时代,Python并不支持协程,仅可通过yield实现部分的协程功能。另外,还可以通过gevent等第三方库实现协程。gevent最好玩的,莫过于monkey_patch(猴子补丁),曾经有一段时间,我特别喜欢使用它。
从Py3.4开始,Python内置asyncio标准库,正式原生支持协程。asyncio的异步操作,需要在协程中通过yield from完成,协程函数则需要使用@asyncio.coroutine装饰器。
不理解生成器的同学,很难
以上是关于Python进程线程和协程实战指归的主要内容,如果未能解决你的问题,请参考以下文章