一种在 python 中用 asyncio 和协程实现的IO并发
Posted 资质平庸的程序员
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一种在 python 中用 asyncio 和协程实现的IO并发相关的知识,希望对你有一定的参考价值。
An IO-concurrency implement by async && coroutine in python
1 此篇文字围绕的主题
基于IO驱动程序1,本机从IO读取或发送数据——读/写IO的有效方法。
+--------------------------+ +----+
| | +==>| |
| +--+ +---+ | | +----+ 1
| +-----+ | | | I | | | .
| | CPU |<==>|MM|<==>| |<==+ .
| +-----+ | | | O | | | .
| +--+ +---+ | | +----+
| | +==>| |
+--------------------------+ +----+ n
local-computer remote-computers
MM: memory。
<==>: 数据流向。
2 IO多路复用 ≈ IO并发
基于IO驱动程序对IO的读/写,可看作是对IO缓冲区的读/写。
+--------------------------------------+ +----+
| | +===>| |
| +--+ +---+ | |1路 +----+ 1
| +==>|b1|<==>| | | |
| +-----+ concurrency| |--| | I | | | .
| | CPU |<===========+ |MM| | |<===+ .
| +-----+ | |--| | O | | | .
| +==>|bn|<==>| | | |
| +--+ +---+ | | +----+
| | +===>| |
+--------------------------------------+ n路 +----+ n
local-computer remote-computers
b: IO buffer。
本机IO能实时接收远程计算机传输来的数据,(对于诸如网卡类IO,)当数据目的地为本机时 其会被上传到IO缓冲区。
IO多路复用指本机与远程计算机建立多路数据传输的软连接(每一路数据对应一个IO缓冲区),CPU通过调度CPU并发单位2 并发读/写IO缓冲区。
由CPU调度CPU并发单位读/写IO缓冲区实现了IO硬件并发传输各路数据。相比于CPU层面的并发,IO并发粒度更大(一定程度上削弱了IO的并发现象)。此篇文字将IO多路复用称为大粒度并发——这是本小节标题“IO多路复用 ≈ IO并发”的由来。
另外,此篇文字认为构成IO并发粒度比CPU并发粒度大的原因主要有三个。
[1] 数据粒度——让IO一次性(不切换)发送某路数据的量较大(如一整个缓冲区);
[2] IO速度比CPU低几个数量级;
[3] 较本机数据的传输时间,来自远程计算机数据的传输时间较长(相比前两个原因,这个是主要原因,可参考后续程序例子)。
利用IO并发时主要是为了提升IO利用率——当某路数据中断时去处理另一路数据。
3 IO同步并发 && IO异步并发
IO同步指一个CPU并发单位会阻塞等待IO缓冲区可用并完成对IO的读/写。
IO异步指若IO缓冲区可用则完成对IO的读/写;若IO缓冲区不可用则跳过IO读/写 继续调度并发单位后续指令运行,待IO缓冲区可用时再完成对其的读/写操作。
3.1 IO同步并发
通过多个CPU并发单位读/写某IO下的多路IO缓冲区可实现IO同步并发。
当无不可用IO缓冲区时,各IO缓冲区由CPU调度CPU并发单位 并发读/写;当 当前CPU并发单位读/写的IO缓冲区不可用时 则调度另一CPU并发单位尝试读/写相应的IO缓冲区。
如由n个线程分别读/写m(m >= n)路IO数据的IO缓冲区。
3.2 IO异步并发
由于CPU与IO的速度差异3,除能通过CPU多并发单位实现IO异步并发外,单CPU并发单位也可以实现异步IO在IO层面的并发。
最直接的方式是让CPU轮询读/写IO缓冲区:
for (io = data_conn->start; ; io = data_conn->next)
read or write current io buffer
create one data connection when data connections are less than max
考虑本机IO利用率满(远程计算机将本机IO填满且都上传到IO缓冲区)的场景,当本机可调度并发量未饱和情况下,轮询方式可以满足异步IO并发的读/写。
4 IO并发——IO异步与协程的搭配
若单CPU并发已能满足当前场景下的IO并发,那么实现目标最友好方式就是“单CPU并发+IO异步”的方式啦4。
对于比“CPU轮询+IO异步”稍复杂的IO并发业务,可尝试在单CPU并发单位内使用协程。协程可在应用程序中创建和调度。除具调度式并发特点外,其还有两个优点:
[1] 对CPU层面并发单位的友好性——其创建和调度开销不在内核上;
[2] 上下文切换开销比线程更小,协程执行效率可接近于3.2中的轮询方式。
额,到此为止,这篇文字个人理解文字的戏也太多了点吧…在接下来的篇章里不如直接写个例子算了。
5 写个小例子吧
用什么写个什么呢?
用python写个请求OneNET视频平台部分HTTP-APIs的例子吧。嗯,这种网络IO场景适合用“IO异步+协程”来实现。
5.1 初定功能
[1] 比较OneNET-Video-Platform-HTTP-APIs 同步请求和异步请求的时间差异;
[2] 在异步情况下保证某些HTTP-APIs请求的先后顺序;
[3] 提供参数配置入口,如命令行参数,配置文件。
5.2 asyncio && aiohttp
此文使用python的asyncio和aiohttp来完成HTTP-API的请求。这两个模块对IO异步的设置、协程实现和调度进行了封装,下一篇文字再探讨协程相关内容吧。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
asyctn.py, namely async coroutine,
wrapper-functions for aiohttp contained.
lxr, 2019.12.15
"""
import json
import asyncio
try:
from aiohttp import ClientSession
except:
exit('you should run following command in your own environment ' +
'to install the third party libraries\\npip3(or pip) install -r requirements.txt')
_index = 0
async def _w_method(asyctn_method, url,
hdrs=None, params=None, data=None, timeout=None):
''' wrapper for the asyctn_method,e.g. aiohttp.ClientSession.get '''
async with asyctn_method(url = url,
headers = hdrs,
params = params,
data = data,
timeout = timeout) as resp:
try: return await resp.text(encoding='utf-8')
except Exception as e: print(e)
return None
def _method_mapping(session, method):
''' mapping string 'get', 'post', 'delete', 'put' to session.get,
session.post, session.delete, session.put respectively.
'''
if method == 'get':
return session.get
elif method == 'post':
return session.post
elif method == 'delete':
return session.delete
elif method == 'put':
return session.put
else:
return None
async def w_request(url, method,
hdrs=None, params=None, data=None, timeout=None):
''' wrapper for aiohttp.ClientSession.method(),
make it be coroutine && async
'''
global _index
async with ClientSession() as session:
method = _method_mapping(session, method)
_index += 1
index = _index
try: resp = await _w_method(method, url,
hdrs=hdrs, params=params,
data=data, timeout=timeout)
except Exception as e: print(e);return None
return resp
class AsyCtnLoop():
''' wrapper for asyncio '''
def __init__(self):
try:
self.loop = asyncio.get_event_loop()
except Exception as e:
exit(e)
def w_loop(self, requests):
''' to run ont requests asynchronously by asyncio '''
try:
self.loop.run_until_complete(asyncio.wait(requests))
except Exception as e:
print(e)
else:
print('')
def w_close(self):
self.loop.close()
if __name__ == '__main__':
''' for testing asyctn.py '''
async def w_req_test_f(index):
resp = await w_request('https://mijisou.com/', 'get')
print(f'index resp[0:3] if resp != None else None')
async def w_req_test_s(index):
resp = await w_request('https://mijisou.com/', 'get')
print(f'index resp[0:3] if resp != None else None')
requests_f = []
requests_s = []
for i in range(5):
requests_f.append(w_req_test_f(i + 1))
for i in range(5, 10):
requests_s.append(w_req_test_s(i + 1))
asyctn_loop = AsyCtnLoop()
asyctn_loop.w_loop(requests_f)
asyctn_loop.w_loop(requests_s)
asyctn_loop.w_close(
由于asyctn.py包含第三方模块aiohttp,为了给使用者在缺乏aiohttp库时一个友好提示,此文用pipreqs为本python工程生成了requirements.txt。
先看看asyctn.py的运行体验吧。
> python asyctn.py
2 <!DOCTYPE
1 <!DOCTYPE
4 <!DOCTYPE
3 <!DOCTYPE
5 <!DOCTYPE
10 <!DOCTYPE
8 <!DOCTYPE
7 <!DOCTYPE
9 <!DOCTYPE
6 <!DOCTYPE
根据运行体验结果,可以明确两个结果:
[1] 组内API以异步方式执行;
[2] 组间API顺序执行。
5.3 提供配置入口
配置入口以命令行参数形式提供还是以配置文件提供呢?此文特别不擅长了解到他人喜好,为了具有尽量友好的理念,只有两种方式都提供了。
此文选择argparse作为命令行参数解析是因为其是python的内置模块,不涉及第三方模块的管理。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
ontApi_argparser.py,
to parse configurations from command or config file.
lxr, 2019.12.15
"""
import argparse
import json
import sys
import os
class OntApiCmdArgs():
''' to parse the command arguments for
OneNET Video-Platform by argparse module.
instance variables PARSER, ARGS, HAS_CFG, HAS_ARGS,
can be used directlly by outside.
'''
def __init__(self):
self.HAS_CFG = False
self.HAS_ARGS = False
self.PARSER = self.ARGS = None
parser = self.__create_cmdarg_parser()
self.__add_optargs(parser)
args = self.__parse_cmdargs(parser)
self.__default_configurations(args)
self.__update_cfg_depend_on_user(args)
self.__tips_for_logs_output_file(args)
self.ARGS = args
self.PARSER = parser
def __create_cmdarg_parser(self):
ARGS = argparse.ArgumentParser(
description = self._descriprion + '-' * len(self._descriprion),
epilog = self._epilog + '-' * len(self._epilog),
formatter_class = argparse.RawTextHelpFormatter)
return ARGS
def __add_optargs(self, parser):
self.__add_version_optarg(parser)
self.__add_platform_optargs(parser)
self.__add_device_optargs(parser)
self.__add_log_optargs(parser)
self.__add_cfg_optargs(parser)
self.__add_default_optargs(parser)
def __add_version_optarg(self, parser):
parser.add_argument('-v', '--version',
action = 'version',
version = f'sys.argv[0] 0.0.1' )
def __add_platform_optargs(self, parser):
group = parser.add_argument_group(self._optarg_desc['platform'])
group.add_argument('-s',
metavar = ' IP',
dest = 'ip',
type = str,
default = self._default_cfg['ip'],
help = self._help['ip'] )
group.add_argument('-p',
metavar = ' PORT',
dest = 'port',
type = int,
default = self._default_cfg['port'],
help = self._help['port'] )
group.add_argument('-hdr',
metavar = 'HEADER',
dest = 'header',
type = str,
default = self._default_cfg['header'],
help = self._help['header'])
def __add_device_optargs(self, parser):
group = parser.add_argument_group(self._optarg_desc['device'])
group.add_argument('-did',
metavar = 'DEVICE_ID',
dest = 'did',
type = int,
default = self._default_cfg['did'],
help = self._help['did'] )
group.add_argument('-pid',
metavar = 'PRODUCT_ID',
dest = 'pid',
type = int,
default = self._default_cfg['pid'],
help = self._help['pid'] )
def __add_log_optargs(self, parser):
def str2bool(_str):
return True if _str.lower() not in ['false', '0'] else False
group = parser.add_argument_group(self._optarg_desc['log'])
group.add_argument('-req',
metavar = '0/1',
dest = 'req',
type = str2bool,
default = self._default_cfg['req'],
help = self._help['req'] )
group.add_argument('-rep',
metavar = '0/1',
dest = 'rep',
type = str2bool,
default = self._default_cfg['rep'],
help = self._help['rep'] )
group.add_argument('-jsi',
metavar = 'NR',
dest = 'jsi',
type = int,
default = self._default_cfg['jsi'],
choices = range(1, 8),
help = self._help['jsi'] )
group.add_argument('--log',
metavar = 'LOGFILE',
type = argparse.FileType('w', encoding='UTF-8'),
default = self._default_cfg['log'],
help = self._help['log'] )
def __add_cfg_optargs(self, parser):
group = parser.add_argument_group(self._optarg_desc['cfg'])
group.add_argument('--cfg',
metavar = 'CONFIGFILE',
type = str,
default = self._default_cfg['cfg'],
help = self._help['cfg'])
def __add_default_optargs(self, parser):
group = parser.add_argument_group(self._optarg_desc['dft'])
group.add_argument('-dft', '--default',
action = 'store_true',
help = self._help['dft'])
def __parse_cmdargs(self, parser):
try: return parser.parse_args()
except IOError as e: exit(e)
def __tips_for_logs_output_file(self, args):
if args.__dict__['log'].name != self._default_cfg['log']以上是关于一种在 python 中用 asyncio 和协程实现的IO并发的主要内容,如果未能解决你的问题,请参考以下文章
一种在C语言中用 System V ucontext 实现的协程切换
自己手写调度器,理解Python中的asyncio异步事件循环与协程