异步编程学习

Posted czaorz

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步编程学习相关的知识,希望对你有一定的参考价值。

原文章来源于: 
https://mp.weixin.qq.com/s?__biz=MzIxMjY5NTE0MA==&mid=2247483720&idx=1&sn=f016c06ddd17765fd50b705fed64429c
原项目GitHub:
https://github.com/denglj/aiotutorial

原文章写得很精彩,但有些代码还是可以优化下的。而且这文章一直只有上篇,可惜了。

接下来按个人见解,从代码角度解析下这篇文章:

前提知识讲解:
1、计算机资源:常分为CPU资源、内存资源、硬盘资源和网络资源
2、进程阻塞:正在运行的程序,由于自身某个模块需要使用硬盘或网络I/O资源等,而系统又未及时响应,导致进程处于待机状态,直至等待事件作出回应后才会被唤醒。
3、进程非阻塞:同理,在获取某些资源时,不会等待结果响应,而是继续处理其他模块。
我们以socket为例,如下可获取阻塞与非阻塞两种编程
import socket
sock = socket.socket()

socket.setblocking(True)
# 默认就是阻塞。即套接字 建立连接/发送请求/接受请求 的时候,是阻塞的。
socket.setblocking(False) # 设置为非阻塞,即上述请求过程不会阻塞,而是继续处理其他模块。

 

使用原生asyncio编写异步程序:

在此代码中,我们需要注意几个关键点

1、loop = asyncio.get_event_loop()  # 开启事件循环,异步"任务"将在此循环执行

2、asyncio.create_task()  # 将一个协程包装成一个"任务"排入日程准备执行

3、asyncio.gather()  # 同步执行"任务"

import asyncio
import aiohttp
import time

loop = asyncio.get_event_loop()

async def fetch():
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get(http://www.baidu.com) as response:
            print(await response.read())

async def multi_fetch():
    await asyncio.gather(*[asyncio.create_task(fetch()) for _ in range(10)])

if __name__ == __main__:
    start = time.time()
    loop.run_until_complete(fetch())  # 执行一次
    # loop.run_until_complete(multi_fetch())  # 执行十次
    print(time.time() - start)

 

接下来我们就来一步一步的实现上述几个关键点,实现手写自己的异步程序

首先得实现一个阻塞程序,以socket为例。此例子比较简单,大致看一下即可。

用时的话,我们可以明显看出blocking_socket() 用时大致0.7s

而multi_blocking_socket() 用时大致0.7s,刚好是10倍左右。

import socket
import time

def blocking_socket(response=b‘‘):
    sock = socket.socket()  # 默认为阻塞连接
    sock.connect((www.baidu.com, 80))  # 建立百度TCP连接
    sock.send(bGET / HTTP/1.0

)  # 发送HTTP协议
    chunk = sock.recv(1024)  # 接收数据
    while chunk:
        response += chunk
        chunk = sock.recv(1024)
    return response

def mutil_blocking_socket():
    return [blocking_socket() for _ in range(10)].__len__()

if __name__ == __main__:
    start = time.time()
    blocking_socket()  # 执行一次
    # mutil_blocking_socket()  # 执行十次
    print(time.time() - start)

 

接着我们来实现非阻塞程序,仍然使用socket来实现

来比较时间,我们会惊讶的发现,no_blocking_socket() 执行一次大致0.07s

而multi_no_blocking_socket() 执行十次,大致是0.7s....

没错,花费时间和阻塞编程是一样的。我们的非阻塞编程并没有达到实际的效果。

import socket
import time

def no_blocking_socket(response=b‘‘):
    sock = socket.socket()
    sock.setblocking(False)  # 设置非阻塞连接
    try:
        sock.connect((www.baidu.com, 80))
    except BlockingIOError:  # 非阻塞式建立连接在此处会报错,捕获忽略即可
        pass

    while True:
        try:
            sock.send(bGET / HTTP/1.0

)  # 发送HTTP协议
            break
        except OSError:  # 此处会报错。因为套接字与百度服务器的TCP连接还没有建立,
# 但是套接字却是非阻塞的,故在未建立连接的情况下发送协议会报错,此处捕获此异常
continue while True: try: chunk = sock.recv(1024) while chunk: response += chunk chunk = sock.recv(1024) return response except OSError: # 同理,对方服务器未接收HTTP协议,不会返回数据,即使返回,也有时延。故此时接收数据会报异常,此处捕获此异常 pass def multi_no_blocking_socket(): return [no_blocking_socket() for _ in range(10)].__len__() if __name__ == __main__: start = time.time() no_blocking_socket() # 执行一次 # multi_no_blocking_socket() # 执行十次 print(time.time() - start)

我们来分析一下,为什么会造成这种现象。我们发现这套非阻塞代码与之前的阻塞代码相比,

多了三处 try: ... except: ...

多了两处 while True: ...

代码确实是非阻塞编程,也就是程序不会在网络IO模块处阻塞,但是程序也没有把空闲下来的时间花在"正确"的地方

在非阻塞的情况下,CUP把空闲下来的时间不停的去"试错",也就是程序会不停的尝试发送协议,直到发送成功。不停的尝试接收数据,直到接收成功。

这样就导致我们的非阻塞编程,实际上和阻塞编程是一样的,唯一要说不同。就是非阻塞程序中的CPU可能会 "忙一点"。

 

那么我们可以注意到了问题的关键,就是我们不知道什么时候程序是 "准备就绪了",也就是什么时候可以发送协议,什么时候可以接收数据。

其实操作系统已经帮我们实现了,它将事件的 I/O 都封装成了事件,如可读事件,可写事件。那么我们就可以立即想到,当我们的套接字状态变为我们所需要的时候,

就立即执行接下来的步骤。所以此处,我们就需要 "回调"!

这里我们说明下python中selectors模块,用于注册事件的回调

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

sock = socket.socket()  # 获取套接字
selector
= DefaultSelector() # 获取selector对象 selector.register(sock.fileno(), EVENT_WRITE, on_send) # fileno()获取当前socket套接字的文件描述符,并绑定事件EVENT_WRITE,回调函数为on_send selector.unregister(sock.fileno()) # 注销事件绑定 selector.register(sock.fileno(), EVENT_READ, on_recv) # 同理,绑定事件EVENT_READ,回调函数为on_recv

while True:
  events = selector.select() # 此处的events是操作系统返回的事件,也就是我们绑定的事件被触发了,
# 此处是阻塞获取的。也就是用一个事件循环的阻塞,来代替我们的while True: ...
  for sock, mask in events:
    sock.data() #
sock.data为绑定的回调函数,也就是上面的on_send和on_recv

如此我们就不需要自己手动while True来监控事件状态的改变,将这件工作交给事件循环。此处我们将其命名为loop。

 

接下来就是回调编程了

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

class Crawler:
    def __init__(self, flag=10):
        self.flag = flag
        self.sock = None
        self.response = b‘‘

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((www.baidu.com, 80))
        except BlockingIOError:
            pass
    # fileno()获取当前socket套接字的文件描述符,并绑定事件回调
    selector.register(self.sock.fileno(), EVENT_WRITE, self.on_send)
    def on_send(self):
        selector.unregister(self.sock.fileno())
        self.sock.send(bGET / HTTP/1.0

)
        selector.register(self.sock.fileno(), EVENT_READ, self.on_recv)

    def on_recv(self):
        chunk = self.sock.recv(1024)
        if chunk:
            self.response += chunk
        else:
            global stop_loop
            stop_loop -= self.flag
            selector.unregister(self.sock.fileno())

def loop():  # 事件循环,由操作系统返回那个事件发生了,对应执行那些事件的回调。
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

if __name__ == __main__:
    start = time.time()
    Crawler(10).fetch()  # 执行一次
    # [Crawler(1).fetch() for _ in range(10)]  # 执行十次
    loop()
    print(time.time() - start)

我们来捋一下代码执行的流程:

1、首先实例化一个Crawler对象,然后执行此实例的fetch方法

2、fetch方法发起了与百度服务器的连接,然后注册了回调函数。

3、此时会走到loop()函数来,执行事件循环。直到我们与对方服务器的连接建立成功,则此时OS会返回事件,我们则执行对应的回调事件 sock.data()  <=> self.on_send()

4、执行on_recv,首先注销上一个事件,然后发送协议,再接着注册可读事件。继续进入等待

5、此时继续进入loop事件循环,直到触发注册事件,执行回调函数on_recv,由于一次只接收1024,故可能会接收多次,也就是会触发多次on_recv事件的回调,直接接收完成。

6、接收完成,我们令全局变量stop_loop - flag,来停止loop事件循环。程序结束。

然后我们来看下时间,我们会发现执行一次的时间和执行十次的时间基本是差不多的。说明我们编写的程序是没有问题的。

回调式异步编程,成功!

 

至此,我们已经实现了回调式异步编程,但是我们思考下第一个例子,是基于协程的异步编程,故我们现在来调整代码,编写协程。

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

class Future:
    # 用于存放未来可能出现的数据,当出现时执行一次回调函数
    # 此中的result仅作为一个中转,实际还是通过回调返回给协程

    def __init__(self):
        self.result = None
        self.callback = None

    def set_callback(self, func):
        self.callback = func

    def set_result(self, result):
        self.result = result
        self.callback(self) if self.callback else None

class Task:
    # 用于启动协程,该类实例初始化时传入为协程对象,执行self.process方法
    # 调用协程的send方法,启动协程,并最后绑定回调函数

    def __init__(self, co_routine):
        self.co_routine = co_routine
        future = Future()
        self.process(future)

    def process(self, future):
        try:
            next_future = self.co_routine.send(future.result)
        except StopIteration:
            return
        next_future.set_callback(self.process)

class Crawler:
    def __init__(self, flag=10):
        self.flag = flag
        self.sock = None
        self.response = b‘‘

    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((www.baidu.com, 80))
        except BlockingIOError:
            pass

        future = Future()

        def _on_send():
            future.set_result(None)

        def _on_recv():
            future.set_result(self.sock.recv(1024))

        selector.register(self.sock.fileno(), EVENT_WRITE, _on_send)
        yield future
        selector.unregister(self.sock.fileno())
        self.sock.send(bGET / HTTP/1.0

)
        selector.register(self.sock.fileno(), EVENT_READ, _on_recv)
        while True:
            chunk = yield future  # 在此处轮询EVENT_READ事件,直至所有数据加载完毕
            if chunk:
                self.response += chunk
            else:
                global stop_loop
                stop_loop -= self.flag
                return self.response

def loop():
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

if __name__ == __main__:
    start = time.time()
    Task(Crawler(10).fetch())  # 传入协程fetch,使用Task实例化调用协程的send方法来启动协程
    # [Task(Crawler(1).fetch()) for _ in range(10)]  # 同理,启动十个协程任务
    loop()
    print(time.time() - start)

我们继续来整理下代码执行的流程:

1、首先实例化一些Crawler对象,调用调用此实例的fetch函数得到一个协程。此时协程是没有执行的。(协程需要send等触发才会执行)

2、将此协程装于Task用来创建任务实例,在任务中会主动触发协程的send函数来启动协程

3、此时协程已触发,注册事件 selector.register(self.sock.fileno(), EVENT_WRITE, _on_send),然后返回future,此时协程到此暂停

4、返回的future会添加任务的回调函数,也就是self.precess()。而loop也开始了事件轮询,当套接字的文件描述符状态变为可写状态时,触发回调方法_on_send

5、_on_send方法执行Future中的set_result方法,此时在此方法中会调用一次future注册的回调函数,继续触发任务Task中协程的send方法,回到协程上次暂停的状态

6、回来后,首先注销事件EVENT_WRITE。发送HTTP协议请求。再注册事件 selector.register(self.sock.fileno(), EVENT_READ, _on_recv)

7、loop事件轮询,当套接字的文件描述符变为可读状态时,触发回调方法_on_recv

8、_on_recv方法执行Future中的set_result方法,此时在方法中会初始化result为sock.recv(1024)的值,并执行注册的回调函数,将此结果继续传递至协程上回暂停的地方

9、由于一直没有注销事件EVENT_READ,故会一直驱动事件轮询直至结束

10、Task、Future、Crawler、loop这四个就这么神奇的串联在一起了,不可思议的说。

 

不过这样写貌似不台好看,虽然感觉也可以,但是很多模块其实都是可以拆离开的

下面就是一个拆分版本,就不细细的分析流程啦

import socket
import time

from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stop_loop = 10

def fetch(sock):
    sock.setblocking(False)
    try:
        sock.connect((www.baidu.com, 80))
    except BlockingIOError:
        pass

    future = Future()

    def _on_send():
        future.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, _on_send)
    yield from future
    selector.unregister(sock.fileno())
    return future

def read(sock, future, flag, response=b‘‘):
    def _on_recv():
        future.set_result(sock.recv(1024))

    selector.register(sock.fileno(), EVENT_READ, _on_recv)
    chunk = yield from future
    while chunk:
        response += chunk
        chunk = yield from future
    selector.unregister(sock.fileno())
    global stop_loop
    stop_loop -= flag
    return response


def loop():
    while stop_loop:
        events = selector.select()
        for sock, mask in events:
            sock.data()

class Future:
    def __init__(self):
        self.result = None
        self.callback = None

    def set_callback(self, func):
        self.callback = func

    def set_result(self, result):
        self.result = result
        self.callback(self) if self.callback else None

    def __iter__(self):
        yield self
        return self.result

class Task:
    def __init__(self, co_routine):
        self.co_routine = co_routine
        future = Future()
        self.process(future)

    def process(self, future):
        try:
            next_future = self.co_routine.send(future.result)
        except StopIteration:
            return
        next_future.set_callback(self.process)

class Crawler:
    def __init__(self, flag):
        self.flag = flag

    def fetch(self):
        sock = socket.socket()
        future = yield from fetch(sock)
        sock.send(bGET / HTTP/1.0

)
        response = yield from read(sock, future, self.flag)
        print(response)

if __name__ == __main__:
    start = time.time()
    Task(Crawler(10).fetch())
    # [Task(Crawler(1).fetch()) for _ in range(10)]
    loop()
    print(time.time() - start)

代码放到github上了
https://github.com/CzaOrz/ioco/tree/master/open_source_project/%E5%BC%82%E6%AD%A5%E6%95%99%E7%A8%8B%E5%AD%A6%E4%B9%A0/%E5%BC%82%E6%AD%A5%E5%8D%8F%E7%A8%8B

以上是关于异步编程学习的主要内容,如果未能解决你的问题,请参考以下文章

究竟什么是异步编程?

.Net异步编程异步编程场景以及最佳实践

.Net异步编程知多少

.Net异步编程学习列表

.NET 异步编程知多少

《javascript高级程序设计》学习笔记 | 11.1.异步编程