GIL 线程池与进程池 同步与异步
Posted wyf20190411-
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了GIL 线程池与进程池 同步与异步相关的知识,希望对你有一定的参考价值。
1.GIL 全局解释器锁 只存在于cPython中,其他解释器中没有
释以:在cpython中它是一种互斥锁是为了防止多个线程在同一时间执行python字节码,这个锁是非常重要的,因为cpython的内存管理是非线程安全的,而且很多已经存在的代码需要依赖这个锁,所以即使它影响了程序效率也无法将其去除。
优点:保证了cpython中内存管理是线程安全的
缺点:使得多线程无法并行
注:非线程安全:多个线程访问统一个资源,会有问题;
线程安全:多个线程访问统一个资源,不会有问题,与之相反
相关:用cPython的原因:1.c编译过得结果可以直接被计算机识别;最重要的是c有大量现成的库(算法,通讯),cpython可以无缝连接c的任何现成代码
2,大量的应用程序都是IO密集,
2.GIL带来的问题:会使程序整体效率降低
由于互斥锁的特性,会导致程序串行,多线程不能并行,为保证数据安全,故而加锁,因此降低执行效率
3.解决方案 分区任务类型
如果是io密集型:使用多线程,因为io多的话,时间会大量花在io等待上,对此并行对耗时无提升
```python from multiprocessing import Process from threading import Thread import time def task(): with open("test.txt",encoding="utf-8") as f: f.read() if __name__ == ‘__main__‘: start_time = time.time() # 多进程 # p1 = Process(target=task) # p2 = Process(target=task) # p3 = Process(target=task) # p4 = Process(target=task) # 多线程 p1 = Thread(target=task) p2 = Thread(target=task) p3 = Thread(target=task) p4 = Thread(target=task) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() print(time.time()-start_time) ```
如果是计算密集型,要看有没有多核处理器了,有的话用多进程 需要注意并行的任务不能太多 开启进程非常消耗资源
from multiprocessing import Process from threading import Thread import time def task(): for i in range(10000000): i += 1 if __name__ == ‘__main__‘: start_time = time.time() # 多进程 # p1 = Process(target=task) # p2 = Process(target=task) # p3 = Process(target=task) # p4 = Process(target=task) # 多线程 p1 = Thread(target=task) p2 = Thread(target=task) p3 = Thread(target=task) p4 = Thread(target=task) p1.start() p2.start() p3.start() p4.start() p1.join() p2.join() p3.join() p4.join() print(time.time()-start_time)
4.加锁解锁时机
加锁:在调用解释器时立即加锁
解锁:该线程任务结束时, 遇到io时 ,他使用解释器程过长,超过设定值时
5.GIL锁与自定义锁的关系
首先他们都是互斥锁,为什么有了gil还需要自己加锁呢,是因为gil是加在解释器上的锁,只能锁住届时其内部的资源,故而我们自己开起的资源得自己加锁
例如: 我们自己开启了一个json文件,多个线程要同时访问, 如果第一个线程写入数据写到一半,执行超时了,另一个线程过来接着写,就会产生问题,
6.线程池与进程池 池:容器
定义:装线程与进程的一种容器
使用时机:如果是io密集型用线程池,如果是计算密集型用进程池
作用:1.控制线程与进程的数量保证了系统的稳定
注:信号量中实现制同时并发多少,但线程已经全都建完了
2.自动管理线程的开启与销毁
3.自动分配任务给空闲的人
使用:1.创建池子
2.submit 提交任务
3.pool.shutdown()等待所有任务全部完毕销毁所有线程后关闭线程池,关闭后就不能提交新任务了
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,os # 创建进程池,指定最大进程数为3,此时不会创建进程,不指定数量时,默认为CPU和核数 pool = ProcessPoolExecutor(3) def task(): time.sleep(1) print(os.getpid(),"working..") if __name__ == ‘__main__‘: for i in range(10): pool.submit(task) # 提交任务时立即创建进程 # 任务执行完成后也不会立即销毁进程 time.sleep(2) for i in range(10): pool.submit(task) #再有新任务是 直接使用之前已经创建好的进程来执行
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import current_thread,active_count import time,os # 创建进程池,指定最大线程数为3,此时不会创建线程,不指定数量时,默认为CPU和核数*5 pool = ThreadPoolExecutor(3) print(active_count()) # 只有一个主线 def task(): time.sleep(1) print(current_thread().name,"working..") if __name__ == ‘__main__‘: for i in range(10): pool.submit(task) # 第一次提交任务时立即创建线程 # 任务执行完成后也不会立即销毁 time.sleep(2) for i in range(10): pool.submit(task) #再有新任务时 直接使用之前已经创建好的线程来执行
7.同步与异步
在并发症中经常提及的概念
1.阻塞与非阻塞(就绪或运行) 程序的运行状态
阻塞:当程序执行过程中遇到了IO操作,在执行IO操作时,程序无法继续执行其他代码,称为阻塞!
非阻塞:程序在正常运行状态
2.并发与并行 多任务处理方式
并发:多个任务看起来像同时运行,本质是切换加保存状态
并行:真正的同时进行中,必须具备多核处理器
3.同步与异步 任务提交(执行)方式
同步:是指发起任务后,必须在原地等待,直到任务完成拿到结果 同步 i=阻塞 卡住i=阻塞
注:默认情况为同步的 同步会有等待的效果但是这和阻塞是完全不同的,阻塞时程序会被剥夺CPU执行权,而同步调用则不会
异步:发起任务无需等待结果,可以继续等待其他代码 异步i=非阻塞
注 :异步任务必须依赖并发与并行,在py中通过多线程或多进程
异步效率明显高于同步
#=======================程序中的异步调用并获取结果方式一
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time pool = ThreadPoolExecutor(3) def task(i): time.sleep(0.01) print(current_thread().name,"working..") return i ** i if __name__ == ‘__main__‘: objs = [] for i in range(3): res_obj = pool.submit(task,i) # 异步方式提交任务# 会返回一个对象用于表示任务结果 objs.append(res_obj) # 该函数默认是阻塞的 会等待池子中所有任务执行结束后执行 pool.shutdown(wait=True) # 从结果对象中取出执行结果 for res_obj in objs: print(res_obj.result()) print("over")
#=======================程序中的异步调用并获取结果方式二
from concurrent.futures import ThreadPoolExecutor from threading import current_thread import time pool = ThreadPoolExecutor(3) def task(i): time.sleep(0.01) print(current_thread().name,"working..") return i ** i if __name__ == ‘__main__‘: objs = [] for i in range(3): res_obj = pool.submit(task,i) # 会返回一个对象用于表示任务结果 print(res_obj.result()) #result是同步的一旦调用就必须等待 任务执行完成拿到结果 print("over")
8.异步回调
异步任务的问题:任务完成后产生的返回值,任务发起方何时去取
解决方案:异步回调
定义:是指在发起一个异步任务的同时指定一个函数,在异步任务完成时会自动的调用这个函数
eg:如果把任务比喻为烧水,没有回调时就只能守着水壶等待水开,有了回调相当于换了一个会响的水壶,烧水期间可用作其他的事情,等待水开了水壶会自动发出声音,这时候再回来处理。水壶自动发出声音就是回调
用处:即保证解析结果的线程不用等待,又能保证数据能够及时被解析,该方案就是异步回调
使用:就是在提交任务后得到一个futures对象,调用对象的add_done_callback来指定一个回调函数
import requests,re,os,random,time from concurrent.futures import ProcessPoolExecutor def get_data(url): print("%s 正在请求%s" % (os.getpid(),url)) time.sleep(random.randint(1,2)) response = requests.get(url) print(os.getpid(),"请求成功 数据长度",len(response.content)) #parser(response) # 3.直接调用解析方法 哪个进程请求完成就那个进程解析数据 强行使两个操作耦合到一起了 return response def parser(obj): data = obj.result() htm = data.content.decode("utf-8") ls = re.findall("href=.*?com",htm) print(os.getpid(),"解析成功",len(ls),"个链接") if __name__ == ‘__main__‘: pool = ProcessPoolExecutor(3) urls = ["https://www.baidu.com", "https://www.sina.com", "https://www.python.org", "https://www.tmall.com", "https://www.mysql.com", "https://www.apple.com.cn"] # objs = [] for url in urls: # res = pool.submit(get_data,url).result() # 1.同步的方式获取结果 将导致所有请求任务不能并发 # parser(res) obj = pool.submit(get_data,url) # obj.add_done_callback(parser) # 4.使用异步回调,保证了数据可以被及时处理,并且请求和解析解开了耦合 # objs.append(obj) # pool.shutdown() # 2.等待所有任务执行结束在统一的解析 # for obj in objs: # res = obj.result() # parser(res) # 1.请求任务可以并发 但是结果不能被及时解析 必须等所有请求完成才能解析 # 2.解析任务变成了串行,
1.在进程池中回调函数是在父进程中执行,因为任务由父进程发起,所以结果也应交给父进程
2.在线程池中回调函数在子进程中执行,因为线程之间数据共享
3.如果你的任务结果需要交给父进程来处理,那建议回调函数,回调函数会自动将数据返回给父进程,不需要自己处理IPC
以上是关于GIL 线程池与进程池 同步与异步的主要内容,如果未能解决你的问题,请参考以下文章
27 Apr 18 GIL 多进程多线程使用场景 线程互斥锁与GIL对比 基于多线程实现并发的套接字通信 进程池与线程池 同步异步阻塞非阻塞