自己手写调度器,理解Python中的asyncio异步事件循环与协程

Posted 小杰666

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自己手写调度器,理解Python中的asyncio异步事件循环与协程相关的知识,希望对你有一定的参考价值。

本文将从一段最简单的“顺序执行”代码开始,逐步深入,来理解Python的asyncio事件循环和协程的底层实现原理。

先说下Python yield的作用,简单说就是在代码中可以 暂停/恢复 代码的执行,这是最关键的,这样就有机会中断函数的执行,把时间分配给其他函数,然后在适当时机从中断位置恢复。在Python中有yield的函数,叫生成器,协程就是基于生成器一步步发展而来。

一、顺序执行

在开始前,先从最简单的顺序执行开始,代码如下:

import time


def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        n -= 1


def countup(stop):
    x = 0
    while x < stop:
        print('Up', x)
        time.sleep(1)
        x += 1


countdown(5)
countup(5)

顺序执行会依次执行两个函数,执行过程是同步的,输出内容如下:

Down 5
Down 4
Down 3
Down 2
Down 1
Up 0
Up 1
Up 2
Up 3
Up 4

二、并发执行

接下来,看下并发执行,并发执行的经典解决方案是 多线程,代码如下:

import time
import threading


def countdown(n):
    while n > 0:
        print('Down', n)
        time.sleep(1)
        n -= 1


def countup(stop):
    x = 0
    while x < stop:
        print('Up', x)
        time.sleep(1)
        x += 1


threading.Thread(target=countdown, args=(5,)).start()
threading.Thread(target=countup, args=(5,)).start()

当用两个线程执行上面的两个函数,输出内容就不能保证顺序了,并且Up和Down还可能连在一起,这是因为print函数是线程不安全的,输出如下:

Down 5
Up 0
UpDown 1
 4
UpDown 2
 3
Up 3
Down 2
UpDown  41

那么,有没有办法不用多线程,就能实现并发?答案是有的。

三、引入调度器

要在单个线程中实现并发,就需要让countdown和countup两个函数轮替执行,于是就引入调度器,这个调度器的作用类似Python里的事件循环:

import time
from collections import deque


class Scheduler:
    def __init__(self):
        self.ready = deque()  # 创建一个双向队列,存放待执行的函数

    def call_soon(self, func):
        self.ready.append(func)  # 从右侧添加函数

    def run(self):
        while self.ready:
            func = self.ready.popleft()  # 从左侧弹出
            func()  # 执行函数


sched = Scheduler()  # 调度器对象


def countdown(n):
    if n > 0:
        print('Down', n)
        time.sleep(1)  # 改成5,将会阻塞5秒,什么也做不了
        sched.call_soon(lambda: countdown(n - 1))  # 把 n -= 1 修改成这行,放入待执行队列


def countup(stop):
    def _run(x):  # 使用嵌套函数能达到一样的效果
        if x < stop:
            print('Up', x)
            time.sleep(1)
            sched.call_soon(lambda: _run(x + 1))
    _run(0)


sched.call_soon(lambda: countdown(5))  # 两个函数依次放入队列
sched.call_soon(lambda: countup(5))
sched.run()  # 依次执行 countdown,countup

上面的代码,用一个双向队列,已经实现了一个简单的调度器,countdown和countup会依次执行。但是,这样的调度器只是简单的轮替执行,并没有同时执行的效果,输出如下:

Down 5
Up 0
Down 4
Up 1
Down 3
Up 2
Down 2
Up 3
Down 1
Up 4

上面的调度器还有个问题,比如把countdown里的sleep时间改成5秒,那么这个调度器在轮替到countdown时,会等待5秒,一直卡在那里,所以是一种资源浪费,因为卡5秒的时间里,什么事也干不了。

于是,我们开始优化,让sleep函数延后执行,而不是立刻阻塞,这次引入 call_later 方法:

import time
from collections import deque


class Scheduler:
    def __init__(self):
        self.ready = deque()  # 创建一个双向队列,存放待执行的函数
        self.sleeping = []  # 列表当成队列使用,存放需要sleep的函数

    def call_soon(self, func):
        self.ready.append(func)  # 从右侧添加函数

    def call_later(self, delay, func):
        deadline = time.time() + delay  # 过期时间
        self.sleeping.append((deadline, func))
        self.sleeping.sort()  # 过期时间升序排序,最快过期的在最左侧,其实就是一个优先队列

    def run(self):  # 调度器主函数
        while self.ready or self.sleeping:
            if not self.ready:
                # 如果待执行队列为空,就去sleep队列拿出最先到期并待执行的函数
                # 拿出最先到期并待执行函数的目的是尽可能减少等待时间,提高并发的效率
                deadline, func = self.sleeping.pop(0)
                deta = deadline - time.time()
                if deta > 0:
                    time.sleep(deta)
                self.ready.append(func)

            while self.ready:
                func = self.ready.popleft()  # 从左侧弹出
                func()  # 执行函数


sched = Scheduler()  # 调度器对象


def countdown(n):
    if n > 0:
        print('Down', n)
        # time.sleep(4)

        # 因为没有暂停/恢复能力,用递归调用来间接地实现“恢复执行”的能力,
        # 下文将会用yield实现 暂停/恢复 能力
        sched.call_later(4, lambda: countdown(n - 1))  


def countup(stop):
    def _run(x):  # 使用嵌套函数能达到一样的效果
        if x < stop:
            print('Up', x)
            # time.sleep(1)
            sched.call_later(1, lambda: _run(x + 1))
    _run(0)


sched.call_soon(lambda: countdown(5))  # 两个函数依次放入队列
sched.call_soon(lambda: countup(20))
sched.run()  # 依次执行 countdown,countup

此时的输出,看着已经是并发执行了,一个简单的异步框架的雏形就出来了!

看到了吗,并发执行的关键是 Scheduler 这个类,它使用ready和sleeping两个队列共同实现轮换执行,真正的调度者是 Scheduler.run 方法,输出如下:

Down 5
Up 0
Up 1
Up 2
Up 3
Down 4
Up 4
Up 5
Up 6
Up 7
Down 3
Up 8
Up 9
Up 10
Up 11
Down 2
Up 12
Up 13
Up 14
Up 15
Down 1
Up 16
Up 17
Up 18
Up 19

下面,用优先队列代替列表排序,并用序列号解决deadline值相同时会对函数排序造成的异常(输出结果同上):

import time
import heapq
from collections import deque


class Scheduler:
    def __init__(self):
        self.ready = deque()  # 创建一个双向队列,存放待执行的函数
        self.sleeping = []  # 列表当成队列使用,存放需要sleep的函数
        self.sequence = 0

    def call_soon(self, func):
        self.ready.append(func)  # 从右侧添加函数

    def call_later(self, delay, func):
        self.sequence += 1  # 防止deadline相同时,对函数排序而报错
        deadline = time.time() + delay  # 过期时间
        # 使用优先队列 代替每次对列表排序
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # 如果待执行队列为空,就去sleep队列拿出最先到期并待执行的函数
                # 拿出最先到期并待执行函数的目的是尽可能减少等待时间,提高并发的效率
                deadline, _, func = heapq.heappop(self.sleeping)
                deta = deadline - time.time()
                if deta > 0:
                    time.sleep(deta)
                self.ready.append(func)

            while self.ready:
                func = self.ready.popleft()  # 从左侧弹出
                func()  # 执行函数


sched = Scheduler()  # 调度器对象


def countdown(n):
    if n > 0:
        print('Down', n)
        # time.sleep(4)
        sched.call_later(4, lambda: countdown(n - 1))


def countup(stop):
    def _run(x):  # 使用嵌套函数能达到一样的效果
        if x < stop:
            print('Up', x)
            # time.sleep(1)
            sched.call_later(1, lambda: _run(x + 1))
    _run(0)


sched.call_soon(lambda: countdown(5))  # 两个函数依次放入队列
sched.call_soon(lambda: countup(20))
sched.run()  # 依次执行 countdown,countup

四、异步队列的实现

现在,我们在调度器的基础上,自己实现一个异步队列,整合到上文的源码中,文件名aproducer.py。

Scheduler使用两个队列来实现函数的并发执行,ready队列存放待执行的函数,sleeping队列让sleep函数延后执行,然后通过Scheduler.run来调度函数的执行。

AsyncQueue也使用两个队列来实现异步生产-消费模型,items队列存放生产消费的数据,waiting队列实现异步非阻塞地消费数据(通过回调函数)。同时,异步队列还实现了一个close方法,来告知生产者、消费者队列是否关闭。

# aproducer.py

import time
import heapq
from collections import deque


class Scheduler:
    def __init__(self):
        self.ready = deque()  # 创建一个双向队列,存放待执行的函数
        self.sleeping = []  # 列表当成队列使用,存放需要延迟的函数
        self.sequence = 0

    # 立即执行的函数入待执行队列
    def call_soon(self, func):
        self.ready.append(func)  # 从右侧添加函数

    # 等待执行的函数入等待队列
    def call_later(self, delay, func):
        self.sequence += 1  # 防止deadline相同时,对函数排序而报错
        deadline = time.time() + delay  # 过期时间
        # 使用优先队列 代替每次对列表排序
        heapq.heappush(self.sleeping, (deadline, self.sequence, func))

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # 如果待执行队列为空,就去sleep队列拿出最先到期并待执行的函数
                # 这样做的目的是尽可能减少等待时间,提高并发的效率
                deadline, _, func = heapq.heappop(self.sleeping)
                deta = deadline - time.time()
                if deta > 0:
                    time.sleep(deta)
                self.ready.append(func)

            while self.ready:
                func = self.ready.popleft()  # 从左侧弹出
                func()  # 执行函数


sched = Scheduler()  # 调度器对象


class Result:
    def __init__(self, value=None, exc=None):
        self.value = value
        self.exc = exc

    def result(self):
        if self.exc:
            raise self.exc
        else:
            return self.value


class QueueClosed(Exception):
    pass


class AsyncQueue:
    def __init__(self):
        self.items = deque()
        self.waiting = deque()
        self._closed = False

    def close(self):
        self._closed = True
        if self.waiting and not self.items:
            for func in self.waiting:
                sched.call_soon(func)

    def put(self, item):
        if self._closed:
            raise QueueClosed()

        self.items.append(item)
        if self.waiting:
            func = self.waiting.popleft()
            # func可能会递归调用,因此不要立即执行,同样由调度器来执行
            sched.call_soon(func)

    def get(self, callback):
        if self.items:
            callback(Result(value=self.items.popleft()))
        else:  # 如果队列为空,把get放入等待队列,等队列非空时执行
            if self._closed:
                callback(Result(exc=QueueClosed()))
            else:
                self.waiting.append(lambda: self.get(callback))


def producer(q, count):
    def _run(n):
        if n < count:
            print('Producing', n)
            q.put(n)
            sched.call_later(1, lambda: _run(n + 1))
        else:
            print('Producer done')
            q.close()
    _run(0)


def consumer(q):
    def _consum(result):
        try:
            item = result.result()
            print('Consuming', item)
            sched.call_soon(lambda: consumer(q))
        except QueueClosed:
            print('Consumer done')
    q.get(callback=_consum)


q = AsyncQueue()
sched.call_soon(lambda: producer(q, 10))
sched.call_soon(lambda: consumer(q))
sched.run()

输出内容如下:

Producing 0
Consuming 0
Producing 1
Consuming 1
Producing 2
Consuming 2
Producing 3
Consuming 3
Producing 4
Consuming 4
Producing 5
Consuming 5
Producing 6
Consuming 6
Producing 7
Consuming 7
Producing 8
Consuming 8
Producing 9
Consuming 9
Producer done
Consumer done

五、引入yield

上文实现的调度器,本质上也是顺序执行的,只是效果上出现了不同函数之间切换执行。

这次,我们引入yield,这样我们就获得了 暂停/恢复 代码执行的能力。由此,我们可以实现一个基于协程的调度器,文件名yieldo.py。

# yieldo.py

import time
import heapq
from collections import deque


# 实现一个可等待类,利用yield的可暂停/恢复特性,给switch函数切换任务使用
class Awaitable:
    def __await__(self):
        yield  # yield的作用类似中断,让代码在此处暂停,暂时交出执行权,稍后再回来恢复


def switch():
    # 返回可等待对象,使用 await switch() 会调用 __await__,在 yield 处暂停
    return Awaitable()


class Scheduler:
    def __init__(self):
        self.ready = deque()  # 待执行队列
        self.sleeping = []  # 等待队列
        self.current = None  # 当前正在执行的协程函数
        self.sequence = 0  # 优先队列的第二个排序列

    async def sleep(self, delay):
        deadline = time.time() + delay
        self.sequence += 1
        heapq.heappush(self.sleeping, (deadline, self.sequence, self.current))
        self.current = None  # 置为空,则current不再入ready队列

        # await配合Awaitable实例对象,
        # 将会停在Awaitable里的yield,暂时退出函数,直到用send(None)恢复
        # switch函数是为了把细节封装起来,也可以直接 await Awaitable()
        await switch()

    def new_task(self, coro):
        self.ready.append(coro)

    def run(self):
        while self.ready or self.sleeping:
            if not self.ready:
                # 从优先队列(最小堆)弹出等待时间最少的待执行函数
                deadline, _, coro = heapq.heappop(self.sleeping)
                delta = deadline - time.time()
                if delta > 0:
                    time.sleep(delta)  # 真正的阻塞等待
                self.ready.append(coro)

            self.current = self.ready.popleft()
            try:
                # 调用协程的send方法,跟触发一个生成器函数的效果一样,
                # 第一次send则开始执行协程函数,并停在调用链路最底下的yield处,
                # 后面执行send则从yield处恢复,继续执行,该返回(return)则返回
                self.current.send(None)

                if self.current:
                    self.ready.append(self.current)
            except StopIteration:
                # Awaitable实例对象暂停一次后,第二次从暂停处恢复,并引发停止迭代异常,
                # 使__await__执行结束,switch函数得以返回,达到暂停/恢复的目的
                pass


sched = Scheduler()


async def countdown(n):
    while n > 0:
        print('Down', n)
        await sched.sleep(4)  # 把 time.sleep(4) 换成这行,达到类似 asyncio.sleep 的效果
        n -= 自己手写调度器,理解Python中的asyncio异步事件循环与协程

深入理解 python3.4 中 Asyncio 库与 Node.js 的异步 IO 机制

Python asyncio ensure_future 装饰器

python协程(4):asyncio

Python中的协程与asyncio原理

Python中的协程与asyncio原理