python 并发编程
Posted trent-fzq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python 并发编程相关的知识,希望对你有一定的参考价值。
1. 并发的目的
1. 多任务的处理需求
2. 充分利用多核 cpu 资源
2. 并发 并行 串行
1. 并发:指在一个时间段内,有几个程序在一个 cpu 上运行,但是任意时刻只有一个程序在 cpu 上运行。
2. 并行:指任意时刻上,有多个程序同时运行在多个 cpu 上。
3. 串行:也叫顺序执行,逐个任务执行。
3. 进程并发
3.1 进程的定义
进程(Process,有时被称为重量级进程)是系统进行资源分配和调度的一个独立单位。每个进程都有自己的地址空间、内存、数据栈以及记录运行轨迹的辅助数据,操作系统管理运行的所有进程,并为这些进程公平分配时间。
3.2 进程的特点
独立性:各个进程有自己的内存空间、数据栈等,所以进程间不能直接共享信息,只能使用进程间通信(IPC)。
异步性:推进相互独立、速度不可预知。
一个进程要执行一个任务,就必须开启一个线程,但线程不能独立执行。
3.3 进程的基本状态
进程主要有三个状态:就绪、运行、阻塞
3.4 主进程和子进程
3.4.1 多进程的运行先后顺序
在主进程中创建了子进程,如果不做任何约束,那么子进程的运行状态将不受主进程控制。因此,主进程的任务一般都最先完成。
3.4.2 join 阻塞
如果我们想要子进程都执行结束了,最后主进程才执行,那么我们就要在开启了子进程之后使用 join 阻塞。
3.4.3. daemon 守护
如果希望子进程跟随着主进程的结束而结束,那么我们需要在子进程开启之前,将子进程设置为守护进程。
所谓的守护进程,就是让其他的进程都执行结束,而本身可以在其他的进程结束后立马结束。
3.5 进程同步,即锁的问题
由于多个进程/线程可能会同时访问和操作同一片数据,产生竞态条件,因此导致意想不到的结果。
对于此种情况,我们采用锁的机制,即是同步机制,同一片数据只能同时让一个进程/线程进行访问和操作。
注意:锁是比较消耗性能的
3.5.1. 死锁
在多道程序系统中,并发执行的多个进程因争夺资源而造成的一种若无外力作用有关进程都将永远不能向前推进的一种阻塞的现象。
死锁的产生
原因之一:竞争资源
原因之二:进程推进次序不当
例子:比如多人同时吃一碗面的问题,必须同时拿到筷子和碗之后才能吃面,现有道具碗和筷子两样。
如果先拿筷子的和先拿到碗的不是同一个人,而且拿着不放,那么最后谁都吃不了。
竞争资源产生的死锁现象
进程推进次序不当产生的死锁
产生死锁的必要条件
1. 互斥条件:资源排它性使用
2. 请求和保持条件:请求资源未果进程虽阻塞但保持占有资源不放
3. 不剥夺条件:进程已获资源未使用完之前不能被剥夺
4. 环路等待条件:进程-资源环形链 P0, P1, P2, …, Pn
死锁的解决方法
1. 加锁顺序
2. 加锁时限
3. 死锁检查
3.5.2. 信号量 Semaphore
最多能够允许一定数量的进程/线程进行访问。
3.5.3. 事件 Event
当事件发生之后,才允许进程/线程进行访问。
3.5.4. 经典案例:红绿灯
from multiprocessing import Process, Event
import time
import random
'''
标志位设定, 代表绿灯, 直接通行. 标志位被清空, 代表红灯. wait() 等待变绿灯
# 阻塞事件:
event = Event() # 生成事件对象
# event.set() # 设置标志位, 此时 event.is_set() == False, 即发生阻塞
# event.clear() # 清空标志位, 此时 event.is_set() == True, 即不发生阻塞
# event.wait() # 等待设置标志位
# event.is_set() # 获取标志状态, 返回True或False
'''
def traffic_lights(e):
print('红灯') # 默认阻塞
while True:
# 判断标志状态, 如果已经设置则可通过
if e.is_set():
time.sleep(2) # 绿灯时间
print('红灯')
e.clear() # 清空标志位
else:
# 绿灯等待时间
time.sleep(1)
print('绿灯')
e.set() # 设置标志位
def cars(e, i):
if not e.is_set():
print('car[%s] 在等待' % i)
e.wait() # 等待设置标志位
print('car[%s] 通行' % i)
if __name__ == '__main__':
e = Event()
# 交通灯
p1 = Process(target=traffic_lights, args=(e,))
p1.daemon = True # 设置为守护进程
p1.start()
# 车辆交通
for i in range(10):
time.sleep(random.uniform(0.1, 1))
p2 = Process(target=cars, args=(e, i))
p2.start()
3.6 进程的通信
3.6.1. 队列 Queue
Queue 先进先出,当队列为空时,获取队列中的值会发生阻塞。需要注意的是,Queue 不能用于进程池 Pool 间的通信。
JoinableQueue 与 Queue 相似,但 JoinableQueue 使用 get 方法获取之后需调用 task_done 方法来减少元素计数。
3.6.2. Manager
Manager 通信的机制是内存的共享。Manager 主要用于进程池间的通信,里面封装了很多数据类型,比如 dict, list, Queue 等。
3.6.3. 管道 Pipe
Pipe 内部使用 Socket 来实现通信。只适用于两个进程间的通讯,其中一个接收数据,另一个发送数据。但是 Pipe 的性能相对于 Queue 等的性能相对较高。
3.6.4. 经典案例:生产者与消费者模型
import random
import time
from multiprocessing import JoinableQueue, Process
# 消费者
def consumer(jq):
while True:
some = jq.get()
time.sleep(random.uniform(0.1, 1))
print("consumer 消费了一个 %s" % (some))
jq.task_done()
# 生产者
def producer(jq, some):
for i in range(5):
time.sleep(random.uniform(0.1, 1))
print("producer 生产了 %s %s" % (some, i))
jq.put(some+str(i))
if __name__ == "__main__":
jq = JoinableQueue()
# 创建消费者
c1 = Process(target=consumer, args=(jq, ))
c1.daemon = True
c1.start()
# 创建生产者
p1 = Process(target=producer, args=(jq, "香奈儿"))
p1.start()
p1.join()
jq.join() # 阻塞
print("全部结束")
4. 线程并发
4.1 线程的引入
由于进程创建、切换和撤销等操作时的开销较大,进程并发执行程度及进程间通信效率受限,以及系统并发程度进一步提高的客观需求(如 C10K 问题),我们引入了线程。
线程(Thread,有时被称为轻量级进程)跟进程有些相似,不同的是所有线程运行在同一个进程中,共享运行环境。
线程有开始、顺序执行和结束3部分,有一个自己的指令指针,记录运行到什么地方。
4.2 线程的特点
1. 轻型实体及共享进程资源
2. 独立调度和分派的基本单位
3. 地址空间共享及通信效率
4. 系统并发执行程度大大提高
4.3 线程与进程
多线程是多个相互关联的线程的组合,多进程是多个互相独立的进程的组合。
线程是最小的执行单元,进程至少由一个线程组成。
4.4 GIL 锁
GIL (the Global Interpreter Lock) 锁是CPython解释器里特有的全局,它能保证同一时刻只有一个线程运行。
多线程是一个好东西。不过,由于GIL锁的限制,多线程更适合于I/O密集型应用(I/O 释放了GIL,可以允许更多的并发),而不是计算密集型应用。
对于后一种情况而言,为了实现更好的并行性,你需要使用多进程,以便让CPU的其他内核来执行。
4.5 既然Python解释器是单线程的,还有进行多线程编程的必要吗?
答:多线程最开始不是用来解决多核利用率问题的,而是用来解决 IO 占用时 CPU 闲置的问题。
多线程可以用来解决阻塞问题,可以做事件响应机制(或者类似信号槽的问题)。如果运行瓶颈不是在 CPU 运算而是在IO(如网络)上,多线程显然很划算。
能产生 IO 阻塞的情况很多,如网络、磁盘等。当发生阻塞时,Python 是不耗 CPU 的,此时如果只有一个线程就没法处理其他事情了。对于有 IO 阻塞的环境,多线程可能让你的 CPU 跑到 100%。
另一个用处来自于 Python 的 C 扩展模块。在扩展模块里可以释放 GIL。释放 GIL 期间不应该调用任何 Python API。对于一些非常繁重的计算,可以写成 C 模块,计算前释放 GIL,计算后重新申请 GIL,并将结果返回给 Python。这样就可以让 Python 进程利用更多 CPU 资源。每个 Python 线程都是 OS 级别 Pthread 的线程。利用 Python 管理这些线程比在 C 层级操作 Pthread 更方便。
4.6 线程的同步与通信
4.6.1 同步
在线程中的同步方式有,锁(Lock),递归锁(RLock),信号量(Semaphore),条件变量(Condition)和事件(Event)。
递归锁 RLock:
为了支持在同一线程中多次请求同一资源,python提供了递归锁 threading.RLock。其内部维护着一个 Lock 和一个 counter 变量,counter 记录了 acquire 的次数,从而使得资源可以被多次 acquire。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源。
递归锁专门用来解决死锁现象,临时用于快速解决服务器崩溃异常现象,用递归锁应急。
条件变量 Condition :
Condition(条件变量)通常与一个锁关联。需要在多个 Contidion 中共享一个锁时,可以传递一个 Lock/RLock 实例给构造方法,否则它将自己生成一个 RLock 实例。
可以认为,除了 Lock 带有的锁定池外,Condition 还包含一个等待池,池中的线程处于状态图中的阻塞状态,直到另一个线程调用 notify/notifyAll 方法通知。得到通知后,线程进入锁定池等待再次被锁定。
4.6.2 通信
在线程中的通信方式有,共享变量(如list等),局部变量(local),队列(Queue)
局部变量 local:
在 threading 模块下提供了一个 local() 函数,该函数可以返回一个线程局部变量,通过使用线程局部变量可以很简捷地隔离多线程访问的竞争资源,从而简化多线程井发访问的编程处理。
本质上就是不同的线程使用这个对象时,为其创建一个只属于当前线程的字典,拿空间换时间的方法。
5. 协程
5.1 协程的引入
使用同步的方式,实现异步的功能。减少通信、同步和操作系统切换任务的开销等诸多性能的消耗,以及解决 C10M 等问题。
在linux系统中,线程就是轻量级的进程,而我们通常也把协程称为轻量级的线程,即微线程(coroutine)。
5.2 协程的特点
1. 像生成器那样,可以在子程序的内部暂停,去执行其他的子程序,在返回执行原子程序的时候又可以在暂停处继续执行。
2. 所有的切换操作都是用户态的,不会涉及操作系统的内核,因此减少了从内核拷贝到用户态的时间,从而提高效率。
3. 协程的本质是个单线程,需要和进程配合才能利用多核资源。
4. 进行阻塞(Blocking)操作时,整个程序会被挂起
5.3 原生协程
协程原本是使用生产器来实现的,但为了明确语义以及提高使用协程的开发效率,python 引入了 async 和 await 两个关键字供我们使用。以下是原生协程的示例
# import types
# @types.coroutine # 可以使用此装饰器来替代 async
async def downloader(url):
return "trent"
async def parse_url(url):
# await 相当于生成器的 yield from
html = await downloader(url)
print(html)
return html
if __name__ == "__main__":
coroutine = parse_url("http://www.baidu.com")
# next(None) # 不能用 next 方法来启动协程
coroutine.send(None) # 启动协程
coroutine.close() # 关闭协程
5.4 一个协程包 gevent
import time
import gevent
from gevent import monkey
# patch_all识别引入的所有模块中的阻塞,包括同步阻塞和异步阻塞
monkey.patch_all()
def async_func1():
print("async_func1 1")
time.sleep(0.5)
print("async_func1 2")
def async_func2():
print("async_func2 1")
time.sleep(0.5)
print("async_func2 2")
g1 = gevent.spawn(async_func1) # 创建协程对象
g2 = gevent.spawn(async_func2) # 创建协程对象
gevent.joinall([g1, g2]) # 阻塞
print("done")
执行结果:
async_func1 1
async_func2 1
async_func1 2
async_func2 2
done
6. 异步 IO 编程
6.1 Linux 中的五种 IO 模型
内核态和用户态
由于需要限制不同的程序之间的访问能力,防止他们获取别的程序的内存数据,或者获取外围设备的数据,操作系统划分出两个权限等级:用户态和内核态。
内核态:当一个任务(进程)执行系统调用而陷入内核代码中执行时,称进程处于内核运行态(内核态)。
用户态:当进程在执行用户自己的代码时,则称其处于用户运行态(用户态)。
五种 IO 模型中, IO 复用的技术较为成熟,因此使用也比较广泛。而最为理想的是异步 IO,整个过程没有阻塞,而且有通知。
6.2 IO 多路复用
IO 多路复用模型有三种:select poll epoll,它们层层递增。
6.2.1 select
sokect 是通过一个 select() 系统调用来监视多个文件描述符,当 select() 返回后,该数组中就绪的文件描述符便会被该内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。
select 的优点是支持跨平台,缺点在于单个进程能够监视的文件描述符的数量存在最大限制。
另外 select() 所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量 TCP 连接处于非活跃状态,但调用 select() 会对所有 socket 进行一次线性扫描,所以这也浪费了一定的开销。
文件描述符 fd
文件描述符是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数,实际上,它是一个索引值,指内核为每一个进程所维护的进程打开文件的记录的记录表,当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。
6.2.2 poll
poll 和 select 在本质上没有多大差别,但是 poll 没有最大文件描述符数量的限制。
poll 和 select 存在同样一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。
另外,select() 和 poll() 将就绪的文件描述符告诉进程后,如果进程没有对其进行 IO 操作,那么下次调用 select() 和 poll() 的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。
6.2.3 epoll
epoll 可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上,边缘触发的性能要更高一些,但是代码实现相当复杂。
epoll 同样只告知那些就绪的文件描述符,而且当我们调用 epoll_wait() 获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去 epoll 指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于 epoll 采用基于事件的就绪通知方式。在 select/poll 中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而 epoll 事先通过 epoll_ctl() 来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似 callback 的回调机制,迅速激活这个文件描述符,当进程调用 epoll_wait() 时便得到通知。
6.2.4 select 实例
import socket
from selectors import EVENT_READ, EVENT_WRITE, DefaultSelector
from urllib.parse import urlparse
# DefaultSelector 会根据平台自动选择 select/poll/epoll
selector = DefaultSelector()
urls = ["https://www.baidu.com"] # 请求的 url 列表
stop = False # 停止 loop() 的标志
class Fetcher():
# 连接回调函数
def connected(self, key):
selector.unregister(key.fd)
smsg = "GET HTTP/1.1\r\nHost:\r\nConnection:close\r\n\r\n".format(self.path, self.host).encode('utf-8')
self.client.send(smsg)
selector.register(self.client.fileno(), EVENT_READ, self.readable)
# 可读回调函数
def readable(self, key):
d = self.client.recv(1024)
if d:
self.data += d
else:
selector.unregister(key.fd)
data = self.data.decode('utf-8')
html_data = data.split("\r\n\r\n")[1]
print(html_data)
self.client.close()
urls.remove(self.spider_url)
if not urls:
global stop
stop = True # 请求完成标志
def get_url(self, url):
self.data = b""
self.spider_url = url
# 请求 html
url = urlparse(url)
self.host = url.netloc
self.path = url.path
if self.path == "":
self.path = "/"
# 建立 socket
self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client.setblocking(False) # 设置为非阻塞
try:
self.client.connect((self.host, 80))
except BlockingIOError:
pass
# 将 socket 注册到 selector,send 相当于写 EVENT_WRITE
selector.register(self.client.fileno(), EVENT_WRITE, self.connected)
def loop():
# 事件循环, 不停的请求 socket 的状态并调用对应的回调函数
# 1. select 本身是不支持 register 模式的
# 2. socket 状态变化以后的回调应该由程序员完成
while not stop:
ready = selector.select()
for key, mask in ready:
call_back = key.data
call_back(key)
# 回调 + 事件循环 + select
if __name__ == "__main__":
fetcher = Fetcher()
fetcher.get_url(urls[0])
loop()
6.3 asyncio 模块
6.3.1 wait 方法
import asyncio
import time
async def foo(arg):
print("start...", arg)
# 使用异步阻塞,不能使用同步阻塞,否则会变成同步的程序
await asyncio.sleep(2)
print("end...", arg)
return "arg"+str(arg)
if __name__ == "__main__":
start_time = time.time()
loop = asyncio.get_event_loop() # 获取一个异步事件循环
tasks = [foo(i) for i in range(1, 4)] # 定义三个任务
# 异步执行多任务,wait 返回的是一个协程
# run_until_complete 会在循环结束之后停止 loop,区别于 run_forever 方法
loop.run_until_complete(asyncio.wait(tasks))
print(time.time() - start_time) # 输出程序耗时
执行结果
start... 2
start... 3
start... 1
end... 2
end... 3
end... 1
2.0156099796295166
从任务的执行顺序和时间不难看出,这是一个并发程序
6.3.2 gather 方法
import asyncio
async def foo(n):
print(n)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
group1 = [foo(i) for i in range(2)]
group2 = [foo(i) for i in range(2, 4)]
group1 = asyncio.gather(*group1)
group2 = asyncio.gather(*group2)
# group2.cancel() # 取消任务
# gather 返回的是一个 future, 比 wait 方法更高级
loop.run_until_complete(asyncio.gather(group1, group2))
以上是关于python 并发编程的主要内容,如果未能解决你的问题,请参考以下文章