异步IO
Posted huangqiang97
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步IO相关的知识,希望对你有一定的参考价值。
参考: 同步/异步 阻塞/非阻塞 iterator / generator asyio/cortine asy_io
# -------------------------------->同步/异步 阻塞/非阻塞------------------------------------
#
# 在一个线程中,CPU执行代码的速度极快,然而,一旦遇到IO操作,如读写文件、发送网络数据时,就需要等待IO操作完成,才能继续进行下一步操作。这种情况称为同步IO。
#
# 使用多线程或者多进程来并发执行代码,为多个用户服务。每个用户都会分配一个线程,如果遇到IO导致线程被挂起,其他用户的线程不受影响,但是系统不能无上限地增加线程。由于系统切换线程的开销也很大,一旦线程数量过多,CPU的时间就花在线程切换上了,结果导致性能严重下降。
#
# loop = get_event_loop()
# while True:
# event = loop.get_event()
# process_event(event)
# 当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。
#
# 当遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。
#
# 在“发出IO请求”到收到“IO完成”的这段时间里,同步IO模型下,主线程只能挂起,但异步IO模型下,主线程并没有休息,而是在消息循环中继续处理其他消息。这样,在异步IO模型下,一个线程就可以同时处理多个IO请求,并且没有切换线程的操作。
#
# 烧水:
# 1 老张把水壶放到火上,立等水开。(同步阻塞)
# 2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)
# 3 老张把水开了会响的水壶放到火上,立等水开。(异步阻塞)
# 4 老张把水开了会响的水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
#
# 所谓同步异步,只是对于水壶而言:普通水壶,同步,能没有结束前,一直等结果;响水壶,异步,不需要知道该功能结果,该功能有结果后通知(回调通知)。 响水壶可以在自己完工之后,提示老张水开了。 普通水壶同步只能让调用者去轮询自己。
# 所谓阻塞非阻塞,仅仅对于老张而言。 立等的老张,阻塞,(函数)没有接收完数据或者没有得到结果之前,不会返回;看电视的老张,非阻塞,(函数)立即返回,通过select通知调用者。
# -------------------------------->COROUTINE-------------------------------------------
# 子程序,或者称为函数,在所有语言中都是层级调用,比如A调用B,B在执行过程中又调用了C,C执行完毕返回,B执行完毕返回,最后是A执行完毕。
# 所以子程序调用是通过栈实现的,一个线程就是执行一个子程序。
# 子程序调用总是一个入口,一次返回,调用顺序是明确的。而协程的调用和子程序不同。
# 协程看上去也是子程序,但执行过程中,在子程序内部可中断,然后转而执行别的子程序,在适当的时候再返回来接着执行。有点类似CPU的中断.
# 执行有点像多线程,但协程的特点在于是一个线程执行
# 协程极高的执行效率。子程序切换不是线程切换,而是由程序自身控制,没有线程切换的开销。
# 不需要多线程的锁机制,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态。
# 协程是一个线程执行,利用多核CPU:多进程+协程,既充分利用多核,又充分发挥协程的高效率。
# yield 与yield
# from:
# yield list:返回整个列表,yield from list:一次返回一个元素。
# yield from:会自动处理大量错误。
# yield from后面必须是子生成器函数
#
# # 子生成器
# def ger_0():
# loop:
# x = yield
# do
# somthing
# return result
#
# # 委托生成器
# def ger_1():
# result = yield from ger_0()
#
# # 调用方
# g = ger_1()
# g.send(0) -->ger_0: x = 0
# g.send(1) -->ger_0: x = 1
# g.send(2) -->ger_0: x = 2
# ger_1: result = ger_0:result
# consumer函数是一个generator
def consumer():
print('2---------')
r = ''
while True:
print('3---------')
# 通过yield拿到消息n处理,又通过yield把结果r传回
# 赋值语句先计算= 右边,由于右边是 yield 语句,
# 所以yield语句执行完以后,进入暂停,而赋值语句在下一次启动生成器的时候首先被执行;
# P:sned(None)->C:yield r=''->P:send(1)->C:n=1->C:yield r='200 OK'->P:send(2)
n = yield r
if not n:
print('6---------')
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
print('1---------')
# 启动生成器
# 在一个生成器函数未启动之前,是不能传递值进去。
# 也就是说在使用c.send(n)之前,必须先使用c.send(None)或者next(c)来返回生成器的第一个值
c.send(None)
n = 0
while n < 5:
n = n + 1
print('4---------')
print('[PRODUCER] Producing %s...' % n)
# 切换到consumer执行
r = c.send(n)
print('5---------')
# 拿到consumer处理的结果,继续生产下一条消息
print('[PRODUCER] Consumer return: %s' % r)
# 关闭consumer,整个过程结束。
c.close()
# 1->2->3 -> 4->3->5 -> 4->3->5 -> 4->3->5
c = consumer()
produce(c)
# -------------------------------->ASYNCIO-------------------------------------------
# https://blog.csdn.net/SL_World/article/details/86597738
# asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。
# 异步操作需要在coroutine中通过yield from完成;
# 多个coroutine可以封装成一组Task然后并发执行。
import threading
import asyncio
# 把一个generator标记为coroutine类型
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
# yield from语法可以让我们方便地调用另一个generator
# asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。
# 当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
# 两个coroutine是由同一个线程并发执行的。
# 直到循环事件的所有事件都处理完才能完整结束。
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# 引入了新的语法async和await,可以让coroutine的代码更简洁易读
# @asyncio.coroutine替换为async;
# 把yield from替换为await。
import threading
import asyncio
async def hello():
print('Hello world! (%s)' % threading.currentThread())
await asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# -------------------------------->调用方-子生成器-委托生成器<-------------------------------
import time
import asyncio
async def taskIO_1():
print('开始运行IO任务1...')
await asyncio.sleep(3)
print('IO任务1已完成,耗时3s')
return taskIO_1.__name__
async def taskIO_2():
print('开始运行IO任务2...')
await asyncio.sleep(2)
print('IO任务2已完成,耗时2s')
return taskIO_2.__name__
# 调用方
async def main():
# 把所有任务添加到task中
tasks = [taskIO_1(), taskIO_2()]
# 返回已经完成的任务,完成一个返回一个
for completed_task in asyncio.as_completed(tasks):
# 子生成器
resualt = await completed_task
print('协程无序返回值:'+resualt)
# # done:已经完成的任务,pending:未完成任务
# # 等待任务全部完成才返回
# done, pending = await asyncio.wait(tasks)
# for r in done:
# print('协程无序返回值:' + r.result())
if __name__ == '__main__':
start = time.time()
# 创建一个事件循环对象loop
loop = asyncio.get_event_loop()
try:
# 完成事件循环,直到最后一个任务结束
loop.run_until_complete(main())
finally:
# 结束事件循环
loop.close()
print('所有IO任务总耗时%.5f秒' % float(time.time()-start))
# -------------------------------->AIOHTTP-------------------------------------------
# 把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。
# asyncio实现了TCP、UDP、SSL等协议
from aiohttp import web
routes = web.RouteTableDef()
@routes.get('/')
async def index(request):
await asyncio.sleep(2)
return web.json_response({
'name': 'index'
})
@routes.get('/about')
async def about(request):
await asyncio.sleep(0.5)
return web.Response(text="<h1>about us</h1>")
def init():
app = web.Application()
app.add_routes(routes)
web.run_app(app)
init()
以上是关于异步IO的主要内容,如果未能解决你的问题,请参考以下文章