Python之asyncio模块的使用

Posted 小粉优化大师

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python之asyncio模块的使用相关的知识,希望对你有一定的参考价值。

asyncio模块作用:构建协程并发应用的工具

python并发的三大内置模块,简单认识:

1、multiprocessing:多进程并发处理
2、threading模块:多线程并发处理
3、asyncio模块:协程并发处理

 1、启动一个协程,任务无返回值,需要注意:async的使用

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

# 开头定义async,表示要在协程运行,不定义的话,循环监听增加不了
async def coroutine():
    print(协程运行...)

# 定义一个事件循环监听
event_loop = asyncio.get_event_loop()

try:
    print(协程开始...)
    coroutine_obj = coroutine()
    print(进入事件循环监听...)
    event_loop.run_until_complete(coroutine())  # run_until_complete翻译成中文:一直运行到完成为止
finally:
    print(关闭事件循环监听..)
    event_loop.close()
asyncio_coroutine.py

运行效果

[root@ mnt]# python3 asyncio_coroutine.py 
协程开始...
进入事件循环监听...
协程运行...
关闭事件循环监听..
sys:1: RuntimeWarning: coroutine coroutine was never awaited

 2、启动一个协程,任务有返回值,需要注意:async的使用

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

# 开头定义async,表示要在协程运行,不定义的话,循环监听增加不了
async def coroutine():
    print(协程运行...)
    return ok

# 定义一个事件循环监听
event_loop = asyncio.get_event_loop()

try:
    coroutine_obj = coroutine()
    return_value = event_loop.run_until_complete(coroutine())  # run_until_complete翻译成中文:一直运行到完成为止
    print(coroutine()返回值:, return_value)
finally:
    event_loop.close()
asyncio_coroutine_return.py

运行效果

[root@ mnt]# python3 asyncio_coroutine_return.py 
协程运行...
coroutine()返回值: ok
sys:1: RuntimeWarning: coroutine coroutine was never awaited

 3、启动一个协程,任务调用其它任务运行,需要注意:await 的使用

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

# 开头定义async,表示要在协程运行,不定义的话,循环监听增加不了
async def coroutine():
    print(coroutine内部运行)

    print(等待task_1运行结果)
    task1 = await task_1() #await作用:控制运行流程,按顺序执行,即等待该函数运行完成,再继续往后执行

    print(等待task_2运行结果)
    task2 = await task_2(task1)

    return (task1, task2)

async def task_1():
    print(task_1内部运行)
    return task_1 ok


async def task_2(arg):
    print(task_2内部运行)
    return task_2 arg:{}.format(arg)

# 定义一个事件循环监听
event_loop = asyncio.get_event_loop()

try:
    coroutine_obj = coroutine()
    return_value = event_loop.run_until_complete(coroutine())  # run_until_complete翻译成中文:一直运行到完成为止
    print(coroutine()返回值:, return_value)
finally:
    event_loop.close()
asyncio_coroutine_chain.py

运行效果

[root@ mnt]# python3 asyncio_coroutine_chain.py 
coroutine内部运行
等待task_1运行结果
task_1内部运行
等待task_2运行结果
task_2内部运行
coroutine()返回值: (task_1 ok, task_2 arg:task_1 ok)
sys:1: RuntimeWarning: coroutine coroutine was never awaited

 4、生成器而不是协程

Python3早期版本的语法如下

@asyncio.coroutine 替换为 async
yield from 替换为 await
技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

# 开头定义async,表示要在协程运行,不定义的话,循环监听增加不了
@asyncio.coroutine
def coroutine():
    print(coroutine内部运行)

    print(等待task_1运行结果)
    task1 = yield from task_1() #await作用:控制运行流程,按顺序执行,即等待该函数运行完成,再继续往后执行

    print(等待task_2运行结果)
    task2 = yield from task_2(task1)

    return (task1, task2)

@asyncio.coroutine
async def task_1():
    print(task_1内部运行)
    return task_1 ok

@asyncio.coroutine
async def task_2(arg):
    print(task_2内部运行)
    return task_2 arg:{}.format(arg)

# 定义一个事件循环监听
event_loop = asyncio.get_event_loop()

try:
    coroutine_obj = coroutine()
    return_value = event_loop.run_until_complete(coroutine())  # run_until_complete翻译成中文:一直运行到完成为止
    print(coroutine()返回值:, return_value)
finally:
    event_loop.close()
asyncio_generator.py

运行效果

[root@ mnt]# python3 asyncio_generator.py 
coroutine内部运行
等待task_1运行结果
task_1内部运行
等待task_2运行结果
task_2内部运行
coroutine()返回值: (task_1 ok, task_2 arg:task_1 ok)

 5、协程回调函数调用,此示例:讯速回调

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools

def callback(arg, *, kwarg=default):
    print(回调函数arg={},kwarg={}.format(arg, kwarg))

async def main(loop):
    print(注册回调函数)
    loop.call_soon(callback, 1)  # 执行回调函数,传入参数1
    wrapped = functools.partial(callback, kwarg=not default)  # 利用偏函数,给kwarg传默认值
    loop.call_soon(wrapped, 2)  # 执行回调函数,传入参数2
    await asyncio.sleep(0.5)

event_loop = asyncio.get_event_loop()
try:
    print(进入事件循环监听)
    event_loop.run_until_complete(main(event_loop))  # 将事件循环对象传入main函数中
finally:
    print(关闭事件循环监听)
    event_loop.close()
asyncio_call_soon.py

运行效果

[root@ mnt]# python3 asyncio_call_soon.py 
进入事件循环监听
注册回调函数
回调函数arg=1,kwarg=default
回调函数arg=2,kwarg=not default
关闭事件循环监听

  6、协程回调函数调用,此示例:延时回调

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

def callback(arg):
    print(回调函数arg={}.format(arg))

async def main(loop):
    print(注册回调函数)
    loop.call_later(1,callback,延时1秒回调参数1)
    loop.call_later(1,callback,延时1秒回调参数2)
    loop.call_soon(callback,讯速的回调参数)
    await asyncio.sleep(3)

event_loop = asyncio.get_event_loop()
try:
    print(进入事件循环监听)
    event_loop.run_until_complete(main(event_loop))  # 将事件循环对象传入main函数中
finally:
    print(关闭事件循环监听)
    event_loop.close()
asyncio_call_delay.py

运行效果

[root@ mnt]# python3 asyncio_call_delay.py 
进入事件循环监听
注册回调函数
回调函数arg=讯速的回调参数
回调函数arg=延时1秒回调参数1
回调函数arg=延时1秒回调参数2
关闭事件循环监听

  7、协程回调函数调用,此示例:指定时间回调

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import time

def callback(arg, loop):
    print(回调函数arg={} 回调的时间time={}.format(arg, loop.time()))

async def main(loop):
    now = loop.time()
    print(时钟时间:{}.format(time.time()))
    print(时事件循环时间:{}.format(loop.time()))
    print(注册回调函数)
    loop.call_at(now + 1, callback, 参数1, loop)
    loop.call_at(now + 2, callback, 参数2, loop)
    loop.call_soon(callback, 讯速的回调参数, loop)
    await asyncio.sleep(4)

event_loop = asyncio.get_event_loop()
try:
    print(进入事件循环监听)
    event_loop.run_until_complete(main(event_loop))  # 将事件循环对象传入main函数中
finally:
    print(关闭事件循环监听)
    event_loop.close()
asyncio_call_at.py

运行结果

[root@ mnt]# python3 asyncio_call_at.py 
进入事件循环监听
时钟时间:1576030580.730174
时事件循环时间:233762.828430848
注册回调函数
回调函数arg=讯速的回调参数 回调的时间time=233762.828485111
回调函数arg=参数1 回调的时间time=233763.829784903
回调函数arg=参数2 回调的时间time=233764.831077136
关闭事件循环监听

 8、基于Future实现异步返回任务执行结果

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

def mark_done(future, result):
    """标记完成的函数"""
    print(设置 Future 返回结果 {}.format(result))
    future.set_result(result)

event_loop = asyncio.get_event_loop()
try:
    all_done = asyncio.Future()
    print(调度标记完成的函数)
    event_loop.call_soon(mark_done, all_done, 这个是调度传入的数据)
    result = event_loop.run_until_complete(all_done)
    print(运行返回的结果:{}.format(result))
finally:
    print(关闭事件循环监听)
    event_loop.close()

print(Future 返回的结果: {}.format(all_done.result()))
"""
结论:
    返回结果可以从两个地方获取:
    1、result = event_loop.run_until_complete(all_done)
    2、Future.result()
"""
asyncio_future_event_loop.py

运行效果

[root@ mnt]# python3 asyncio_future_event_loop.py 
调度标记完成的函数
设置 Future 返回结果 这个是调度传入的数据
运行返回的结果:这个是调度传入的数据
关闭事件循环监听
Future 返回的结果: 这个是调度传入的数据

 9、基于Future+await类现异步返回任务执行结果

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

def mark_done(future, result):
    """标记完成的函数"""
    print(设置 Future 返回结果 {}.format(result))
    future.set_result(result)

async def main(loop):
    all_done = asyncio.Future()
    print(调度标记完成的函数)
    loop.call_soon(mark_done, all_done, 这个是调度传入的数据)
    result = await all_done  # await作用:等all_done返回结果,再往下运行
    print(mark_done()执行完成,返回值 : {}.format(result))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    print(关闭事件循环监听)
    event_loop.close()
asyncio_future_await.py

运行效果

[root@ mnt]# python3 asyncio_future_await.py 
调度标记完成的函数
设置 Future 返回结果 这个是调度传入的数据
mark_done()执行完成,返回值 : 这个是调度传入的数据
关闭事件循环监听

 10、基于Future的回调

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools

def callback(future, n):
    print({}: future 完成: {}.format(n, future.result()))


async def register_callbacks(future_obj):
    print(将回调函数注册到Future中)

    # 这里需要注意的是add_done_callback函数,还为把当前实例对象作为参数,传给函数,所以回调函数多一个callback(future, n)
    future_obj.add_done_callback(functools.partial(callback, n=1))
    future_obj.add_done_callback(functools.partial(callback, n=2))

async def main(future_obj):
    # 注册future的回调函数
    await register_callbacks(future_obj)
    print(设置Future返回结果)
    future_obj.set_result(the result)

event_loop = asyncio.get_event_loop()
try:
    # 创建一个future实例
    future_obj = asyncio.Future()

    # 增future实例传给main函数处理
    event_loop.run_until_complete(main(future_obj))
finally:
    event_loop.close()
asyncio_future_callback.py

运行效果

[root@ mnt]# python3 asyncio_future_callback.py 
将回调函数注册到Future中
设置Future返回结果
1: future 完成: the result
2: future 完成: the result

 11、asyncio创建任务运行

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def task_func():
    print(task_func 执行完成)
    return task_func返回值ok

async def main(loop):
    print(创建任务)
    task = loop.create_task(task_func())
    print(等待task的结果 {}.format(task))
    return_value = await task #直到遇到await,上面的task开始运行
    print(已完成任务{}.format(task)) #经过上面的运行,task里面已经有result执行结果
    print(return value: {}.format(return_value))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_future_create_task.py

运行结果

[root@ mnt]# python3 asyncio_future_create_task.py 
创建任务
等待task的结果 <Task pending coro=<task_func() running at asyncio_future_create_task.py:11>>
task_func 执行完成
已完成任务<Task finished coro=<task_func() done, defined at asyncio_future_create_task.py:11> result=task_func返回值ok>
return value: task_func返回值ok

 12、asyncio取消任务运行

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def task_func():
    print(task_func 执行完成)
    return task_func返回值ok

async def main(loop):
    print(创建任务)
    task = loop.create_task(task_func())
    print(取消任务)
    task.cancel()
    print(取消任务结果 {}.format(task))
    try:
        await task #直到遇到await,上面的task开始运行
    except asyncio.CancelledError:
        print(从已取消的任务中捕获错误)
    else:
        print(任务执行结果 {}.format(task))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_future_create_cancel_task.py

 运行效果

[root@python-mysql mnt]# python3 asyncio_future_create_cancel_task.py 
创建任务
取消任务
取消任务结果 <Task cancelling coro=<task_func() running at asyncio_future_create_cancel_task.py:11>>
从已取消的任务中捕获错误

 13、利用回调取消任务执行

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def task_func():
    print(task_func睡眠)
    try:
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(task_func 任务取消)
        raise
    return task_func返回值ok

def task_canceller(task_obj):
    print(task_canceller运行)
    task_obj.cancel()
    print(取消task_obj任务)

async def main(loop):
    print(创建任务)
    task = loop.create_task(task_func())
    loop.call_soon(task_canceller, task)
    try:
        await task  # 直到遇到await,上面的task开始运行
    except asyncio.CancelledError:
        print(从已取消的任务中捕获错误)
    else:
        print(任务执行结果 {}.format(task))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_future_create_callback_cancel_task.py

 运行效果

[root@ mnt]# python3 asyncio_future_create_callback_cancel_task.py 
创建任务
task_func睡眠
task_canceller运行
取消task_obj任务
task_func 任务取消
从已取消的任务中捕获错误

 14、asyncio.ensure_future(),增加函数,直到await才运行

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def wrapped():
    print(wrapped 运行)
    return wrapped result

async def inner(task):
    print(inner: 开始运行)
    print(inner: task {!r}.format(task))
    result = await task
    print(inner: task 返回值 {!r}.format(result))

async def start_task():
    print(开始创建task)
    task = asyncio.ensure_future(wrapped())
    print(等待inner运行)
    await inner(task)
    print(starter: inner returned)

event_loop = asyncio.get_event_loop()
try:
    print(进程事件循环)
    result = event_loop.run_until_complete(start_task())
finally:
    event_loop.close()
asyncio_ensure_future.py

运行效果

[root@ mnt]# python3 asyncio_ensure_future.py 
进程事件循环
开始创建task
等待inner运行
inner: 开始运行
inner: task <Task pending coro=<wrapped() running at asyncio_ensure_future.py:11>>
wrapped 运行
inner: task 返回值 wrapped result

 15、asyncio.wait()批量等待多个协程直到运行完成,包装多个返回显示结果

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def phase(i):
    print(phase形参传入值{}.format(i))
    await asyncio.sleep(0.1 * i)
    print(完成phase的次数{}.format(i))
    return phase {} result.format(i)

async def main(num_phase):
    print(main函数开始)
    phases = [
        phase(i) for i in range(num_phase)
    ]
    print(等待phases里面的多个函数执行完成)
    completed, pending = await asyncio.wait(phases)  # completed : 运行完成存在这里 ,pending : 没有运行完成存在这里
    for_completed_results = [t.result() for t in completed]
    print(for_completed_results : {}.format(for_completed_results))

event_loop = asyncio.get_event_loop()
try:
    print(进程事件循环)
    result = event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
asyncio_wait.py

运行效果

[root@ mnt]# python3 asyncio_wait.py 
进程事件循环
main函数开始
等待phases里面的多个函数执行完成
phase形参传入值2
phase形参传入值0
phase形参传入值1
完成phase的次数0
完成phase的次数1
完成phase的次数2
for_completed_results : [phase 2 result, phase 0 result, phase 1 result]

  16、asyncio.wait()批量等待多个协程设置超时时间并且取消未完成的任务,包装多个返回显示结果

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def phase(i):
    print(phase形参传入值{}.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print(phase {} 取消.format(i))
    else:
        print(完成phase的次数{}.format(i))
        return phase {} result.format(i)

async def main(num_phase):
    print(main函数开始)
    phases = [
        phase(i) for i in range(num_phase)
    ]
    print(等待phases里面的多个函数执行完成)
    completed, pending = await asyncio.wait(phases, timeout=0.1)  # completed : 运行完成存在这里 ,pending : 没有运行完成存在这里

    print(completed长度:{},pending长度 :{}.format(len(completed), len(pending)))

    if pending:
        print(取消未完成的任务)
        for t in pending:
            t.cancel()
    print(main函数执行完成)


event_loop = asyncio.get_event_loop()
try:
    print(进程事件循环)
    result = event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
asyncio_wait_timeout.py

运行效果

[root@ mnt]# python3 asyncio_wait_timeout.py 
进程事件循环
main函数开始
等待phases里面的多个函数执行完成
phase形参传入值1
phase形参传入值2
phase形参传入值0
完成phase的次数0
completed长度:1,pending长度 :2
取消未完成的任务
main函数执行完成
phase 1 取消
phase 2 取消

 17、asyncio.gather()多个协程运行,函数返回值接收

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def phase1():
    print(phase1运行中)
    await asyncio.sleep(2)
    print(phase1运行完成)
    return phase1 result

async def phase2():
    print(phase2运行中)
    await asyncio.sleep(1)
    print(phase2运行完成)
    return phase2 result

async def main():
    print(main函数开始)
    results = await asyncio.gather(
        phase1(),
        phase2()
    )
    print(results : {}.format(results))

event_loop = asyncio.get_event_loop()
try:
    print(进程事件循环)
    result = event_loop.run_until_complete(main())
finally:
    event_loop.close()
asyncio_gather.py

运行效果

[root@ mnt]# python3 asyncio_gather.py 
进程事件循环
main函数开始
phase1运行中
phase2运行中
phase2运行完成
phase1运行完成
results : [phase1 result, phase2 result]

  18、asyncio.as_completed()多个协程运行,函数返回值不是有序的接收

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def phase(i):
    print(phase {} 运行中.format(i))
    await asyncio.sleep(0.5 - (0.1 * i))
    print(phase {} 运行完成.format(i))
    return phase {} result.format(i)

async def main(num_phases):
    print(main函数开始)
    phases = [
        phase(i) for i in range(num_phases)
    ]
    print(等待phases运行完成)
    results = []
    for next_to_complete in asyncio.as_completed(phases):
        task_result = await next_to_complete
        print(接到到task_result : {}.format(task_result))
        results.append(task_result)
    print(results : {}.format(results))
    return results

event_loop = asyncio.get_event_loop()
try:
    print(进程事件循环)
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()
asyncio_as_completed.py

运行效果

[root@ mnt]# python3 asyncio_as_completed.py 
进程事件循环
main函数开始
等待phases运行完成
phase 2 运行中
phase 1 运行中
phase 0 运行中
phase 2 运行完成
接到到task_result : phase 2 result
phase 1 运行完成
接到到task_result : phase 1 result
phase 0 运行完成
接到到task_result : phase 0 result
results : [phase 2 result, phase 1 result, phase 0 result]

   19、asyncio.Lock() 协程锁的打开与关闭

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools

def unlock(lock):
    print(回调释放锁)
    lock.release()

async def coro1(lock):
    """with方式获得锁"""
    async with lock:
        print(coro1 打开锁运算)
    print(coro1 锁已释放)

async def coro2(lock):
    """传统方式获取锁"""
    await lock.acquire()
    try:
        print(coro2 打开锁运算)
    finally:
        print(coro2 锁已释放)
        lock.release()

async def main(loop):
    lock = asyncio.Lock()
    print(启动协程之前获取锁)
    await lock.acquire()
    print(获得锁 {}.format(lock.locked()))

    # 运行完成,回调解锁
    loop.call_later(0.1, functools.partial(unlock, lock))

    print(等待协程运行)
    await asyncio.wait([coro1(lock), coro2(lock)])


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_lock.py

运行效果

[root@ mnt]# python3 asyncio_lock.py 
启动协程之前获取锁
获得锁 True
等待协程运行
回调释放锁
coro2 打开锁运算
coro2 锁已释放
coro1 打开锁运算
coro1 锁已释放

 20、asyncio.Event() 事件的查看与设置

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools

def set_event(event):
    print(回调设置event)
    event.set()

async def coro1(event):
    print(coro1 等待事件)
    await event.wait()
    print(coro1 触发运行)

async def coro2(event):
    print(coro2 等待事件)
    await event.wait()
    print(coro2 触发运行)

async def main(loop):
    event = asyncio.Event()
    print(event开始之前状态:{}.format(event.is_set()))
    loop.call_later(0.1, functools.partial(set_event, event))  # 延时0.1秒后,回调set_event函数
    await asyncio.wait([coro1(event), coro2(event)])
    print(event开始之后状态:{}.format(event.is_set()))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_event

运行效果

[root@ mnt]# python3 asyncio_event.py 
event开始之前状态:False
coro1 等待事件
coro2 等待事件
回调设置event
coro1 触发运行
coro2 触发运行
event开始之后状态:True

 21、asyncio.Condition(),对事件状态单独通知执行

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def consumer(condition_obj, i):
    async with condition_obj:
        print(消费者 {} 等待中.format(i))
        await condition_obj.wait()
        print(消费者 {} 触发.format(i))
    print(消费者 {} 消费结束.format(i))

async def manipulate_condition(condition_obj):
    print(开始操作condition)
    await asyncio.sleep(0.1)
    for i in range(3):
        async with condition_obj:
            print(通知消费者 {}.format(i))
            condition_obj.notify(i)
        await asyncio.sleep(0.1)

    async with condition_obj:
        print(通知其它所有的消费者)
        condition_obj.notify_all()
    print(操作condition结束)

async def main(loop):
    # 创建一个操作状态的对象
    condition_obj = asyncio.Condition()

    # 运5个消费者函数
    consumers = [
        consumer(condition_obj, i) for i in range(5)
    ]

    # 创建一个操作状态的任务
    loop.create_task(manipulate_condition(condition_obj))

    # 等待consumers所有的函数执行完成
    await asyncio.wait(consumers)

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
asyncio_condition.py

运行效果

[root@ mnt]# python3 asyncio_condition.py 
开始操作condition
消费者 2 等待中
消费者 3 等待中
消费者 0 等待中
消费者 4 等待中
消费者 1 等待中
通知消费者 0
通知消费者 1
消费者 2 触发
消费者 2 消费结束
通知消费者 2
消费者 3 触发
消费者 3 消费结束
消费者 0 触发
消费者 0 消费结束
通知其它所有的消费者
操作condition结束
消费者 4 触发
消费者 4 消费结束
消费者 1 触发
消费者 1 消费结束

  22、协程队列Queue,生产者与消费者的示例

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def consumer(n, q):
    print(消费者 {} 开始.format(n))
    while True:
        print(消费费 {} 等待消费.format(n))
        item = await q.get()
        print(消费者 {} 消费了 {}.format(n, item))
        if item is None:
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print(消费者 {} 结束.format(n))

async def producer(q, num_worker):
    print(生产者 开始)

    for i in range(num_worker * 3):
        await q.put(i)
        print(生产者 增加数据 {} 到队列中.format(i))

    print(生产者 增加停止信号到队列中)
    for i in range(num_worker):
        await q.put(None)
    print(生产者 等待队列清空)
    await q.join()
    print(生产者 结束)

async def main(loop, num_consumers):
    # 创建一个队列,最大的长度是num_consumers
    q = asyncio.Queue(maxsize=num_consumers)

    consumers = [
        loop.create_task(consumer(i, q)) for i in range(num_consumers)
    ]

    producer_task = loop.create_task(producer(q, num_consumers))

    await asyncio.wait(consumers + [producer_task])

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, 2))
finally:
    event_loop.close()
asyncio_queue.py

运行效果

[root@ mnt]# python3 asyncio_queue.py 
消费者 0 开始
消费费 0 等待消费
消费者 1 开始
消费费 1 等待消费
生产者 开始
生产者 增加数据 0 到队列中
生产者 增加数据 1 到队列中
消费者 0 消费了 0
消费者 1 消费了 1
生产者 增加数据 2 到队列中
生产者 增加数据 3 到队列中
消费费 0 等待消费
消费者 0 消费了 2
生产者 增加数据 4 到队列中
消费费 1 等待消费
消费者 1 消费了 3
生产者 增加数据 5 到队列中
生产者 增加停止信号到队列中
消费费 0 等待消费
消费者 0 消费了 4
消费费 1 等待消费
消费者 1 消费了 5
生产者 等待队列清空
消费费 0 等待消费
消费者 0 消费了 None
消费者 0 结束
消费费 1 等待消费
消费者 1 消费了 None
消费者 1 结束
生产者 结束

 23、利用 asyncio.Protocol 实现服务端和客户端数据相互传送

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys

SERVER_ADDRESS = [localhost, 8000]

class EchoServer(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info(peername)
        self.log = logging.getLogger(
            EchoServer_{}_{}.format(*self.address)
        )
        self.log.debug(接收连接)

    def data_received(self, data):
        self.log.debug(接收数据 {}.format(data))
        self.transport.write(data)
        self.log.debug(发送数据 {}.format(data))

    def eof_received(self):
        self.log.debug(接收数据 EOF)
        if self.transport.can_write_eof():
            self.transport.write_eof()

    def connection_lost(self, exc):
        if exc:
            self.log.error(错误 {}.format(exc))
        else:
            self.log.debug(服务关闭)
        super(EchoServer, self).connection_lost(exc)

logging.basicConfig(
    level=logging.DEBUG,
    format=%(name)s : %(message)s,
    stream=sys.stderr
)

log = logging.getLogger(main)

event_loop = asyncio.get_event_loop()

factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)

server = event_loop.run_until_complete(factory)

log.debug(开始运行 IP:{} Port:{}.format(*SERVER_ADDRESS))

try:
    event_loop.run_forever()
finally:
    log.debug(关闭服务)
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug(关闭事件循环)
    event_loop.close()
asyncio_echo_server_protocol.py
技术图片
#!/usr/bin/env python3
# encoding: utf-8

import asyncio
import functools
import logging
import sys

MESSAGES = [
    bThis is the message. ,
    bIt will be sent ,
    bin parts.,
]
SERVER_ADDRESS = (localhost, 8000)

class EchoClient(asyncio.Protocol):

    def __init__(self, messages, future):
        super().__init__()
        self.messages = messages
        self.log = logging.getLogger(EchoClient)
        self.f = future

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info(peername)
        self.log.debug(
            连接服务器IP:{} port :{}.format(*self.address)
        )

        for msg in self.messages:
            transport.write(msg)
            self.log.debug(发送数据 {!r}.format(msg))
        if transport.can_write_eof():
            transport.write_eof()

    def data_received(self, data):
        self.log.debug(received {!r}.format(data))

    def eof_received(self):
        self.log.debug(接收到 EOF)
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)

    def connection_lost(self, exc):
        self.log.debug(服务器关闭连接)
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)
        super().connection_lost(exc)

#设置日志级别
logging.basicConfig(
    level=logging.DEBUG,
    format=%(name)s: %(message)s,
    stream=sys.stderr,
)

#打印日志标题
log = logging.getLogger(main)

#创建一个事件循环
event_loop = asyncio.get_event_loop()

#创建客户端的Future
client_completed = asyncio.Future()

#利用偏函数自动传参给EchoClient实例化类
client_factory = functools.partial(
    EchoClient,
    messages=MESSAGES,
    future=client_completed,
)

#创建一个事件循环连接
factory_coroutine = event_loop.create_connection(
    client_factory,
    *SERVER_ADDRESS,
)

log.debug(等待客户端运行完成)
try:
    event_loop.run_until_complete(factory_coroutine)
    event_loop.run_until_complete(client_completed)
finally:
    log.debug(关闭事件循环)
    event_loop.close()
asyncio_echo_client_protocol.py

运行效果

服务端

[root@ mnt]# python3 asyncio_echo_server_protocol.py 
asyncio : Using selector: EpollSelector
main : 开始运行 IP:localhost Port:8000
EchoServer_::1_54082 : 接收连接
EchoServer_::1_54082 : 接收数据 bThis is the message. It will be sent in parts.
EchoServer_::1_54082 : 发送数据 bThis is the message. It will be sent in parts.
EchoServer_::1_54082 : 接收数据 EOF
EchoServer_::1_54082 : 服务关闭

客户端

[root@ mnt]# python3 asyncio_echo_client_protocol.py 
asyncio: Using selector: EpollSelector
main: 等待客户端运行完成
EchoClient: 连接服务器IP:::1 port :8000
EchoClient: 发送数据 bThis is the message. 
EchoClient: 发送数据 bIt will be sent 
EchoClient: 发送数据 bin parts.
EchoClient: received bThis is the message. It will be sent in parts.
EchoClient: 接收到 EOF
EchoClient: 服务器关闭连接
main: 关闭事件循环

  24、基于Coroutine 实现服务端和客户端数据相互传送,与22点示例功能一样)

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys

SERVER_ADDRESS = (localhost, 8000)

async def echo(reader, writer):
    address = writer.get_extra_info(peername)
    log = logging.getLogger(echo_{}_{}.format(*address))
    log.debug(开始接受连接)
    while True:
        data = await reader.read(128)
        if data:
            log.debug(接受的数据 : {}.format(data))
            writer.write(data)
            await writer.drain()
            log.debug(发送数据:{}.format(data))
        else:
            log.debug(关闭连接)
            writer.close()
            return

logging.basicConfig(
    level=logging.DEBUG,
    format=%(name)s : %(message)s,
    stream=sys.stderr
)

log = logging.getLogger(main)

event_loop = asyncio.get_event_loop()

factory = asyncio.start_server(echo, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug(开始启动服务 IP:{},Port:{}.format(*SERVER_ADDRESS))

try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug(关闭服务端)
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug(关闭事件循环)
    event_loop.close()
asyncio_echo_server_coroutine.py
技术图片
#!/usr/bin/env python3
# encoding: utf-8

import asyncio
import logging
import sys

MESSAGES = [
    bThis is the message. ,
    bIt will be sent ,
    bin parts.,
]
SERVER_ADDRESS = (localhost, 8000)

async def echo_client(address, messages):
    log = logging.getLogger(echo_client)

    log.debug(连接服务器 to {} port {}.format(*address))

    # 创建与服务端连接
    reader, writer = await asyncio.open_connection(*address)

    for msg in messages:
        writer.write(msg)
        log.debug(发送数据: {}.format(msg))

    # 判断是否发送结束标记
    if writer.can_write_eof():
        writer.write_eof()
    # 等待所有发送完成
    await writer.drain()

    log.debug(等待服务器响应)
    while True:
        data = await reader.read(128)
        if data:
            log.debug(接收服务器数据 :{}.format(data))
        else:
            log.debug(关闭与服务器的连接)
            writer.close()
            return

logging.basicConfig(
    level=logging.DEBUG,
    format=%(name)s: %(message)s,
    stream=sys.stderr,
)
log = logging.getLogger(main)

event_loop = asyncio.get_event_loop()

try:
    event_loop.run_until_complete(
        echo_client(SERVER_ADDRESS, MESSAGES)
    )
finally:
    log.debug(closing event loop)
    event_loop.close()
asyncio_echo_client_coroutine.py

运行效果

服务端

[root@ mnt]# python3 asyncio_echo_server_coroutine.py 
asyncio : Using selector: EpollSelector
main : 开始启动服务 IP:localhost,Port:8000
echo_::1_54084 : 开始接受连接
echo_::1_54084 : 接受的数据 : bThis is the message. It will be sent in parts.
echo_::1_54084 : 发送数据:bThis is the message. It will be sent in parts.
echo_::1_54084 : 关闭连接
#这里使用Ctrl + C,运行后面的功能
^Cmain : 关闭服务端
main : 关闭事件循环

客户端

[root@ mnt]# python3 asyncio_echo_client_coroutine.py 
asyncio: Using selector: EpollSelector
echo_client: 连接服务器 to localhost port 8000
echo_client: 发送数据: bThis is the message. 
echo_client: 发送数据: bIt will be sent 
echo_client: 发送数据: bin parts.
echo_client: 等待服务器响应
echo_client: 接收服务器数据 :bThis is the message. It will be sent in parts.
echo_client: 关闭与服务器的连接
main: closing event loop

 25、基于Coroutine ,实现SSL的Socket通讯

#创建ssl证书

openssl req -newkey rsa:2048 -nodes -keyout test_ssl.key -x509 -days 800 -out test_ssl.crt
技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys
import ssl

SERVER_ADDRESS = (localhost, 8000)

async def echo(reader, writer):
    address = writer.get_extra_info(peername)
    log = logging.getLogger(echo_{}_{}.format(*address))
    log.debug(开始接受连接)
    while True:
        data = await reader.read(128)

        #因为ssl不支持EOF结束,所以需要用‘x00‘结束
        terminate = data.endswith(bx00)
        data = data.rstrip(bx00)
        if data:
            log.debug(接受的数据 : {}.format(data))
            writer.write(data)
            await writer.drain()
            log.debug(发送数据:{}.format(data))
        if not data or terminate:
            log.debug(关闭连接)
            writer.close()
            return

logging.basicConfig(
    level=logging.DEBUG,
    format=%(name)s : %(message)s,
    stream=sys.stderr
)

log = logging.getLogger(main)

event_loop = asyncio.get_event_loop()

# 创建SSL所需要的对象
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.check_hostname = False
ssl_context.load_cert_chain(test_ssl.crt, test_ssl.key)

factory = asyncio.start_server(echo, *SERVER_ADDRESS, ssl=ssl_context)
server = event_loop.run_until_complete(factory)
log.debug(开始启动服务 IP:{},Port:{}.format(*SERVER_ADDRESS))

try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug(关闭服务端)
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug(关闭事件循环)
    event_loop.close()
asyncio_echo_server_ssl.py
技术图片
#!/usr/bin/env python3
# encoding: utf-8

import asyncio
import logging
import sys
import ssl

MESSAGES = [
    bThis is the message. ,
    bIt will be sent ,
    bin parts.,
]
SERVER_ADDRESS = (localhost, 8000)

async def echo_client(address, messages):
    log = logging.getLogger(echo_client)

    # 客户端ssl所需要带证书访问
    ssl_context = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH
    )
    ssl_context.check_hostname = False
    ssl_context.load_verify_locations(test_ssl.crt)

    log.debug(连接服务器 to {} port {}.format(*address))

    # 创建与服务端连接
    reader, writer = await asyncio.open_connection(*address, ssl=ssl_context)

    for msg in messages:
        writer.write(msg)
        log.debug(发送数据: {}.format(msg))

    # 判断是否发送结束标记
    # 非ssl
    # if writer.can_write_eof():
    #     writer.write_eof()

    # ssl
    writer.write(bx00)

    # 等待所有发送完成
    await writer.drain()

    log.debug(等待服务器响应)
    while True:
        data = await reader.read(128)
        if data:
            log.debug(接收服务器数据 :{}.format(data))
        else:
            log.debug(关闭与服务器的连接)
            writer.close()
            return

logging.basicConfig(
    level=logging.DEBUG,
    format=%(name)s: %(message)s,
    stream=sys.stderr,
)
log = logging.getLogger(main)

event_loop = asyncio.get_event_loop()

try:
    event_loop.run_until_complete(
        echo_client(SERVER_ADDRESS, MESSAGES)
    )
finally:
    log.debug(closing event loop)
    event_loop.close()
asyncio_echo_client_ssl.py

运行效果

服务端

[root@ mnt]# python3 asyncio_echo_server_ssl.py 
asyncio : Using selector: EpollSelector
main : 开始启动服务 IP:localhost,Port:8000
echo_::1_54094 : 开始接受连接
echo_::1_54094 : 接受的数据 : bThis is the message. It will be sent in parts.
echo_::1_54094 : 发送数据:bThis is the message. It will be sent in parts.
echo_::1_54094 : 关闭连接
^Cmain : 关闭服务端
main : 关闭事件循环

客户端

[root@ mnt]# python3 asyncio_echo_client_ssl.py 
asyncio: Using selector: EpollSelector
echo_client: 连接服务器 to localhost port 8000
echo_client: 发送数据: bThis is the message. 
echo_client: 发送数据: bIt will be sent 
echo_client: 发送数据: bin parts.
echo_client: 等待服务器响应
echo_client: 接收服务器数据 :bThis is the message. It will be sent in parts.
echo_client: 关闭与服务器的连接
main: closing event loop

 26、利用asyncio.SubprocessProtocol类继承的方式实现子进程的调用

技术图片
#!/usr/bin/env python3
# encoding: utf-8

#end_pymotw_header
import asyncio
import functools

class DFProtocol(asyncio.SubprocessProtocol):

    FD_NAMES = [stdin, stdout, stderr]

    def __init__(self, done_future):
        self.done = done_future
        self.buffer = bytearray()
        super().__init__()

    def connection_made(self, transport):
        print(process started {}.format(transport.get_pid()))
        self.transport = transport

    def pipe_data_received(self, fd, data):
        print(read {} bytes from {}.format(len(data),
                                             self.FD_NAMES[fd]))
        if fd == 1:
            self.buffer.extend(data)

    def process_exited(self):
        print(process exited)
        return_code = self.transport.get_returncode()
        print(return code {}.format(return_code))
        if not return_code:
            cmd_output = bytes(self.buffer).decode()
            results = self._parse_results(cmd_output)
        else:
            results = []
        self.done.set_result((return_code, results))

    def _parse_results(self, output):
        print(parsing results)
        if not output:
            return []
        lines = output.splitlines()
        headers = lines[0].split()
        devices = lines[1:]
        results = [
            dict(zip(headers, line.split()))
            for line in devices
        ]
        return results


async def run_df(loop):
    print(in run_df)

    cmd_done = asyncio.Future(loop=loop)
    factory = functools.partial(DFProtocol, cmd_done)
    proc = loop.subprocess_exec(
        factory,
        df, -hl,
        stdin=None,
        stderr=None,
    )
    try:
        print(launching process)
        transport, protocol = await proc
        print(waiting for process to complete)
        await cmd_done
    finally:
        transport.close()

    return cmd_done.result()


event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        run_df(event_loop)
    )
finally:
    event_loop.close()

if return_code:
    print(error exit {}.format(return_code))
else:
    print(
Free space:)
    for r in results:
        print({Mounted:25}: {Avail}.format(**r))
asyncio_subprocess_protocol.py

运行效果

由于我使用的Python版本是3.6.6,调用的优先级是
1、调用    def connection_made(self, transport) 函数
2、调用    def process_exited(self)函数
3、调用   def pipe_data_received(self, fd, data)函数这里是输出结果,所以结束进程的时候process_exited解析是空,导致结果出不来,这里待pyhton版本验证

 27、利用协程子进程的调用

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

def _parse_results(output):
    print(解析结果)
    if not output:
        return []
    lines = output.splitlines()
    headers = lines[0].split()
    devices = lines[1:]
    results = [
        dict(zip(headers, line.split())) for line in devices
    ]
    return results

async def run_df():
    print(run_df函数运行)
    buffer = bytearray()
    create = asyncio.create_subprocess_exec(
        df, -h,
        stdout=asyncio.subprocess.PIPE
    )

    print(df -h开始运行)
    proc = await create
    print(进程开始 {}.format(proc.pid))

    while True:
        line = await proc.stdout.readline()
        print(读取 : {}.format(line))
        if not line:
            print(命令不再输出)
            break
        buffer.extend(line)
    print(等待进程运行完成)
    await proc.wait()
    return_code = proc.returncode
    print(运行返回码:{}.format(return_code))
    if not return_code:
        cmd_output = bytes(buffer).decode()
        results = _parse_results(cmd_output)
    else:
        results = []

    return (return_code, results)

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        run_df()
    )
finally:
    event_loop.close()

if return_code:
    print(错误退出,错误信息:{}.format(return_code))
else:
    print(运行结果:)
    for r in results:
        print({Mounted:25}:{Avail}.format(**r))
asyncio_subprocess_coroutine.py

运行效果

[root@ mnt]# python3 asyncio_subprocess_coroutine.py 
run_df函数运行
df -h开始运行
进程开始 44244
读取 : bFilesystem               Size  Used Avail Use% Mounted on

读取 : b/dev/mapper/centos-root   17G  7.9G  9.2G  47% /

读取 : bdevtmpfs                 478M     0  478M   0% /dev

读取 : btmpfs                    489M     0  489M   0% /dev/shm

读取 : btmpfs                    489M   56M  433M  12% /run

读取 : btmpfs                    489M     0  489M   0% /sys/fs/cgroup

读取 : b/dev/sda1               1014M  125M  890M  13% /boot

读取 : btmpfs                     98M     0   98M   0% /run/user/0

读取 : b‘‘
命令不再输出
等待进程运行完成
运行返回码:0
解析结果
运行结果:
/                        :9.2G
/dev                     :478M
/dev/shm                 :489M
/run                     :433M
/sys/fs/cgroup           :489M
/boot                    :890M
/run/user/0              :98M

  28、利用协程管道传数据给子进程的调用处理

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio

async def to_upper(input):
    print(进程转大写的to_upper函数)

    create = asyncio.create_subprocess_exec(
        tr, [:lower:], [:upper:],
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )
    print(等待子进程运行完成)
    proc = await create
    print(子进程PID {}.format(proc.pid))

    print(查看子进程运行的标准输出和错误)
    stdout, stderr = await proc.communicate(input.encode())

    print(等待子进程完成)
    await proc.wait()

    return_code = proc.returncode
    print(return code {}.format(return_code))
    if not return_code:
        results = bytes(stdout).decode()
    else:
        results = ‘‘
    return (return_code, results)

MESSAGE = """
This message will be converted
to all caps.
"""

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        to_upper(MESSAGE)
    )
finally:
    event_loop.close()

if return_code:
    print(错误时,退出的返回状态码 {}.format(return_code))
else:
    print(源数据: {!r}.format(MESSAGE))
    print(处理过的数据 : {!r}.format(results))
asyncio_subprocess_coroutine_write.py

运行效果

[root@ mnt]# python3 asyncio_subprocess_coroutine_write.py 
进程转大写的to_upper函数
等待子进程运行完成
子进程PID 78254
查看子进程运行的标准输出和错误
等待子进程完成
return code 0
源数据: 
This message will be converted
to all caps.

处理过的数据 : 
THIS MESSAGE WILL BE CONVERTED
TO ALL CAPS.

   29、协程之信号的注册处理

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import functools
import os
import signal

def signal_handler(name):
    print(正在处理信号 : {}.format(name))

event_loop = asyncio.get_event_loop()

# 给信号绑定处理的事件
event_loop.add_signal_handler(
    signal.SIGHUP,
    functools.partial(signal_handler, name=SIGHUP),
)
event_loop.add_signal_handler(
    signal.SIGUSR1,
    functools.partial(signal_handler, name=SIGUSR1),
)
event_loop.add_signal_handler(
    signal.SIGINT,
    functools.partial(signal_handler, name=SIGINT),
)

async def send_signals():
    pid = os.getpid()
    print(开始发送信号给PID:{}.format(pid))

    for name in [SIGHUP, SIGHUP, SIGUSR1, SIGINT]:
        print(发送信号名字:{}.format(name))
        # 跟linux 命令kill一样,利用pid结束进程
        os.kill(pid, getattr(signal, name))
        print(放弃控制)
        await asyncio.sleep(0.01)
    return

try:
    event_loop.run_until_complete(send_signals())
finally:
    event_loop.close()
asyncio_signal.py

运行效果

[root@ mnt]# python3 asyncio_signal.py 
开始发送信号给PID:78320
发送信号名字:SIGHUP
放弃控制
正在处理信号 : SIGHUP
发送信号名字:SIGHUP
放弃控制
正在处理信号 : SIGHUP
发送信号名字:SIGUSR1
放弃控制
正在处理信号 : SIGUSR1
发送信号名字:SIGINT
放弃控制
正在处理信号 : SIGINT

    29、协程与线程结合(ThreadPoolExecutor

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys
import concurrent.futures
import time

def blocks(n):
    log = logging.getLogger(blocks({}).format(n))
    log.info(运行)
    time.sleep(0.1)
    log.info(done)
    return n ** 2

async def run_blocking_tasks(executor):
    """运行阻塞的任务"""
    log = logging.getLogger(run_blocking_tasks)
    log.info(开始运行)
    log.info(创建执行任务)
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i) for i in range(6)
    ]
    log.info(等待执行任务)
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info(运行结果: {!r}.format(results))

    log.info(exitrun_blocking_tasks 退出)

if __name__ == __main__:
    logging.basicConfig(
        level=logging.INFO,
        format=%(threadName)10s %(name)18s: %(message)s,
        stream=sys.stderr
    )

    # 创建一个线程池执行器,最大开启3个工作线程
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3
    )

    # 创建一个事件循环
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(run_blocking_tasks(executor))
    finally:
        event_loop.close()
asyncio_ThreadPoolExecutor.py

运行效果

[root@ mnt]# python3 asyncio_ThreadPoolExecutor.py 
MainThread run_blocking_tasks: 开始运行
MainThread run_blocking_tasks: 创建执行任务
ThreadPoolExecutor-0_0          blocks(0): 运行
ThreadPoolExecutor-0_1          blocks(1): 运行
ThreadPoolExecutor-0_2          blocks(2): 运行
MainThread run_blocking_tasks: 等待执行任务
ThreadPoolExecutor-0_0          blocks(0): done
ThreadPoolExecutor-0_0          blocks(3): 运行
ThreadPoolExecutor-0_1          blocks(1): done
ThreadPoolExecutor-0_1          blocks(4): 运行
ThreadPoolExecutor-0_2          blocks(2): done
ThreadPoolExecutor-0_2          blocks(5): 运行
ThreadPoolExecutor-0_1          blocks(4): done
ThreadPoolExecutor-0_0          blocks(3): done
ThreadPoolExecutor-0_2          blocks(5): done
MainThread run_blocking_tasks: 运行结果: [0, 9, 16, 25, 1, 4]
MainThread run_blocking_tasks: exitrun_blocking_tasks 退出

     30、协程与进程结合(ProcessPoolExecutor)

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys
import concurrent.futures
import time

def blocks(n):
    log = logging.getLogger(blocks({}).format(n))
    log.info(运行)
    time.sleep(0.1)
    log.info(done)
    return n ** 2

async def run_blocking_tasks(executor):
    """运行阻塞的任务"""
    log = logging.getLogger(run_blocking_tasks)
    log.info(开始运行)
    log.info(创建执行任务)
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i) for i in range(6)
    ]
    log.info(等待执行任务)
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info(运行结果: {!r}.format(results))

    log.info(exitrun_blocking_tasks 退出)

if __name__ == __main__:
    logging.basicConfig(
        level=logging.INFO,
        format=PID %(process)5s %(name)18s: %(message)s,
        stream=sys.stderr
    )

    # 创建一个线程池执行器,最大开启3个工作线程
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3
    )

    # 创建一个事件循环
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(run_blocking_tasks(executor))
    finally:
        event_loop.close()
asyncio_ProcessPoolExecutor.py

运行效果

[root@mnt]# python3 asyncio_ProcessPoolExecutor.py 
PID 91883 run_blocking_tasks: 开始运行
PID 91883 run_blocking_tasks: 创建执行任务
PID 91883 run_blocking_tasks: 等待执行任务
PID 91884          blocks(0): 运行
PID 91885          blocks(1): 运行
PID 91886          blocks(2): 运行
PID 91884          blocks(0): done
PID 91884          blocks(3): 运行
PID 91886          blocks(2): done
PID 91885          blocks(1): done
PID 91886          blocks(4): 运行
PID 91885          blocks(5): 运行
PID 91884          blocks(3): done
PID 91886          blocks(4): done
PID 91885          blocks(5): done
PID 91883 run_blocking_tasks: 运行结果: [25, 1, 4, 9, 0, 16]
PID 91883 run_blocking_tasks: exitrun_blocking_tasks 退出

 31、asyncio调试模式的开启

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import asyncio
import logging
import sys
import time
import warnings

#接收命令行
parser = argparse.ArgumentParser(‘debugging asyncio‘)
parser.add_argument(
    ‘-v‘,
    dest=‘verbose‘,
    default=False,
    action=‘store_true‘,
)
args = parser.parse_args()

#设置日志级别
logging.basicConfig(
    level=logging.DEBUG,
    format=‘%(levelname)7s: %(message)s‘,
    stream=sys.stderr,
)
LOG = logging.getLogger(‘‘)

async def inner():
    LOG.info(‘inner 函数开始‘)
    time.sleep(0.1)
    LOG.info(‘inner 运行完成‘)

async def outer(loop):
    LOG.info(‘outer 函数开始‘)
    #ensure_future,直到await才运行
    await asyncio.ensure_future(loop.create_task(inner()))
    LOG.info(‘outer 运行完成‘)

event_loop = asyncio.get_event_loop()
if args.verbose:
    LOG.info(‘开启DEBUG模式‘)
    event_loop.set_debug(True)

    # 使“慢”任务的阈值非常小以便于说明。默认值为0.1,即100毫秒。
    event_loop.slow_callback_duration = 0.001

    # 在警告过滤器列表中插入一个简单的条目(在前面)。
    warnings.simplefilter(‘always‘, ResourceWarning)

LOG.info(‘entering event loop‘)
event_loop.run_until_complete(outer(event_loop))
asyncio_debug.py

运行效果

#开启Debug
[root@ mnt]# python3 asyncio_debug.py 
  DEBUG: Using selector: EpollSelector
   INFO: entering event loop
   INFO: outer 函数开始
   INFO: inner 函数开始
   INFO: inner 运行完成
   INFO: outer 运行完成
[root@python-mysql mnt]# python3 asyncio_debug.py -v
  DEBUG: Using selector: EpollSelector
   INFO: 开启DEBUG模式
   INFO: entering event loop
   INFO: outer 函数开始
WARNING: Executing <Task pending coro=<outer() running at asyncio_debug.py:42> wait_for=<Task pending coro=<inner() running at asyncio_debug.py:33> cb=[<TaskWakeupMethWrapper object at 0x7f882e06c0d8>()] created at asyncio_debug.py:42> cb=[_run_until_complete_cb() at /usr/local/Python-3.6.6/lib/python3.6/asyncio/base_events.py:177] created at /usr/local/Python-3.6.6/lib/python3.6/asyncio/base_events.py:447> took 0.003 seconds
   INFO: inner 函数开始
   INFO: inner 运行完成
WARNING: Executing <Task finished coro=<inner() done, defined at asyncio_debug.py:33> result=None created at asyncio_debug.py:42> took 0.101 seconds
   INFO: outer 运行完成

#正常运行
[root@ mnt]# python3 asyncio_debug.py 
  DEBUG: Using selector: EpollSelector
   INFO: entering event loop
   INFO: outer 函数开始
   INFO: inner 函数开始
   INFO: inner 运行完成
   INFO: outer 运行完成
You have new mail in /var/spool/mail/root

 32、利用生成器的方式,创建协程socket监听

技术图片
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import logging
import sys

@asyncio.coroutine
def echo(reader, writer):
    address = writer.get_extra_info(peername)
    log = logging.getLogger(echo_{}_{}.format(*address))
    log.debug(connection accepted)
    while True:
        data = yield from reader.read(128)
        if data:
            log.debug(received {!r}.format(data))
            writer.write(data)
            yield from writer.drain()
            log.debug(sent {!r}.format(data))
        else:
            log.debug(closing)
            writer.close()
            return

#开启Debug模式
logging.basicConfig(
    level=logging.DEBUG,
    format=%(name)s: %(message)s,
    stream=sys.stderr,
)
#设置日志的title
log = logging.getLogger(main)

#设置开启服务的IP+端口
server_address = (localhost, 8888)

#获取事件循环
event_loop = asyncio.get_event_loop()

# 创建服务器,让循环在之前完成协同工作。并且启动实际事件循环
coroutine = asyncio.start_server(echo, *server_address,loop=event_loop)
server = event_loop.run_until_complete(coroutine)
log.debug(starting up on {} port {}.format(*server_address))

try:
    #开启一直循环处理任务
    event_loop.run_forever()
finally:
    #结束后清理的工作
    log.debug(closing server)
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug(closing event loop)
    event_loop.close()
asyncio_echo_server_generator

运行效果

[root@ mnt]# python3 asyncio_echo_server_generator 
asyncio: Using selector: EpollSelector
main: starting up on localhost port 8888

 33、协程的关闭示例

技术图片
import asyncio
import logging
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format=%(name)s: %(message)s,
    stream=sys.stderr,
)
LOG = logging.getLogger(main)


async def stopper(loop):
    LOG.debug(stopper invoked)
    loop.stop()


event_loop = asyncio.get_event_loop()

event_loop.create_task(stopper(event_loop))

try:
    LOG.debug(entering event loop)
    event_loop.run_forever()
finally:
    LOG.debug(closing event loop)
    event_loop.close()
asyncio_stop.py

运行效果

[root@ mnt]# python3 asyncio_stop.py 
asyncio: Using selector: EpollSelector
main: entering event loop
main: stopper invoked
main: closing event loop

 

以上是关于Python之asyncio模块的使用的主要内容,如果未能解决你的问题,请参考以下文章

[未解决问题记录]python asyncio+aiohttp出现Exception ignored:RuntimeError('Event loop is closed')(代码片段

asyncio创建协程解析——分析廖雪峰的Python教程之创建WEB服务(转)

python:asyncio模块

Python asyncio 模块

asyncio:Python异步编程模块

如何在asyncio python中使用子进程模块限制并发进程数