一种在 python 中用 asyncio 和协程实现的IO并发

Posted 资质平庸的程序员

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一种在 python 中用 asyncio 和协程实现的IO并发相关的知识,希望对你有一定的参考价值。

An IO-concurrency implement by async && coroutine in python

1 此篇文字围绕的主题

基于IO驱动程序1,本机从IO读取或发送数据——读/写IO的有效方法。

+--------------------------+     +----+
|                          | +==>|    |
|            +--+    +---+ | |   +----+ 1
| +-----+    |  |    | I | | |      .
| | CPU |<==>|MM|<==>|   |<==+      .
| +-----+    |  |    | O | | |      .
|            +--+    +---+ | |   +----+
|                          | +==>|    |
+--------------------------+     +----+ n
local-computer               remote-computers

MM: memory。
<==>: 数据流向。

2 IO多路复用 ≈ IO并发

基于IO驱动程序对IO的读/写,可看作是对IO缓冲区的读/写。

+--------------------------------------+       +----+
|                                      |  +===>|    |
|                        +--+    +---+ |  |1+----+ 1
|                    +==>|b1|<==>|   | |  |
| +-----+ concurrency|   |--|    | I | |  |     .
| | CPU |<===========+   |MM|    |   |<===+     .
| +-----+            |   |--|    | O | |  |     .
|                    +==>|bn|<==>|   | |  |   
|                        +--+    +---+ |  |    +----+
|                                      |  +===>|    |
+--------------------------------------+   n路 +----+ n
local-computer                            remote-computers

b: IO buffer

本机IO能实时接收远程计算机传输来的数据,(对于诸如网卡类IO,)当数据目的地为本机时 其会被上传到IO缓冲区。

IO多路复用指本机与远程计算机建立多路数据传输的软连接(每一路数据对应一个IO缓冲区),CPU通过调度CPU并发单位2 并发读/写IO缓冲区。

由CPU调度CPU并发单位读/写IO缓冲区实现了IO硬件并发传输各路数据。相比于CPU层面的并发,IO并发粒度更大(一定程度上削弱了IO的并发现象)。此篇文字将IO多路复用称为大粒度并发——这是本小节标题“IO多路复用 ≈ IO并发”的由来。

另外,此篇文字认为构成IO并发粒度比CPU并发粒度大的原因主要有三个。
[1] 数据粒度——让IO一次性(不切换)发送某路数据的量较大(如一整个缓冲区);
[2] IO速度比CPU低几个数量级;
[3] 较本机数据的传输时间,来自远程计算机数据的传输时间较长(相比前两个原因,这个是主要原因,可参考后续程序例子)。
利用IO并发时主要是为了提升IO利用率——当某路数据中断时去处理另一路数据。

3 IO同步并发 && IO异步并发

IO同步指一个CPU并发单位会阻塞等待IO缓冲区可用并完成对IO的读/写。

IO异步指若IO缓冲区可用则完成对IO的读/写;若IO缓冲区不可用则跳过IO读/写 继续调度并发单位后续指令运行,待IO缓冲区可用时再完成对其的读/写操作。

3.1 IO同步并发

通过多个CPU并发单位读/写某IO下的多路IO缓冲区可实现IO同步并发。

当无不可用IO缓冲区时,各IO缓冲区由CPU调度CPU并发单位 并发读/写;当 当前CPU并发单位读/写的IO缓冲区不可用时 则调度另一CPU并发单位尝试读/写相应的IO缓冲区。

如由n个线程分别读/写m(m >= n)路IO数据的IO缓冲区。

3.2 IO异步并发

由于CPU与IO的速度差异3,除能通过CPU多并发单位实现IO异步并发外,单CPU并发单位也可以实现异步IO在IO层面的并发

最直接的方式是让CPU轮询读/写IO缓冲区:

for (io = data_conn->start; ; io = data_conn->next)
    read or write current io buffer
    create one data connection when data connections are less than max

考虑本机IO利用率满(远程计算机将本机IO填满且都上传到IO缓冲区)的场景,当本机可调度并发量未饱和情况下,轮询方式可以满足异步IO并发的读/写。

4 IO并发——IO异步与协程的搭配

若单CPU并发已能满足当前场景下的IO并发,那么实现目标最友好方式就是“单CPU并发+IO异步”的方式啦4

对于比“CPU轮询+IO异步”稍复杂的IO并发业务,可尝试在单CPU并发单位内使用协程。协程可在应用程序中创建和调度。除具调度式并发特点外,其还有两个优点:
[1] 对CPU层面并发单位的友好性——其创建和调度开销不在内核上;
[2] 上下文切换开销比线程更小,协程执行效率可接近于3.2中的轮询方式。

额,到此为止,这篇文字个人理解文字的戏也太多了点吧…在接下来的篇章里不如直接写个例子算了。

5 写个小例子吧

用什么写个什么呢?

用python写个请求OneNET视频平台部分HTTP-APIs的例子吧。嗯,这种网络IO场景适合用“IO异步+协程”来实现。

5.1 初定功能

[1] 比较OneNET-Video-Platform-HTTP-APIs 同步请求和异步请求的时间差异;
[2] 在异步情况下保证某些HTTP-APIs请求的先后顺序;
[3] 提供参数配置入口,如命令行参数,配置文件。

5.2 asyncio && aiohttp

此文使用python的asyncio和aiohttp来完成HTTP-API的请求。这两个模块对IO异步的设置、协程实现和调度进行了封装,下一篇文字再探讨协程相关内容吧。

#!/usr/bin/python3
# -*- coding: utf-8 -*-

"""
asyctn.py, namely async coroutine, 
wrapper-functions for aiohttp contained.

lxr, 2019.12.15
"""

import json
import asyncio
try:
    from aiohttp import ClientSession
except:
    exit('you should run following command in your own environment ' + 
         'to install the third party libraries\\npip3(or pip) install -r requirements.txt')

_index = 0
    
async def _w_method(asyctn_method, url, 
            hdrs=None, params=None, data=None, timeout=None):
    ''' wrapper for the asyctn_method,e.g. aiohttp.ClientSession.get '''
    async with asyctn_method(url     = url,
                             headers = hdrs,
                             params  = params,
                             data    = data,
                             timeout = timeout) as resp:
        try: return await resp.text(encoding='utf-8')
        except Exception as e: print(e)
    
    return None

def _method_mapping(session, method):
    ''' mapping string 'get', 'post', 'delete', 'put' to session.get, 
        session.post, session.delete, session.put respectively.
    '''
    if   method == 'get':
        return session.get
    elif method == 'post':
        return session.post
    elif method == 'delete':
        return session.delete
    elif method == 'put':
        return session.put
    else:
        return None
        
async def w_request(url, method,
            hdrs=None, params=None, data=None, timeout=None):
    ''' wrapper for aiohttp.ClientSession.method(),
        make it be coroutine && async
    '''
    global _index
    
    async with ClientSession() as session:
        method = _method_mapping(session, method)
        _index += 1
        index = _index
        try: resp = await _w_method(method, url, 
                                hdrs=hdrs, params=params,
                                data=data, timeout=timeout)
        except Exception as e: print(e);return None
        return resp

class AsyCtnLoop():
    ''' wrapper for asyncio '''
    def __init__(self):
        try:
            self.loop = asyncio.get_event_loop()
        except Exception as e:
            exit(e)

    def w_loop(self, requests):
        ''' to run ont requests asynchronously by asyncio '''
        try:
            self.loop.run_until_complete(asyncio.wait(requests))
        except Exception as e: 
            print(e)
        else:
            print('')
        
    def w_close(self):
        self.loop.close()

if __name__ == '__main__':
    ''' for testing asyctn.py '''
    async def w_req_test_f(index):
        resp = await w_request('https://mijisou.com/', 'get')
        print(f'index resp[0:3] if resp != None else None')
    async def w_req_test_s(index):
        resp = await w_request('https://mijisou.com/', 'get')
        print(f'index resp[0:3] if resp != None else None')
        
    requests_f = []
    requests_s = []
    for i in range(5):
        requests_f.append(w_req_test_f(i + 1))
    for i in range(5, 10):
        requests_s.append(w_req_test_s(i + 1))
    
    asyctn_loop = AsyCtnLoop()
    asyctn_loop.w_loop(requests_f)
    asyctn_loop.w_loop(requests_s)
    asyctn_loop.w_close(

由于asyctn.py包含第三方模块aiohttp,为了给使用者在缺乏aiohttp库时一个友好提示,此文用pipreqs为本python工程生成了requirements.txt。

先看看asyctn.py的运行体验吧。

> python asyctn.py
2 <!DOCTYPE
1 <!DOCTYPE
4 <!DOCTYPE
3 <!DOCTYPE
5 <!DOCTYPE

10 <!DOCTYPE
8 <!DOCTYPE
7 <!DOCTYPE
9 <!DOCTYPE
6 <!DOCTYPE

根据运行体验结果,可以明确两个结果:
[1] 组内API以异步方式执行;
[2] 组间API顺序执行。

5.3 提供配置入口

配置入口以命令行参数形式提供还是以配置文件提供呢?此文特别不擅长了解到他人喜好,为了具有尽量友好的理念,只有两种方式都提供了。

此文选择argparse作为命令行参数解析是因为其是python的内置模块,不涉及第三方模块的管理。

#!/usr/bin/python3
# -*- coding: utf-8 -*-

"""
ontApi_argparser.py,
to parse configurations from command or config file.

lxr, 2019.12.15
"""

import argparse
import json
import sys
import os

class OntApiCmdArgs():
    ''' to parse the command arguments for 
        OneNET Video-Platform by argparse module.
        
        instance variables PARSER, ARGS, HAS_CFG, HAS_ARGS,
        can be used directlly by outside.
    '''
    
    def __init__(self):
        self.HAS_CFG  = False
        self.HAS_ARGS = False
        self.PARSER   = self.ARGS = None
        
        parser = self.__create_cmdarg_parser()
        self.__add_optargs(parser)
        
        args = self.__parse_cmdargs(parser)
        self.__default_configurations(args)
        self.__update_cfg_depend_on_user(args)
        self.__tips_for_logs_output_file(args)
        
        self.ARGS   = args
        self.PARSER = parser
        
    def __create_cmdarg_parser(self):
        ARGS = argparse.ArgumentParser(
                description = self._descriprion + '-' * len(self._descriprion),
                epilog      = self._epilog      + '-' * len(self._epilog),
                formatter_class = argparse.RawTextHelpFormatter)
        return ARGS
    
    def __add_optargs(self, parser):
        self.__add_version_optarg(parser)
        self.__add_platform_optargs(parser)
        self.__add_device_optargs(parser)
        self.__add_log_optargs(parser)
        self.__add_cfg_optargs(parser)
        self.__add_default_optargs(parser)
        
    def __add_version_optarg(self, parser):
        parser.add_argument('-v', '--version',
                action  = 'version',
                version = f'sys.argv[0] 0.0.1' )
    
    def __add_platform_optargs(self, parser):
        group = parser.add_argument_group(self._optarg_desc['platform'])
        group.add_argument('-s',
                metavar = '  IP',
                dest    = 'ip', 
                type    = str,
                default = self._default_cfg['ip'],
                help    = self._help['ip'] )
        group.add_argument('-p',
                metavar = '  PORT',
                dest    = 'port',
                type    = int,
                default = self._default_cfg['port'],
                help    = self._help['port'] )
        group.add_argument('-hdr',
                metavar = 'HEADER',
                dest    = 'header',
                type    = str,
                default = self._default_cfg['header'],
                help    = self._help['header'])
    
    def __add_device_optargs(self, parser):
        group = parser.add_argument_group(self._optarg_desc['device'])
        group.add_argument('-did',
                metavar = 'DEVICE_ID',
                dest    = 'did',
                type    = int,
                default = self._default_cfg['did'],
                help    = self._help['did'] )
        group.add_argument('-pid',
                metavar = 'PRODUCT_ID',
                dest    = 'pid',
                type    = int,
                default = self._default_cfg['pid'],
                help    = self._help['pid'] )
    
    def __add_log_optargs(self, parser):
        def str2bool(_str):
            return True if _str.lower() not in ['false', '0'] else False
        group = parser.add_argument_group(self._optarg_desc['log'])
        group.add_argument('-req', 
                metavar = '0/1',
                dest    = 'req',
                type    = str2bool,
                default = self._default_cfg['req'],
                help    = self._help['req'] )
        group.add_argument('-rep', 
                metavar = '0/1',
                dest    = 'rep',
                type    = str2bool,
                default = self._default_cfg['rep'],
                help    = self._help['rep'] )
        group.add_argument('-jsi', 
                metavar = 'NR',
                dest    = 'jsi',
                type    = int,
                default = self._default_cfg['jsi'],
                choices = range(1, 8),
                help    = self._help['jsi'] )
        group.add_argument('--log',
                metavar = 'LOGFILE',
                type    = argparse.FileType('w', encoding='UTF-8'),
                default = self._default_cfg['log'],
                help    = self._help['log'] )
    
    def __add_cfg_optargs(self, parser):
        group = parser.add_argument_group(self._optarg_desc['cfg'])
        group.add_argument('--cfg',
                metavar = 'CONFIGFILE',
                type    = str,
                default = self._default_cfg['cfg'],
                help    = self._help['cfg'])
    
    def __add_default_optargs(self, parser):
        group = parser.add_argument_group(self._optarg_desc['dft'])
        group.add_argument('-dft', '--default',
                action = 'store_true',
                help   = self._help['dft'])
        
    def __parse_cmdargs(self, parser):
        try: return parser.parse_args()
        except IOError as e: exit(e)
    
    def __tips_for_logs_output_file(self, args):
        if args.__dict__['log'].name != self._default_cfg['log']以上是关于一种在 python 中用 asyncio 和协程实现的IO并发的主要内容,如果未能解决你的问题,请参考以下文章

一种在C语言中用 System V ucontext 实现的协程切换

一文搞明白Python协程编程:asyncio库

一文搞明白Python协程编程:asyncio库

自己手写调度器,理解Python中的asyncio异步事件循环与协程

自己手写调度器,理解Python中的asyncio异步事件循环与协程

自己手写调度器,理解Python中的asyncio异步事件循环与协程