python3.6异步IO包asyncio部分核心源码思路梳理

Posted olivertian

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python3.6异步IO包asyncio部分核心源码思路梳理相关的知识,希望对你有一定的参考价值。

关于python异步编程的演进过程,两篇文章阐述得妥妥当当,明明白白。

中文资料:https://mp.weixin.qq.com/s?__biz=MzIxMjY5NTE0MA==&mid=2247483720&idx=1&sn=f016c06ddd17765fd50b705fed64429c

英文资料:http://aosabook.org/en/500L/a-web-crawler-with-asyncio-coroutines.html

其实中文资料就是参考的英文资料,英文资料是开源书《500 Lines or Less》中的一个主题章节,整书地址:https://github.com/aosabook/500lines

 

python的asyncio源码的核心思路其实跟基于生成器的协程异步编程思路大体一致,只是前者做了大量的代码优化和功能扩充。所以对照生成器协程代码来理解asyncio是很有帮助的。以下的这一小段代码就是采用基于生成器的协程的异步编程方式写的一个小爬虫案例,来自上述中文资料,asyncio的核心代码的思路大体上能从这段代码中找到原型。

该脚本命名为:yield_from.py

import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stopped = False
urls_todo = "/", "/1", "/2", "/3"

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        """
        yield的出现使得__iter__函数变成一个生成器,生成器本身就有next方法,所以不需要额外实现。
        yield from x语句首先调用iter(x)获取一个迭代器(生成器也是迭代器)
        """
        yield self  # 外面使用yield from把f实例本身返回
        return self.result  # 在Task.step中send(result)的时候再次调用这个生成器,但是此时会抛出stopInteration异常,并且把self.result返回

def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    """
    此处的chunck接收的是f中return的f.result,同时会跑出一个stopIteration的异常,只不过被yield from处理了。
    这里也可直接写成chunck = yiled f
    """
    chunck = yield from f
    selector.unregister(sock.fileno())
    return chunck

def read_all(sock):
    response = []
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)
    return b"".join(response)

class Crawler:
    def __init__(self, url):
        self.url = url
        self.response = b""

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ("xkcd.com", 80))
        get = "GET 0 HTTP/1.0\\r\\nHost:xkcd.com\\r\\n\\r\\n".format(self.url)
        sock.send(get.encode(ascii))
        self.response = yield from read_all(sock)
        print(self.response)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)  # 激活Task包裹的生成器

    def step(self, future):
        try:
            # next_future = self.coro.send(future.result)
            next_future = self.coro.send(None)  # 驱动future
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

if __name__ == "__main__":
    import time
    start = time.time()
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    print(time.time() - start)

 

下面用asyncio写一个简单的案例,通过对照上述基于生成器协程的代码来梳理asyncio源码的逻辑。

基于asyncio编写的小demo,该脚本命名为:asyncio_test.py

import asyncio
import time

async def get_html(url):
    print("start get url")
    await asyncio.sleep(3)
    await asyncio.sleep(3)
    print("end get url")

if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()

    cora = get_html("http://www.baidu.com")
    task = loop.create_task(cora)
    loop.run_until_complete(task)

    # tasks = [get_html("http://www.baidu.com") for i in range(10)]
    # loop.run_until_complete(asyncio.wait(tasks))
    print(time.time() - start_time)

asyncio源码包位置在python安装目录下的Lib/asyncio下。

首先是loop = asyncio.get_event_loop(),光是看名字就知道和yield_from.py中的loop有不可告人的关系,其实这个loop就是获取一个事件循环,不停地循环检测是否有事件准备好,如果有,则立刻调用注册在事件上的回调函数,直到stopped置位才退出循环。get_event_loop()返回一个事件循环类,该类会继承BaseEventLoop,BaseEventLoop才是分析的重点,因为很多重要函数的具体实现在这个类中,BaseEventLoop位于asyncio/base_events.py中。

一、create_task()

接下来看asyncio_test.py代码中的task = loop.create_task()都干了嘛。

在asyncio/base_events.py中找到BaseEventLoop定义,在BaseEventLoop类中有个create_task方法,看看这个方法的的代码

    def create_task(self, coro):  
        self._check_closed()
        if self._task_factory is None:  
            task = tasks.Task(coro, loop=self)
            if task._source_traceback:
                del task._source_traceback[-1]
        else:
            task = self._task_factory(self, coro)
        return task

这个函数接收一个协程作为参数,其核心代码其实就如下两句,新建一个Task类实例并返回。

def create_task(self, coro):    
        task = tasks.Task(coro, loop=self)
        return task

Task类的定义在asyncio/tasks.py中,这是理解asyncio逻辑的一个比较关键的类,上面新建Task类实例时,传入的coro就是通过这个类中实现的逻辑来一步一步驱动的,这个Task类看起来挺复杂的,其实和上面yield_from.py中的Task类的作用很相似,对照着分析还是不难的。

二、Task类

先来简单捋一捋yield_from.py中Task类的逻辑。

在yield_from.py中Task类实例都会包裹一个协程(即self.coro),然后通过step函数中的send方法来驱动包裹在实例中的协程。协程可以理解为由多个future组合而成,一个future完成了才能让下一个future上场,future翻译为未来对象,要怎么理解这个呢?想了半天没想出个好的生活场景,那就粗糙一点吧,想象一下食堂排队打饭,每个排队的都是一个future,只有当前排到的future打完了才能yield from下一个future,而整个队伍就是一个协程,只有整个队伍都打完了协程才算结束,这实在很牵强,不过意思到了就差不多吧,看懂代码才能有精准的理解,这里只是一个大概意思,辅助更快理解代码逻辑。画了一张图表达一下心意,后面还会有future代码的分析。

技术图片

 

 

 即,Task类中包裹一个协程,协程可理解为包含多个future类,多个future按顺序来执行,一个future死了,下一个才能活。

在step中yield from回来的next_future,又会把当前实例的step方法注册为其回调函数,所以每个future结束,都会调用step方法,以此来激活下一个future,不断推进协程向下执行,直到没有future了,就抛出StopIteration,来结束该Task。

其实asyncio中的Task类跟这个逻辑是一样的,首先,它也会包裹一个协程,再去看下create_task的时候,是不是传入了一个coro参数,在Task类的__init__函数中有个self._coro 来接收保存这个协程。另外,Task类中有一个_step函数,其作用就类似于yield_from.py中的step函数。找到_step函数

技术图片

 

 画框这里是不是和yield_from.py中的step函数中的很像,没错,是他是他就是他,很类似的逻辑,只是_step中多了很多参数检查和其他一下异常检查等,如果我们只关注核心逻辑,那些都可以暂时忽略掉。

可类似归类似,差别还是有的:

在yield_from.py中Task实例一创建,在Task实例的__init__函数中就会马上调用self.step()函数来激活被其包裹的协程,那asyncio中的Task实例是如何激活它的协程的呢?

其实答案也在__init__函数中,只不过这里不是马上就调用_step函数,而是会在下一帧(一次循环就是一帧)循环的时候调用,来看下Task类的__init__函数

技术图片

 

 这里调用call_soon函数来把该Task实例的_step函数添加到待执行队列中,call_soon函数也是定义在asyncio/base_events.py的BaseEventLoop类中的,点过去看下,其中代码对我们本次分析而言,有用的就一句:handle = self._call_soon(callback, args),调用_call_soon函数把callback注册到待执行队列中。往下翻,就能看到self._call_soon:

    def _call_soon(self, callback, args):
        handle = events.Handle(callback, args, self)
        if handle._source_traceback:
            del handle._source_traceback[-1]
        self._ready.append(handle)  # 事件添加到队列
        return handle

很容易理解,就是返回一个Handle类,然后把它添加到self._ready队列中,这里稍微解释一下Handle类和self._ready队列的作用,点到Handle类的定义很容易就知道,它其实就是包裹了就绪事件的回调函数的,其中定义了一个run方法,就是直接执行回调函数,而self._ready保存着Handle类的实例,我们由yield_from.py中可以知道,有个loop死循环不断检测是否有事件就绪,一旦有就绪事件,就调用其回调函数,在asyncio中当然也是有这种死循环的,后面会讲到,这个循环也是不断检测self._ready是否有为空,不为空就从其中弹出Handle实例,然后调用handle实例的run方法,说白了就是执行注册在就绪事件上的回调函数。

接着回到Task类。

Task实例初始化时,就通过call_soon把self._step添加到_ready队列中,所以下一轮循环中会从_ready中弹出包裹_step函数的handle,然后执行_step,这样就实现了激活task包裹的协程。这样来看,和yield_from.py中的Task类是不是很类似,只是一个在初始化时立马执行step函数,一个是在下一轮循环中执行_step,其实也没啥区别。

现在在asyncio_test.py中,已经通过task = loop.create_task(cora)创建了一个task实例,该task实例包裹了我们自己定义的协程cora,并且在task初始化的时候在__init__函数中通过call_soon通知下一次循环立即执行task的_step函数来激活cora协程。接下来就是run_until_complete函数登场了

三、run_until_complete

这个函数同样是定义在asyncio/base_events.py的BaseEventLoop类中。在这个函数中,我们上述提到的死循环(类比yield_from.py中的loop())就要闪亮登场了,点到函数定义处(把不妨碍本次说明的部分代码删除了)

    def run_until_complete(self, future):
        future = tasks.ensure_future(future, loop=self)  # ensure_future,即,确保是future。返回的是future(task也是future)

        future.add_done_callback(_run_until_complete_cb)  # 用来结束循环
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError(Event loop stopped before Future completed.)

        return future.result()

首先调用ensure_future来确保传进来的future参数是个future,我们之前说过Task是继承自Future的,所以task也是future,而我们外面传进来的参数是个task实例,所以这个函数调用返回的其实就是本身(传进去是啥返回就是啥),然后给我们传进来的task实例通过调用add_done_callback添加_run_until_complete_cb回调函数,这个回调函数比较关键,run_until_complete的做的最重要的事就是给传进来的task实例添加这个回调,点进_run_until_complete_cb,可以看到就是调用了loop的stop函数,这个的意义就是,当我们传进来的task包裹的协程运行结束后,就调用这个回调,跳出循环(就是相当于yield_from.py中的stopped变量的作用),否则死循环就真的是死循环了,永远跳不出。

然后就是调用run_forever,死循环正式登场

四、run_forever

这个函数前面一长串,但是现在,我们统统都不看,只看关键地方,删除无关代码后,只留下:

    def run_forever(self):try:
            events._set_running_loop(self)
            while True:
                self._run_once()
                if self._stopping:
                    break
        finally:
            ...

是不是看起来和yield_from.py中的loop函数像极了,这个循环不断地调用_run_once(),就像yield_from.py的loop函数中不断地调用下面这段代码:

events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

所以推测,_run_once()中是不是真的就是实现上述代码的逻辑呢?没错的,是他是他就是他。

点进_run_once()看一下,这个函数的代码量有点多了,主要是这里面还实现了一个定时功能(asyncio.sleep()),关于这个功能不展开了,其实也很简单,主要还是抓住我们的主线来讲,我们来看_run_once()中是如何实现上述代码段的逻辑的,注意到其中有这样两行代码:

        else:
            event_list = self._selector.select(timeout)  # 筛选就绪事件,将其回调添加到self._ready中
        self._process_events(event_list)  # 该函数具体实现在selector_events.py中

这里的event_list = self._selector.select(timeout)和上述的events = selector.select()是不是很相似?这里也就是选出就绪事件,然后添加到self._ready队列中,随后执行,之前已经解释过self._ready队列的作用了,马上就要登场的就是调用就绪事件的回调函数的执行,在_run_once()的尾部,我们看到如下代码:

ntodo = len(self._ready)
        for i in range(ntodo):
            handle = self._ready.popleft()
            if handle._cancelled:
                continue
            if self._debug:
                try:
                    self._current_handle = handle
                    t0 = self.time()
                    handle._run()
                    dt = self.time() - t0
                    if dt >= self.slow_callback_duration:
                        logger.warning(Executing %s took %.3f seconds,
                                       _format_handle(handle), dt)
                finally:
                    self._current_handle = None
            else:
                handle._run()
        handle = None  # Needed to break cycles when an exception occurs.

这就是之前解释过的self._ready的作用,先看_ready队列中是否有待处理的Handle实例,如果有,那就一个一个执行,handle中的_run()方法就是执行就绪事件的回调函数。

至此,就把yield_from.py中的loop()函数的逻辑对应到了asyncio源码的循环之中。

接下来,就来看看那个难以理解的Future类是怎么回四

五、Future类

首先看看yield_from.py中的Future类是怎么回事,就知道asyncio中的Future是怎么回事了,他们长得都很像。

    def __iter__(self):
        """
        yield的出现使得__iter__函数变成一个生成器,生成器本身就有next方法,所以不需要额外实现。
        yield from x语句首先调用iter(x)获取一个迭代器(生成器也是迭代器)
        """
        yield self  # 外面使用yield from把f实例本身返回
        return self.result  # 在Task.step中send(result)的时候再次调用这个生成器,但是此时会抛出stopInteration异常,并且把self.result返回

Future类最关键也是最难理解的就是__iter__方法,__iter__中的yield的出现,使得__iter__变成一个生成器。

再通过yield_from.py中具体例子来梳理一下future的使用方法。

    def fetch(self):
        global stopped
        sock = socket.socket()
        yield from connect(sock, ("xkcd.com", 80))
        get = "GET 0 HTTP/1.0\\r\\nHost:xkcd.com\\r\\n\\r\\n".format(self.url)
        sock.send(get.encode(ascii))
        self.response = yield from read_all(sock)
        print(self.response)
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True

假设代码已经执行到yield from read_all(sock)这一行,yield from的出现使得调用fetch生成器的调用方(即step中的send方法)和read_all()建立了一个直通的通道,数据流由send直接传到read_all,再来read_all中看看,read_all中也有yield from read(sock),这个yield from让send和read函数之间又建立了一个直连通道,再看看read函数有什么,read中又有一个yield from f,这次直接让send和f建立了一个直连通道, f中的__iter__就只有一个yield了,不再有yield from,所以通道终于到头了,于是整个下来就是send和f的__iter__生成器建立了一个直连通道,当上一个future执行完毕,调用其回调函数(即step函数)时,就会用send发送一个none。。。啊,讲不清楚了,暴毙而亡,这逻辑太难表述了,如果把yield from句法弄清楚了,然后多看几遍yield_from.py的代码,反复研究,应该就能明白的,其实也不难理解,就是太难表述了,生成器一层嵌套一层。。。

把yield_from.py中的future搞懂了,再看asyncio中的future,发现其实结构是一样的,功能也类似。

先写到这里,已经凌晨三点,睡~

 

以上是关于python3.6异步IO包asyncio部分核心源码思路梳理的主要内容,如果未能解决你的问题,请参考以下文章

Python异步IO之协程:使用asyncio的不同方法实现协程

asyncio

转-python异步IO-asyncio

Python并发编程之初识异步IO框架:asyncio 上篇

Python学习---IO的异步[asyncio +aiohttp模块]

Python黑魔法 --- 异步IO( asyncio) 协程