自己手写调度器,理解Python中的asyncio异步事件循环与协程
Posted 小杰666
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自己手写调度器,理解Python中的asyncio异步事件循环与协程相关的知识,希望对你有一定的参考价值。
本文将从一段最简单的“顺序执行”代码开始,逐步深入,来理解Python的asyncio事件循环和协程的底层实现原理。
先说下Python yield的作用,简单说就是在代码中可以 暂停/恢复 代码的执行,这是最关键的,这样就有机会中断函数的执行,把时间分配给其他函数,然后在适当时机从中断位置恢复。在Python中有yield的函数,叫生成器,协程就是基于生成器一步步发展而来。
一、顺序执行
在开始前,先从最简单的顺序执行开始,代码如下:
import time
def countdown(n):
while n > 0:
print('Down', n)
time.sleep(1)
n -= 1
def countup(stop):
x = 0
while x < stop:
print('Up', x)
time.sleep(1)
x += 1
countdown(5)
countup(5)
顺序执行会依次执行两个函数,执行过程是同步的,输出内容如下:
Down 5
Down 4
Down 3
Down 2
Down 1
Up 0
Up 1
Up 2
Up 3
Up 4
二、并发执行
接下来,看下并发执行,并发执行的经典解决方案是 多线程,代码如下:
import time
import threading
def countdown(n):
while n > 0:
print('Down', n)
time.sleep(1)
n -= 1
def countup(stop):
x = 0
while x < stop:
print('Up', x)
time.sleep(1)
x += 1
threading.Thread(target=countdown, args=(5,)).start()
threading.Thread(target=countup, args=(5,)).start()
当用两个线程执行上面的两个函数,输出内容就不能保证顺序了,并且Up和Down还可能连在一起,这是因为print函数是线程不安全的,输出如下:
Down 5
Up 0
UpDown 1
4
UpDown 2
3
Up 3
Down 2
UpDown 41
那么,有没有办法不用多线程,就能实现并发?答案是有的。
三、引入调度器
要在单个线程中实现并发,就需要让countdown和countup两个函数轮替执行,于是就引入调度器,这个调度器的作用类似Python里的事件循环:
import time
from collections import deque
class Scheduler:
def __init__(self):
self.ready = deque() # 创建一个双向队列,存放待执行的函数
def call_soon(self, func):
self.ready.append(func) # 从右侧添加函数
def run(self):
while self.ready:
func = self.ready.popleft() # 从左侧弹出
func() # 执行函数
sched = Scheduler() # 调度器对象
def countdown(n):
if n > 0:
print('Down', n)
time.sleep(1) # 改成5,将会阻塞5秒,什么也做不了
sched.call_soon(lambda: countdown(n - 1)) # 把 n -= 1 修改成这行,放入待执行队列
def countup(stop):
def _run(x): # 使用嵌套函数能达到一样的效果
if x < stop:
print('Up', x)
time.sleep(1)
sched.call_soon(lambda: _run(x + 1))
_run(0)
sched.call_soon(lambda: countdown(5)) # 两个函数依次放入队列
sched.call_soon(lambda: countup(5))
sched.run() # 依次执行 countdown,countup
上面的代码,用一个双向队列,已经实现了一个简单的调度器,countdown和countup会依次执行。但是,这样的调度器只是简单的轮替执行,并没有同时执行的效果,输出如下:
Down 5
Up 0
Down 4
Up 1
Down 3
Up 2
Down 2
Up 3
Down 1
Up 4
上面的调度器还有个问题,比如把countdown里的sleep时间改成5秒,那么这个调度器在轮替到countdown时,会等待5秒,一直卡在那里,所以是一种资源浪费,因为卡5秒的时间里,什么事也干不了。
于是,我们开始优化,让sleep函数延后执行,而不是立刻阻塞,这次引入 call_later 方法:
import time
from collections import deque
class Scheduler:
def __init__(self):
self.ready = deque() # 创建一个双向队列,存放待执行的函数
self.sleeping = [] # 列表当成队列使用,存放需要sleep的函数
def call_soon(self, func):
self.ready.append(func) # 从右侧添加函数
def call_later(self, delay, func):
deadline = time.time() + delay # 过期时间
self.sleeping.append((deadline, func))
self.sleeping.sort() # 过期时间升序排序,最快过期的在最左侧,其实就是一个优先队列
def run(self): # 调度器主函数
while self.ready or self.sleeping:
if not self.ready:
# 如果待执行队列为空,就去sleep队列拿出最先到期并待执行的函数
# 拿出最先到期并待执行函数的目的是尽可能减少等待时间,提高并发的效率
deadline, func = self.sleeping.pop(0)
deta = deadline - time.time()
if deta > 0:
time.sleep(deta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft() # 从左侧弹出
func() # 执行函数
sched = Scheduler() # 调度器对象
def countdown(n):
if n > 0:
print('Down', n)
# time.sleep(4)
# 因为没有暂停/恢复能力,用递归调用来间接地实现“恢复执行”的能力,
# 下文将会用yield实现 暂停/恢复 能力
sched.call_later(4, lambda: countdown(n - 1))
def countup(stop):
def _run(x): # 使用嵌套函数能达到一样的效果
if x < stop:
print('Up', x)
# time.sleep(1)
sched.call_later(1, lambda: _run(x + 1))
_run(0)
sched.call_soon(lambda: countdown(5)) # 两个函数依次放入队列
sched.call_soon(lambda: countup(20))
sched.run() # 依次执行 countdown,countup
此时的输出,看着已经是并发执行了,一个简单的异步框架的雏形就出来了!
看到了吗,并发执行的关键是 Scheduler 这个类,它使用ready和sleeping两个队列共同实现轮换执行,真正的调度者是 Scheduler.run 方法,输出如下:
Down 5
Up 0
Up 1
Up 2
Up 3
Down 4
Up 4
Up 5
Up 6
Up 7
Down 3
Up 8
Up 9
Up 10
Up 11
Down 2
Up 12
Up 13
Up 14
Up 15
Down 1
Up 16
Up 17
Up 18
Up 19
下面,用优先队列代替列表排序,并用序列号解决deadline值相同时会对函数排序造成的异常(输出结果同上):
import time
import heapq
from collections import deque
class Scheduler:
def __init__(self):
self.ready = deque() # 创建一个双向队列,存放待执行的函数
self.sleeping = [] # 列表当成队列使用,存放需要sleep的函数
self.sequence = 0
def call_soon(self, func):
self.ready.append(func) # 从右侧添加函数
def call_later(self, delay, func):
self.sequence += 1 # 防止deadline相同时,对函数排序而报错
deadline = time.time() + delay # 过期时间
# 使用优先队列 代替每次对列表排序
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def run(self):
while self.ready or self.sleeping:
if not self.ready:
# 如果待执行队列为空,就去sleep队列拿出最先到期并待执行的函数
# 拿出最先到期并待执行函数的目的是尽可能减少等待时间,提高并发的效率
deadline, _, func = heapq.heappop(self.sleeping)
deta = deadline - time.time()
if deta > 0:
time.sleep(deta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft() # 从左侧弹出
func() # 执行函数
sched = Scheduler() # 调度器对象
def countdown(n):
if n > 0:
print('Down', n)
# time.sleep(4)
sched.call_later(4, lambda: countdown(n - 1))
def countup(stop):
def _run(x): # 使用嵌套函数能达到一样的效果
if x < stop:
print('Up', x)
# time.sleep(1)
sched.call_later(1, lambda: _run(x + 1))
_run(0)
sched.call_soon(lambda: countdown(5)) # 两个函数依次放入队列
sched.call_soon(lambda: countup(20))
sched.run() # 依次执行 countdown,countup
四、异步队列的实现
现在,我们在调度器的基础上,自己实现一个异步队列,整合到上文的源码中,文件名aproducer.py。
Scheduler使用两个队列来实现函数的并发执行,ready队列存放待执行的函数,sleeping队列让sleep函数延后执行,然后通过Scheduler.run来调度函数的执行。
AsyncQueue也使用两个队列来实现异步生产-消费模型,items队列存放生产消费的数据,waiting队列实现异步非阻塞地消费数据(通过回调函数)。同时,异步队列还实现了一个close方法,来告知生产者、消费者队列是否关闭。
# aproducer.py
import time
import heapq
from collections import deque
class Scheduler:
def __init__(self):
self.ready = deque() # 创建一个双向队列,存放待执行的函数
self.sleeping = [] # 列表当成队列使用,存放需要延迟的函数
self.sequence = 0
# 立即执行的函数入待执行队列
def call_soon(self, func):
self.ready.append(func) # 从右侧添加函数
# 等待执行的函数入等待队列
def call_later(self, delay, func):
self.sequence += 1 # 防止deadline相同时,对函数排序而报错
deadline = time.time() + delay # 过期时间
# 使用优先队列 代替每次对列表排序
heapq.heappush(self.sleeping, (deadline, self.sequence, func))
def run(self):
while self.ready or self.sleeping:
if not self.ready:
# 如果待执行队列为空,就去sleep队列拿出最先到期并待执行的函数
# 这样做的目的是尽可能减少等待时间,提高并发的效率
deadline, _, func = heapq.heappop(self.sleeping)
deta = deadline - time.time()
if deta > 0:
time.sleep(deta)
self.ready.append(func)
while self.ready:
func = self.ready.popleft() # 从左侧弹出
func() # 执行函数
sched = Scheduler() # 调度器对象
class Result:
def __init__(self, value=None, exc=None):
self.value = value
self.exc = exc
def result(self):
if self.exc:
raise self.exc
else:
return self.value
class QueueClosed(Exception):
pass
class AsyncQueue:
def __init__(self):
self.items = deque()
self.waiting = deque()
self._closed = False
def close(self):
self._closed = True
if self.waiting and not self.items:
for func in self.waiting:
sched.call_soon(func)
def put(self, item):
if self._closed:
raise QueueClosed()
self.items.append(item)
if self.waiting:
func = self.waiting.popleft()
# func可能会递归调用,因此不要立即执行,同样由调度器来执行
sched.call_soon(func)
def get(self, callback):
if self.items:
callback(Result(value=self.items.popleft()))
else: # 如果队列为空,把get放入等待队列,等队列非空时执行
if self._closed:
callback(Result(exc=QueueClosed()))
else:
self.waiting.append(lambda: self.get(callback))
def producer(q, count):
def _run(n):
if n < count:
print('Producing', n)
q.put(n)
sched.call_later(1, lambda: _run(n + 1))
else:
print('Producer done')
q.close()
_run(0)
def consumer(q):
def _consum(result):
try:
item = result.result()
print('Consuming', item)
sched.call_soon(lambda: consumer(q))
except QueueClosed:
print('Consumer done')
q.get(callback=_consum)
q = AsyncQueue()
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q))
sched.run()
输出内容如下:
Producing 0
Consuming 0
Producing 1
Consuming 1
Producing 2
Consuming 2
Producing 3
Consuming 3
Producing 4
Consuming 4
Producing 5
Consuming 5
Producing 6
Consuming 6
Producing 7
Consuming 7
Producing 8
Consuming 8
Producing 9
Consuming 9
Producer done
Consumer done
五、引入yield
上文实现的调度器,本质上也是顺序执行的,只是效果上出现了不同函数之间切换执行。
这次,我们引入yield,这样我们就获得了 暂停/恢复 代码执行的能力。由此,我们可以实现一个基于协程的调度器,文件名yieldo.py。
# yieldo.py
import time
import heapq
from collections import deque
# 实现一个可等待类,利用yield的可暂停/恢复特性,给switch函数切换任务使用
class Awaitable:
def __await__(self):
yield # yield的作用类似中断,让代码在此处暂停,暂时交出执行权,稍后再回来恢复
def switch():
# 返回可等待对象,使用 await switch() 会调用 __await__,在 yield 处暂停
return Awaitable()
class Scheduler:
def __init__(self):
self.ready = deque() # 待执行队列
self.sleeping = [] # 等待队列
self.current = None # 当前正在执行的协程函数
self.sequence = 0 # 优先队列的第二个排序列
async def sleep(self, delay):
deadline = time.time() + delay
self.sequence += 1
heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
self.current = None # 置为空,则current不再入ready队列
# await配合Awaitable实例对象,
# 将会停在Awaitable里的yield,暂时退出函数,直到用send(None)恢复
# switch函数是为了把细节封装起来,也可以直接 await Awaitable()
await switch()
def new_task(self, coro):
self.ready.append(coro)
def run(self):
while self.ready or self.sleeping:
if not self.ready:
# 从优先队列(最小堆)弹出等待时间最少的待执行函数
deadline, _, coro = heapq.heappop(self.sleeping)
delta = deadline - time.time()
if delta > 0:
time.sleep(delta) # 真正的阻塞等待
self.ready.append(coro)
self.current = self.ready.popleft()
try:
# 调用协程的send方法,跟触发一个生成器函数的效果一样,
# 第一次send则开始执行协程函数,并停在调用链路最底下的yield处,
# 后面执行send则从yield处恢复,继续执行,该返回(return)则返回
self.current.send(None)
if self.current:
self.ready.append(self.current)
except StopIteration:
# Awaitable实例对象暂停一次后,第二次从暂停处恢复,并引发停止迭代异常,
# 使__await__执行结束,switch函数得以返回,达到暂停/恢复的目的
pass
sched = Scheduler()
async def countdown(n):
while n > 0:
print('Down', n)
await sched.sleep(4) # 把 time.sleep(4) 换成这行,达到类似 asyncio.sleep 的效果
n -= 自己手写调度器,理解Python中的asyncio异步事件循环与协程
深入理解 python3.4 中 Asyncio 库与 Node.js 的异步 IO 机制