asyncio并发编程

Posted zzliu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了asyncio并发编程相关的知识,希望对你有一定的参考价值。


asyncio并发编程

asyncio是Python3.4引入的一个用于异步IO的库,其主要功能如下

1)包含各种特定系统实现的模块化事件循环

2)传输和协议抽象

3)对TCP、UDP、SSL、子进程、延时调用以及其他的具体支持

4)模仿futures模块但适用于事件循环使用的Future类

5)基于yield from的协议和任务,可以让我们用顺序的方式编写并发代码

6)必须使用一个将产生阻塞IO的调用时,有接口可以把这个事件转移到线程池

7)模仿threading模块中的同步原语、可以用在单线程内的协程之间


事件循环

基本使用

来看一个示例

# 异步编程三要素:事件循环+回调(驱动生成器)+IO多路复用
# asyncio是python用于解决异步IO编程的一整套解决方案
# 基于asyncio的框架:tornado、gevent、twisted(scrapy,django channels)
# tornado(实现了web服务器),django+flask


# 使用asyncio
import asyncio
import time


async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)  # 耗时操作一定要放到await里面
    # time.sleep(2)
    print("end get url")


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = [get_html("http://www.baidu.com") for _ in range(10)]
    # loop.run_until_complete(get_html("http://www.baidu.com"))
    loop.run_until_complete(asyncio.wait(tasks))    # 一次执行多个任务
    print(time.time()-start_time)


获取协程的返回值

import asyncio
import time


async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)
    # time.sleep(2)
    print("end get url")
    return "loop_test"


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # ensure_future返回的是一个future对象
    get_future = asyncio.ensure_future(get_html("http://www.baidu.com"))
    loop.run_until_complete(get_future)     # run_until_complete也可以接收一个future对象
    print(get_future.result())  # loop_test



还有一种方式就是通过task对象获取返回值

import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.baidu.com"))
    loop.run_until_complete(task)
    print(get_future.result())  # loop_test


回调

如果在执行完协程后需要回调,比如说我们需要发送邮件,可以使用task对象的add_done_callback方法

import asyncio
import time


async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"


def callback(future):   # 注意这里默认会将future传给回调函数,所以必须有一个参数接收
    print("send email to robin")


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.baidu.com"))
    task.add_done_callback(callback)
    loop.run_until_complete(task)
    print(task.result())



这里有一个问题,如果回调函数需要参数的话就不行了,因为我们在调用的时候只传入了一个函数名,要传入参数,可以使用functool的partial函数

import time
from functools import partial


async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"


def callback(name, future):   # 注意这里callback自己的参数放在前面,future放在后面
    print("send email to %s" % name)


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task = loop.create_task(get_html("http://www.baidu.com"))
    task.add_done_callback(partial(callback, "sansa"))  # 将callback函数包裹起来
    loop.run_until_complete(task)
    print(task.result())


wait和gather

wait用于一次提交多个任务,gather与task在使用上基本相似,区别在于gather传参时需要使用*打散

下面来具体说一下详细区别

1)gather更加high-level(抽象层次更高)

2)gather可以分组


import asyncio
import time
from functools import partial


async def get_html(url):
    print("start get url")
    # 注意这里一定不要使用time.sleep(1),因为time.sleep()是同步阻塞的
    await asyncio.sleep(2)
    print("end get url")
    return "loop_test"


def callback(name, future):   # 注意这里默认会将future传给回调函数,所以必须有一个参数接收
    print("send email to %s" % name)


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task1 = [get_html("http://www.baidu.com") for _ in range(5)]
    task2 = [get_html("http://www.baidu.com") for _ in range(5)]
    loop.run_until_complete(asyncio.gather(*task1, *task2))     # 这里需要打散


后面可以改写成

if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    task1 = [get_html("http://www.baidu.com") for _ in range(5)]
    task2 = [get_html("http://www.baidu.com") for _ in range(5)]
    group1 = asyncio.gather(*task1)
    group2 = asyncio.gather(*task2)
    loop.run_until_complete(asyncio.gather(group1, group2))


task取消和子协程调用原理

task取消

import asyncio
import time


async def get_html(sleep_times):
    print("waiting")
    await asyncio.sleep(sleep_times)
    print("done after %s s" % sleep_times)


if __name__ == '__main__':
    task1 = get_html(2)
    task2 = get_html(3)
    task3 = get_html(3)

    tasks = [task1, task2, task3]
    loop = asyncio.get_event_loop()

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt as e:  # 命令行运行时按ctrl+c终止程序
        all_tasks = asyncio.Task.all_tasks()    # 获取所有task
        for task in all_tasks:
            print("cancel task")
            print(task.cancel())    # 取消成功返回True
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()    # 注意close与stop的区别


子协程

下面来看一个官方示例

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()



执行结果

Compute 1 + 2 ...
# 1 s
1 + 2 = 3

上述协程的时序图如下

技术图片


其他方法

call_soon

call_soon是立即启动的意思,传入一个函数和函数需要的参数

import asyncio

def callback(sleep_times):
    print("sleep %s seconds" % sleep_times)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2) 
    loop.run_forever()



执行结果

sleep 2 seconds
# 程序继续运行中...



我们需要在协程结束后终止事件循环,因此需要再定义一个函数

import asyncio

def callback(sleep_times):
    print("sleep %s seconds" % sleep_times)

def stoploop(loop):     # 终止循环
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_soon(callback, 2)     # 第一个参数是函数名,后面为动态参数
    loop.call_soon(stoploop, loop)
    loop.run_forever()


call_later

call_later按等待时间执行函数,会根据传入的时间排出一个顺序

import asyncio


def callback(sleep_times):
    print("sleep %s seconds" % sleep_times)


def stoploop(loop):
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.call_later(1, callback, 1)     # 第一个参数是函数名,后面为动态参数
    loop.call_later(3, callback, 3)     # 第一个参数是函数名,后面为动态参数
    loop.call_later(2, callback, 2)     # 第一个参数是函数名,后面为动态参数
    # loop.call_soon(stoploop, loop)    # 这里如果使用call_soon那么loop会先stop,就看不到效果了
    loop.run_forever()



执行结果

sleep 1 seconds
sleep 2 seconds
sleep 3 seconds
# 程序继续运行中...


call_at

call_at在指定时间执行

import asyncio

def callback(sleep_times):
    print("sleep %s seconds" % sleep_times)

def stoploop(loop):
    loop.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    now = loop.time()
    loop.call_at(now+2, callback, 1)     # 第一个参数是函数名,后面为动态参数
    loop.call_at(now+3, callback, 3)     # 第一个参数是函数名,后面为动态参数
    loop.call_at(now+2, callback, 2)     # 第一个参数是函数名,后面为动态参数
    loop.run_forever()


call_soon_threadsafe

call_soon_threadsafe是线程安全的call_soon,涉及到多线程时,使用这个


ThreadPollExecutor 和 asyncio完成阻塞io请求

什么时候使用多线程:在协程中集成阻塞io

在协程中不要放阻塞的代码, 但如果非要使用阻塞的代码, 就可以放到线程池中运行。

import asyncio
from concurrent.futures import ThreadPoolExecutor
import socket
from urllib.parse import urlparse
import time


def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    # 建立socket连接
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, 80))

    client.send("GET  HTTP/1.1\r\nHost:\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    data = b""
    while True:
        d = client.recv(1024)
        if d:
            data += d
        else:
            break
    data = data.decode("utf8")
    html_data = data.split("\r\n\r\n")[1]   # 把请求头信息去掉,只要网页内容
    print(html_data)
    client.close()


if __name__ == '__main__':
    start_time = time.time()
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor(3)    # 线程池
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods//".format(url)
        # 阻塞的代码放到线程池
        task = loop.run_in_executor(executor, get_url, url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time: %s" %(time.time()-start_time))


asyncio发送http请求

asyncio发送http请求可以通过asyncio的open_connection方法实现,open_connection方法返回reader和writer对象,分别用于读和写

import asyncio
from urllib.parse import urlparse
import time


async def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = '/'

    # 建立socket连接
    reader, writer = await asyncio.open_connection(host, 80)  # 协程 与服务端建立连接
    writer.write(
        "GET  HTTP/1.1\r\nHost:\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    all_lines = []
    async for raw_line in reader:  # __aiter__ __anext__魔法方法
        line = raw_line.decode('utf8')
        all_lines.append(line)
    html = '\n'.join(all_lines)
    return html


if __name__ == '__main__':
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods//".format(url)
        tasks.append(get_url(url))
    loop.run_until_complete(asyncio.wait(tasks))
    print("last time: %s" % (time.time() - start_time))



如果我们需要获取协程执行后的结果,我们可以把future对象放入tasks里面,然后通过future获取result

修改后半段代码

if __name__ == '__main__':
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods//".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))   # future对象
    loop.run_until_complete(asyncio.wait(tasks))
    for task in tasks:
        print(task.result())    # 通过future对象获取结果
    print("last time: %s" % (time.time() - start_time))


上面的是所有协程都执行完后再获取结果,如果需要执行完一个马上获取结果,可以使用as_completed方法

import asyncio
from urllib.parse import urlparse
import time


async def get_url(url):
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = '/'

    # 建立socket连接
    reader, writer = await asyncio.open_connection(host, 80)  # 协程 与服务端建立连接
    writer.write(
        "GET  HTTP/1.1\r\nHost:\r\nConnection:close\r\n\r\n".format(path, host).encode('utf8'))
    all_lines = []
    async for raw_line in reader:  # __aiter__ __anext__魔法方法
        line = raw_line.decode('utf8')
        all_lines.append(line)
    html = '\n'.join(all_lines)
    return html


async def main():
    tasks = []
    for url in range(20):
        url = "http://shop.projectsedu.com/goods//".format(url)
        tasks.append(asyncio.ensure_future(get_url(url)))
    for task in asyncio.as_completed(tasks):
        result = await task     # 一定要加await
        print(result)


if __name__ == '__main__':
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print("last time: %s" % (time.time() - start_time))


asyncio同步和通信

同步

协程一般是不需要锁的

import asyncio
 
total = 0 
 
async def add():
    global total
    for _ in range(1000000):
        total += 1
 
 
async def desc():
    global total, lock
    for _ in range(1000000):
        total -= 1 

 
if __name__ == '__main__':
    tasks = [add(), desc()]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print(total)



上面的代码是不需要加锁的,不管运行多少次,结果都为0。但在某些情况下,我们需要加锁来使协程同步

import asyncio
import aiohttp
from asyncio import Lock


cache = "baidu": "http://ww.baidu.com"
lock = Lock()   # 这里的Lock不是系统的Lock,还有async for 。。。类似的用法


async def get_stuff(url):
    async with lock:    # 等价于with await lock
        # 这里可以使用async with 是因为 Lock中有__await__ 和 __aenter__两个魔法方法
        # 和线程一样, 这里也可以用 await lock.acquire() 并在结束时 lock.release
        if url in cache:
            return cache[url]
        stuff = await aiohttp.request("GET", url)
        cache[url] = stuff
        return stuff


async def parse_stuff(url):
    stuff = await get_stuff(url)
    # do some parse


async def use_stuff(url):
    stuff = await get_stuff(url)
    # use stuff to do something


if __name__ == '__main__':
    tasks = [parse_stuff("baidu"), use_stuff("baidu")]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))


这里parse_stuff和use_stuff有共同调用的代码get_stuff,parse_stuff去请求的时候,如果get_stuff也去请求,可能触发网站的反爬机制


通信

因为协程是单线程的,所以协程完全可以使用全局变量实现queue来相互通信,但是如果想要在queue中定义存放的最大数目,那么需要使用asyncio的Queue,同时使用get和put时需要加上await

from asyncio import Queue
queue = Queue(maxsize=5)    

以上是关于asyncio并发编程的主要内容,如果未能解决你的问题,请参考以下文章

asyncio--python3未来并发编程主流充满野心的模块

python异步编程之asyncio(百万并发)

Python并发编程之初识异步IO框架:asyncio 上篇

Python并发编程之学习异步IO框架:asyncio 中篇

asyncio 并发编程

gj13 asyncio并发编程