Python的多任务编程
Posted 东凌阁
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python的多任务编程相关的知识,希望对你有一定的参考价值。
前言
Python程序代码都是按自上而下的顺序加载并执行的,但实际需要代码处理的任务并不都是需要按部就班的顺序执行,通常为提高代码执行的效率,需要多个代码执行任务同时执行,也就是多任务编程的需求。
基本的计算机模型是由CPU、RAM及各种资源(键盘、硬盘、显卡、网卡等)组成,代码的执行过程,实际就是CPU和相关寄存器及RAM之间的相关处理的过程。在单核CPU场景下,一段代码交由CPU执行前,都会处于就绪队列中,CPU执行时很快就会返回该段代码的结果,所以不同进程的代码是轮流由CPU执行的,由于CPU执行速度很快,在表现上仍会被感觉是同时执行的。不同就绪队列间的读入与结果保存被称之为上下文切换,由于进程间切换会产生一定的时间等待及资源的消耗,所以为了减少等待时间和资源的消耗,就引入了线程的设计。线程是当进程的队列被授权占用CPU时,该进程的所有线程队列在共享该进程资源的环境下按优先级由CPU执行。无论是进程还是线程,其队列及资源切换都是由操作系统进行控制的,同时线程的切换也是非常消耗性能的,为了使各线程的调度更节约资源,就出现了协程的设计。协程是在进程或线程环境下执行的,其拥有自己的寄存器上下文和栈,调度是完全由用户控制的,相当于函数方法的调度。对于多任务编程,若要实现代码的多任务高效率执行,我们要明晰如下这几个概念的特点及其区别,才能根据实际需求,选用最佳的多任务编程方法。
- 并行
指在同一时刻有多个进程的指令在多个处理器上同时执行。 - 并发
是指在同一时刻只能有一个进程的指令执行,但多个进程指令被快速轮换执行,使得在宏观上具有多个进程同时执行的效果。 - 进程
进程是程序的运行态,进程间数据共享需要借助外部存储空间。 - 线程
线程是进程的组成部分,一个进程可以包含一个或多个线程,同一进程内线程间数据共享属于内部共享。 - 协程
协程是一种用户态的轻量级线程,一个进程可以包含一个或多个协程,也可以在一个线程包含一个或多个协程。协程的调度完全由用户控制,同一进程内协程间数据共享属于内部共享。
多线程处理
由于Python是动态编译的语言,与C/C++、Java等静态语言不同,它是在运行时一句一句代码地边编译边执行的。用C语言实现的Python解释器,通常称为CPython,也是Python环境默认的编译器。在Cpython解释器中,为防止多个线程同时执行同一 Python 的代码段,确保线程数据安全,引入了全局解释器锁(GIL, Global Interpreter Lock)的处理机制, 该机制相当于一个互斥锁,所以即便一个进程下开启了多线程,但同一时刻只能有一个线程被执行。所以Python 的多线程是伪线程,性能并不高,也无法利用CPU多核的优势。
另,GIL并不是Python的特性,他是在实现Python解释器(Cpython)时所引入的一个概念,GIL保护的是解释器级的数据,保护用户自己的数据仍需要自己加锁处理。在默认情况下,由于GIL的存在,为了使多线程(threading)执行效率更高,需要使用join方法对无序的线程进行阻塞,如下代码可以看到区别。
from multiprocessing import Process
import threading
import os,time
l=[]
stop=time.time()
def work():
global stop
time.sleep(2)
print(\'===>\',threading.current_thread().name)
stop=time.time()
def test1():
for i in range(400):
p=threading.Thread(target=work,name="test"+str(i))
l.append(p)
p.start()
def test2():
for i in range(400):
p=threading.Thread(target=work,name="test"+str(i))
l.append(p)
p.start()
for p in l:
p.join()
if __name__ == \'__main__\':
print("CPU Core:",os.cpu_count()) #本机为4核
print("Worker: 400") #测试线程数
start=time.time()
test1()
active_count=threading.active_count()
while (active_count>1):
active_count=threading.active_count()
continue
test1_result=stop-start
start=time.time()
l=[]
test2()
active_count=threading.active_count()
while (active_count>1):
active_count=threading.active_count()
continue
print(\'Thread run time is %s\' %(test1_result))
print(\'Thread join run time is %s\' %(stop-start))
执行结果如下:
Thread run time is 4.829492807388306
Thread join run time is 2.053645372390747
由上结果可以看到, 多线程时join阻塞后执行效率提高了很多。
多进程与多线程
多任务编程的本质是CPU占用方法的调度处理,对于python下多任务处理有多种编程方法可供选择,分别有多进程(multiprocessing)、多线程(threading)及异步协程(Asyncio),在实际使用中该如何选择呢?我们先看如下一段程序的执行效果。
from multiprocessing import Process
from threading import Thread
import os,time
l=[]
def work():
res=0
for i in range(100000000):
res*=i
def test1():
for i in range(4):
p=Process(target=work)
l.append(p)
p.start()
def test2():
for i in range(4):
p=Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
if __name__ == \'__main__\':
print("CPU Core:",os.cpu_count()) #本机为4核
print("Worker: 4") #工作线程或子进程数
start=time.time()
test1()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
print(\'Process run time is %s\' %(stop-start))
start=time.time()
l=[]
test2()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
print(\'Thread run time is %s\' %(stop-start))
执行结果如下:
CPU Core: 4
Worker: 4
Process run time is 11.030176877975464
Thread run time is 17.0117769241333
从上面的结果,我们可以看到同一个函数用Process及Thread 不同的方法,执行的时间是不同的,为什么会产生这样的差异?
多进程(multiprocessing)方法使用子进程而非线程,其有效地绕过了全局解释器锁GIL(Global Interpreter Lock), 并充分利用了多核CPU的性能,所以在多核CPU环境下,其比多线程方式效率要高。
协程
又称为微线程,协程也可被看作是被标注的函数,不同被表注函数的执行和切换就是协程的切换,其完全由编程者自行控制。协程一般是使用 gevent库,在早期这个库用起来比较麻烦,所以在python 3.7以后的版本,对协程的使用方法做了优化。执行代码如下:
import asyncio
import time
async def work(i):
await asyncio.sleep(2)
print(\'===>\',i)
async def main():
start=time.time()
l=[]
for i in range(400):
p=asyncio.create_task(work(i))
l.append(p)
for p in l:
await p
stop=time.time()
print(\'run time is %s\' %(stop-start))
asyncio.run(main())
执行结果如下:
run time is 2.0228068828582764
另,默认环境下,协程是在单线程模式下执行的异步操作,其并不能发挥多处理器的性能。为了提升执行效率,可以在多进程中执行协程调用方法,代码用例如下:
from multiprocessing import Process
import asyncio
import os,time
l=[]
async_result=0
async def work1():
res=0
for i in range(100000000):
res*=i
# 协程入口
async def async_test():
m=[]
for i in range(4):
p=asyncio.create_task(work1())
m.append(p)
for p in m:
await p
async def async_test1():
await asyncio.create_task(work1())
def async_run():
asyncio.run(async_test1())
# 多进程入口
def test1():
for i in range(4):
p=Process(target=async_run)
l.append(p)
p.start()
if __name__ == \'__main__\':
print("CPU Core:",os.cpu_count()) #本机为4核
print("Worker: 4") #工作线程或子进程数
start=time.time()
asyncio.run(async_test())
stop=time.time()
print(\'Asyncio run time is %s\' %(stop-start))
start=time.time()
test1()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
print(\'Process Asyncio run time is %s\' %(stop-start))
执行结果如下:
CPU Core: 4
Worker: 4
Asyncio run time is 18.89663052558899
Process Asyncio run time is 10.865562438964844
如上结果,在多进程中调用多协程的方法,执行效率明显提高。
多任务编程选择
如上所结果是否是就决定一定要选择多进程(multiprocessing)模式呢?我们再看下如下代码:
from multiprocessing import Process
from threading import Thread
import os,time
l=[]
# 仅计算
def work1():
res=0
for i in range(100000000):
res*=i
# 仅输出
def work2():
time.sleep(2)
print(\'===>\')
# 多进程,仅计算
def test1():
for i in range(4):
p=Process(target=work1)
l.append(p)
p.start()
# 多进程,仅输出
def test_1():
for i in range(400):
p=Process(target=work2)
l.append(p)
p.start()
# 多线程,仅计算
def test2():
for i in range(4):
p=Thread(target=work1)
l.append(p)
p.start()
for p in l:
p.join()
# 多线程,仅输出
def test_2():
for i in range(400):
p=Thread(target=work2)
l.append(p)
p.start()
for p in l:
p.join()
if __name__ == \'__main__\':
print("CPU Core:",os.cpu_count()) #本机为4核
start=time.time()
test1()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
test_result=stop-start
start=time.time()
l=[]
test_1()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
test1_result=stop-start
start=time.time()
l=[]
test2()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
test2_result=stop-start
start=time.time()
l=[]
test_2()
while (l[len(l)-1].is_alive()):
continue
stop=time.time()
test3_result=stop-start
print(\'Process run time is %s\' %(test_result))
print(\'Process I/O run time is %s\' %(test1_result))
print(\'Thread run time is %s\' %(test2_result))
print(\'Thread I/O run time is %s\' %(stop-start))
执行结果如下:
Process run time is 10.77662968635559
Process I/O run time is 2.9869778156280518
Thread run time is 16.842355012893677
Thread I/O run time is 2.024587869644165
由结果可看,在仅计算的操作时,多进程效率比较高,在仅输出的操作时,多线程的效率比较高,所以在实际使用中要根据实际情况测试决定。通用的建议如下:
- 多线程(threading)用于IO密集型,如socket,爬虫,web
- 多进程(multiprocessing)用于计算密集型,如数据分析
参考
- 多线程
https://docs.python.org/zh-cn/3/library/threading.html - 多进程
https://docs.python.org/zh-cn/3/library/multiprocessing.html# - 协程
https://docs.python.org/zh-cn/3/library/asyncio-task.html
以上是关于Python的多任务编程的主要内容,如果未能解决你的问题,请参考以下文章