《asyncio 系列》8. 在 asyncio 中通过流(StreamReaderStreamWriter)来实现 TCP 请求的发送与接收
Posted 来自东方地灵殿的小提琴手
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《asyncio 系列》8. 在 asyncio 中通过流(StreamReaderStreamWriter)来实现 TCP 请求的发送与接收相关的知识,希望对你有一定的参考价值。
楔子
在编写网络应用程序时,我们使用了 socket 库来读取和写入客户端。虽然在构建低级网络库时直接使用套接字很有效,但用法上还是有些复杂,例如启动服务端、等待客户端连接以及向客户端发送数据等等。asyncio 的设计者意识到这一点,并构建了网络流 API,这些更高级的 API 比套接字更容易使用,利用这些 API 创建的任何客户端、服务端应用程序,比我们自己使用套接字更方便且更健壮。而流是在 asyncio 中构建基于网络的应用程序的推荐方法。
什么是流
在 asyncio 中,流是一组高级的类和函数,用于创建、管理网络连接和通用数报流。使用它们,我们可以创建客户端连接来读取和写入数据,也可以创建服务端并自己管理它们。这些 API 抽象了很多关于管理套接字的方法,例如处理 SSL 或丢失的连接,极大地减轻了开发人员的工作负担。
流 API 构建在称为传输和协议的一组较低级别的 API 之上,这些 API 直接包装了我们在前几章中使用的套接字,并提供了一个简单的方式来读取套接字数据以及将数据写入套接字。
这些 API 的结构与其他 API 稍有不同,因为它们使用回调样式设计。与之前所做的那些主动等待套接字数据不同,当数据可用时,我们会调用的实例上的某个方法,然后根据需要来处理在此方法中收到的数据。下面就来学习这些基于回调的 API 是如何工作的,让我们首先看看如何通过构建一个基本的 HTTP 客户端来使用较低级的传输和协议。
传输和协议
在高层次上,传输是与任意数据流进行通信的抽象,与套接字或任何数据流(如标准输入)通信时,我们将使用一组熟悉的操作。从数据源读取数据或向目标写入数据,当我们完成对它的处理时,将关闭相应的数据源。而套接字完全符合我们定义的这种传输抽象的方式,也就是说,读取和写入数据,一旦完成,就关闭它。简而言之,传输提供了向源发送数据和从源接收数据的定义。传输有多种实现,具体取决于我们使用的源的类型,我们主要关注 ReadTransport、WriteTransport 和 Transport,还有其他一些用于处理 UDP 连接和子进程通信的实现。
在套接字之间来回传输数据只是这个过程的一部分,那么套接字的生命周期是怎样的呢?我们建立连接,写入数据,然后处理得到的任何响应,这些是协议拥有的一组操作。注意,这里的协议只指一个 Python 类,而不是 HTTP 或 FTP 之类的协议。传输可以管理数据的传递,并在事件发生时调用协议上的方法,例如建立连接或准备处理数据。
为了解传输和协议如何协同工作,我们将构建一个基本应用程序来运行单个 HTTP GET 请求。我们需要做的第一件事是定义一个继承 asyncio.Protocol 的类,并覆盖父类的一些方法来发出请求、从请求中接收数据,并处理连接中的任何错误。
需要实现的第一个协议方法是 connection_made,当底层套接字与 HTTP 服务器成功连接时,传输将调用此方法。此方法使用 Transport 作为参数,我们可以使用它与服务器通信。这种情况下,将使用传输立即发送HTTP 请求。
需要实现的第二个方法是 data_received,传输在接收数据时调用此方法,并将其作为字节传递给我们。这个方法可以被多次调用,所以需要创建一个内部缓冲区来存储数据。
import asyncio
from asyncio import Transport, AbstractEventLoop
from typing import Optional
class HTTPGetClientProtocol(asyncio.Protocol):
def __init__(self, host: str, loop: AbstractEventLoop):
self._host = host
self._future = loop.create_future()
self._transport: Optional[Transport] = None
self._response_buffer: bytes = b""
async def get_response(self):
# 等待 self._future,直到从服务器得到响应并写入 self._future
return await self._future
def _get_request_bytes(self) -> bytes:
# 创建 HTTP 请求
request = ("GET / HTTP/1.1\\r\\n"
"Connection: close\\r\\n"
f"Host: self._host\\r\\n\\r\\n")
return request.encode("utf-8")
def connection_made(self, transport: Transport) -> None:
"""底层套接字和服务器端建立连接时会调用此方法"""
print(f"和 self._host 建立连接")
# 会自动传入一个 transport 参数,它就是传输,我们用它来管理数据
# 并在事件发生时调用协议上的方法,比如这里的 connection_made,我们将传输保存起来
self._transport = transport
# 调用传输的 write 方法写入数据
self._transport.write(self._get_request_bytes())
def data_received(self, data: bytes) -> None:
"""传输在收到数据时会调用协议的 data_received 方法"""
print("收到数据")
self._response_buffer += data
def eof_received(self) -> Optional[bool]:
"""
如果服务端已经将所有数据都返回完毕,那么会关闭连接
此时传输会自动调用协议的 eof_received 方法
"""
print("数据全部接收完毕")
# 响应数据都接收完毕,将其写入 future 中
self._future.set_result(self._response_buffer)
# 该方法返回一个布尔值,用于确定如何关闭传输(底层套接字)
# 返回 False 则让传输自行关闭,返回 True 意味着需要编写协议来关闭
# 由于当前不需要在关闭时执行什么特殊逻辑,所以返回 False 即可
# 因此我们不需要手动处理关闭传输
return False
def connection_lost(self, exc: Optional[Exception]) -> None:
"""当连接关闭时会调用此方法"""
# 如果连接正常关闭,则什么也不做
if exc is None:
print("连接正常关闭")
else:
# 否则将异常设置到 future 里面
self._future.set_exception(exc)
async def make_request(host: str, port: int, loop: AbstractEventLoop):
# 协议工厂,调用之后创建一个协议实例
def protocol_factory():
return HTTPGetClientProtocol(host, loop)
# create_connection 将创建到给定主机的套接字连接,并将其包装在适当的传输中
# 当建立连接之后,会自动调用协议的 connection_made,在该方法中会向目的主机发送请求
# 当数据达到时,会自动协议的 data_received,数据返回完毕时自动调用协议的 eof_received
transport, protocol = await loop.create_connection(protocol_factory, host=host, port=port)
# 将数据写入 future 之后,调用 get_response 得到响应数据
# 在 create_connection 里面我们传入了一个协议工厂,在里面会自动调用
# 返回的 transport 就是传输,protocol 就是内部的创建协议实例,但传输这里我们不需要
return await protocol.get_response()
async def main():
loop = asyncio.get_running_loop()
result = await make_request("www.baidu.com", 80, loop)
print("百度一下".encode("utf-8") in result)
asyncio.run(main())
"""
和 www.baidu.com 建立连接
收到数据
收到数据
数据全部接收完毕
True
连接正常关闭
"""
我们已经学会了使用传输和协议,但这些 API 是较低级别的,因此不推荐。我们更建议使用流,这是一种扩展了传输和协议的更高级别的抽象。
流读取与流写人
传输和协议是较低级别的 API,最适合在发送和接收数据时直接控制所发生的事情。例如,如果正在设计一个网络库或 Web 框架,可能会考虑传输和协议。但对于大多数应用程序,我们不需要这种级别的控制,使用传输和协议将会编写一些重复的代码。
asyncio 的设计者意识到了这一点,并创建了更高级别的流 API,该 API 将传输和协议的标准用例封装成两个易于理解和使用的类:StreamReader 和 StreamWriter。顾名思义,它们分别处理对流的读取和写入,使用这些类是在 asyncio 中开发网络应用程序的推荐方法。
为帮助你了解如何使用这些 API,下面列举一个发出 HTTP GET 请求并将其转换为流的示例。asyncio 没有直接生成 StreamReader 和 StreamWriter 的实例,而是提供一个名为 open_connection 的库协程函数,它将创建这些实例。这个协程接收目的主机的地址和端口,并以元组形式返回 StreamReader 和 StreamWriter。我们的计划是使用 StreamWriter 发送 HTTP 请求,并使用 StreamReader 读取响应。StreamReader 方法很容易理解,我们有一个方便的 readline 协程方法,它会一直等到我们获得一行数据,或者也可以使用 SteamReader 的 read 协程方法等待指定数量的字节到达。
StreamWriter 稍微复杂一些,它有一个 write 方法,该方法是一个普通方法而不是协程。在内部,流写入器尝试立即写入套接字的输出缓冲区,但此缓冲区可能已满。如果套接字的写入缓冲区已满,则数据将存储在内部队列中,以后可以进入缓冲区。但这带来一个潜在问题,即调用 write 不一定会立即发送数据,这会导致什么后果呢?想象一下,网络连接变慢了,每秒只能发送 1KB,但应用程序每秒写入 1MB。这种情况下,应用程序的写缓冲区填满的速度,比把数据发送到套接字缓冲区的速度快得多,最终将达到机器内存的限制,并导致崩溃。
那怎么能等到所有数据都正确地发送出去呢?为解决这个问题,可使用一个叫做 drain 的协程方法。这个协程将阻塞(直到所有排队的数据被发送到套接字),确保我们在继续运行程序之前,已经写出所有内容。从技术角度看,不必在每次写入后都调用 drain,但这有助于防止错误发生。
import asyncio
from asyncio import StreamReader
from typing import AsyncGenerator
async def read_until_empty(stream_reader: StreamReader) -> AsyncGenerator[bytes, None]:
# 读取一行,直到没有任何剩余数据
while response := await stream_reader.readline():
yield response
async def main():
host = "www.baidu.com"
request = ("GET / HTTP/1.1\\r\\n"
"Connection: close\\r\\n"
f"Host: host\\r\\n\\r\\n")
stream_reader, stream_write = await asyncio.open_connection(host, 80)
try:
stream_write.write(request.encode("utf-8"))
await stream_write.drain()
response = b"".join([r async for r in read_until_empty(stream_reader)])
print("百度一下".encode("utf-8") in response)
finally:
# 关闭 writer
stream_write.close()
# 并等待它完成关闭
await stream_write.wait_closed()
asyncio.run(main())
"""
True
"""
我们首先创建了一个简单的异步生成器从 StreamReader 读取所有行,直到没有任何剩余的数据要处理。然后在主协程中,打开一个到 baidu.com 的连接,在这个过程中创建一个 StreamReader 和 StreamWriter 实例。然后分别使用 write 和 drain 写入请求。一旦完成了写入请求,将使用异步生成器从响应中获取每一行数据,将它们存储在响应列表中,最后通过调用 close 关闭 StreamWriter 实例,然后等待 wait_closed 协程。为什么需要在这里调用一个方法和一个协程?原因是当调用 close 时会执行一些动作,例如取消注册套接字和底层传输调用 connection_lost 方法,这些都是在事件循环的后续迭代中异步发生的。这意味着在调用 close 之后,连接不会马上关闭,而是直到稍后的某个时间才会关闭。如果你需要等待连接关闭才能继续操作,或者担心关闭时可能发生的任何异常,最好调用 wait_closed。
然后再来聊一聊 StreamReader,它有以下几个协程方法:
read(self, n=-1):如果 n 为 -1,那么会一直读到 EOF 并返回已读取的所有内容。如果 n 大于 0,则读取指定的字节数并返回,如果不够那么有多少读多少;
readexactly(self, n):读取 n 个字节,数据不够 n 个字节,则返回 IncompleteReadError;
readline(self):从流中读取数据块,直到找到换行符 b"\\\\n"。如果成功,那么返回带有换行符的数据块。如果在遇到换行符之前先遇到了 EOF(响应结束了),那么直接返回读到的行;
readuntil(self, separator=b\'\\n\'):读取数据,直到找到指定的分隔符 separator。readline 本质上也是调用了 readuntil,而且 readuntil 的 separator 的默认就是换行符,所以它默认等价于 readline,当然我们也可以指定为别的;
如果没有返回数据(直接读到了 EOF),那么这几个方法会返回空字节串(readexactly 特殊,字节不够会报错)。
现在通过发出 Web 请求了解了有关流 API 的基础知识,但这些类的用处超出了基于 Web 和网络的应用程序,接下来我们将了解如何利用流读取器来创建非阻塞命令行应用程序。
非阻塞命令行输入
一般情况下,在 Python 中要获取用户输入时,我们会使用 input 函数。该函数将会阻塞线程,直到用户提供输入并按下 Enter 键。但如果想在后合运行代码,同时保持对输入的响应呢?例如,我们可能想让用户同时启动多个长时间运行的任务,例如长时间运行的 SQL 查询。而对于命令行聊天应用程序,则可能希望用户能够在接收来自其他用户的消息时键入自己的消息。
由于 asyncio 是单线程的,在 asynio 应用程序中使用 input 意味着停止运行事件循环,直到用户提供输入内容,这将停止整个应用程序。即使使用任务在后台启动操作也行不通。为演示这一点,让我们尝试创建一个应用程序,用户输入应用程序的休眠时间。我们希望能够在接收用户输入的同时,一起运行多个这些休眠操作。
import asyncio
async def delay(seconds):
print(f"休眠 seconds 秒")
await asyncio.sleep(seconds)
print(f"seconds 秒休眠完成")
async def main():
while True:
delay_time = input("请输入休眠时间: ")
asyncio.create_task(delay(int(delay_time)))
asyncio.run(main())
"""
请输入休眠时间: 5
请输入休眠时间: 3
请输入休眠时间: 4
请输入休眠时间:
"""
这个问题原因应该很容易理解,input 会阻塞整个线程,所以任务永远不执行。
我们真正想要的是将 input 函数改为协程,可以编写类似 words = await input() 的代码。如果能做到这一点,任务将正确调度,并在等待用户输入时继续运行。不幸的是,input 没有协程变体,所以需要使用其他技术来实现。 而这正是协议和流读取器可以帮助我们的地方,回顾一下,流读取器有 readline 协程方法,这是我们正在寻找的协程类型。如果有办法将流读取器连接到标准输入,就可以使这个协程实现用户输入。
asyncio 在事件循环上有一个名为 connect_read_pipe 的协程方法,它将协议连接到类似文件的对象,这与我们预想的几乎相同。这个协程方法接收一个协议工厂(protocol factory)和一个管道(pipe),协议工厂只是一个创建协议实例的函数,管道(pipe)是一个类似文件的对象,它被定义为一个对象,上面有读写等方法。connect_read_pipe 协程将管道连接到工厂创建的协议,从管道中获取数据,并将其发送到协议。
就标准控制台输入而言,sys.stdin 符合传递给 connect_read_pipe 的类文件对象的要求。一旦调用了这个协程,就会得到一个工厂函数创建的协议和一个 ReadTransport。现在的问题是我们应该在工厂中创建什么协议,以及如何将它和具有我们想要使用的 readline 协程的 StreamReader 连接起来?
asyncio 提供了一个名为 StreamReaderProtocol 的实用程序类,用于将流读取器的实例连接到协议。当实例化这个类时,我们传入一个流读取器的实例,然后协议类委托给我们创建的流读取器,允许使用流读取器从标准输入中读取数据。将所有这些内容放在一起,可创建一个在等待用户输入时,不会阻塞事件循环的命令行应用程序。
import asyncio
from asyncio import StreamReader
import sys
async def create_stdin_reader() -> StreamReader:
stream_reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(stream_reader)
loop = asyncio.get_running_loop()
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
return stream_reader
在代码中,我们创建了一个名为 create_stdin_reader 的可重用协程,它创建了个 StreamReader,我们将使用它来异步读取标准输入。首先创建一个流读取器实例并将其传递给流读取器协议,然后调用 connect_read_pipe,将协议工厂作为 lambda 函数传入。这个 lambda 函数会自动调用,并返回我们之前创建的流读取器协议,然后通过 sys.stdin 将标准输入连接到流读取器协议。并且 connect_read_pipe 会返回传输和协议,但当前不需要它们,因此忽略了。现在可以使用此函数从标准输入异步读取,并构建应用程序。
import asyncio
from asyncio import StreamReader
import sys
async def create_stdin_reader() -> StreamReader:
stream_reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(stream_reader)
loop = asyncio.get_running_loop()
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
return stream_reader
async def delay(seconds):
print(f"休眠 seconds 秒")
await asyncio.sleep(seconds)
print(f"seconds 秒休眠完成")
async def main():
stdin_reader = await create_stdin_reader()
while True:
delay_time = await stdin_reader.readline()
asyncio.create_task(delay(int(delay_time)))
asyncio.run(main())
"""
10
休眠 10 秒
5
休眠 5 秒
1
休眠 1 秒
1 秒休眠完成
5 秒休眠完成
10 秒休眠完成
"""
在主协程中,调用 create_stdin_reader 并无限循环,等待来自具有 readline 协程的用户的输入。一旦用户在键盘上按下 Enter 键,这个协程就会传递输入的文本。当从用户那里得到输入的内容,就将它转换成一个整数并创建一个delay 任务。运行它,你将能在输入命令行的同时,运行多个 delay 任务。
但令人遗憾的是,在 Windows 系统上,connect_read_pipe 与 sys.stdin 不匹配。这是由于 Windows 实现文件描述符的方式导致的未修复错误,你可通过 https://bugs.python.org/issue26832 了解更多详细信息。
创建服务器
构建服务器时,我们创建了一个服务器套接字,将其绑定到一个端口并等待传入的连接。虽然这可行,但 asyncio 允许在更高的抽象级别上创建服务器,这意味着创建它们之后不用操心套接字的管理问题。以这种方式创建服务器简化了需要为使用套接字编写的代码,因此使用这些更高级别的 API 是使用 asyncio 创建和管理服务器的推荐方法。
可使用 asyncio.start_server 协程创建一个服务器,这个协程接收几个可选参数来配置诸如 SSL 的参数,但我们关注的主要参数是 host、port 和 client_connected_cb。host 和 port 就像我们之前看到的一样:服务器套接字监听的地址的端口,但有趣的部分是 client_connected_cb,它要么是一个回调函数,要么是一个在客户端连接到服务器时将运行的协程。此回调将 StreamReader 和 StreamWriter 作为参数,让我们可以读取和写入连接到服务器的客户端。
而 await start_server 时,它会返回一个 AbstractServer 对象,这是一个抽象类,调用它的 serve_forever 方法可以永远运行服务器,直到我们终止它。并且这个类也是一个异步上下文管理器,这意味着可使用带有 async with 语法的实例来让服务器在退出时正确关闭。
为了掌握如何创建服务器,让我们再次创建一个回显服务器,但要提供一些更高级的功能。除了回显输出,还将显示有多少其他客户端已连接到服务器,并且客户端和服务器断开连接时显示断开的客户端信息。
import asyncio
from asyncio import StreamReader, StreamWriter
import logging
class ServerState:
def __init__(self):
self._writers = []
async def add_client(self, reader: StreamReader, writer: StreamWriter):
"""添加客户端,并创建回显任务"""
self._writers.append(writer)
await self._on_connect(writer)
asyncio.create_task(self._echo(reader, writer))
async def _on_connect(self, writer: StreamWriter):
"""当有新连接时,告诉客户端有多少用户在线,并同时其他人有新用户上线"""
writer.write(f"欢迎, 当前在线人数有 len(self._writers) 人\\n".encode("utf-8"))
await writer.drain()
await self._notify_all("新用户上线\\n")
async def _echo(self, reader: StreamReader, writer: StreamWriter):
try:
while (data := await reader.readline()) != b"":
writer.write(data + b"~")
await writer.drain()
# 如果客户端断开连接,那么通知其他用户,有人断开连接
self._writers.remove(writer)
await self._notify_all(f"有人断开连接, 当前在线人数为 len(self._writers)")
except ConnectionError:
logging.info("客户端断开连接")
except Exception as e:
logging.error(f"出现异常: e")
self._writers.remove(writer)
async def _notify_all(self, message: str):
"""向所有其他用户发送消息的辅助方法, 如果发送失败, 将删除该用户"""
for writer in self._writers:
try:
writer.write(message.encode("utf-8"))
await writer.drain()
except ConnectionError as e:
logging.error("无法向客户端写入数据, 连接断开")
self._writers.remove(writer)
async def main():
server_state = ServerState()
async def client_connected(reader: StreamReader, writer: StreamWriter):
await server_state.add_client(reader, writer)
# 当客户端连接时,会调用 client_connected 协程函数,并自动传入 reader 和 writer
# 在里面我们执行 await server_state.add_client
server = await asyncio.start_server(client_connected, "localhost", 9999)
async with server:
await server.serve_forever()
asyncio.run(main())
当用户连接到服务器时,client_connected 回调会响应该用户的读取器和写入器,进而调用 ServerState 实例的 add_client 协程。在 add_client 协程中存储了 StreamWriter,因此我们可以向所有连接的客户端发送消息,并在客户端断开连接时将其删除。然后语用 _on_connect,它会向客户端发送一条消息,通知有多少其他用户已连接。在 _on_connect 中,还通知其他所有已连接的客户端有新用户连接。
小结
在本篇文章中,我们学习了以下内容:
- 使用较低级别的传输和协议 API 来构建一个简单的 HTTP 客户端,这些 API 是高级 asyncio stream API 的基础,不推荐用于一般用途;
- 使用 StreamReader 和 StreamWriter 类来构建网络应用程序,这些更高级别的 API 是在 asyncio 中使用流的推荐方法;
- 使用流来创建非阻塞命令行应用程序,这些应用程序可以在后台运行任务,并保持对用户输入的响应;
- 使用 start_server 协程创建服务器,这种方法是在 asyncio 中创建服务器的推荐方法,而不是直接使用套接字;
如果觉得文章对您有所帮助,可以请囊中羞涩的作者喝杯柠檬水,万分感谢,愿每一个来到这里的人都生活愉快,幸福美满。
微信赞赏
支付宝赞赏
asyncio
Asyncio初体验
Asyncio在Python中提供的API很复杂,其旨在替不同群体的人解决不同的问题,也正是由于这个原因,所以很难区分重点。
根据asyncio在Python中的特性,可以将其划分为两大主要部分:
1. 应用(最终用户)开发者,想要在应用开发中使用asyncio;
2. 框架开发者,制作框架或库以供应用开发者在他们的应用开发中使用。
在asyncio社区中大部分的问题基本都与这两个部分相关,例如,asyncio的官方文档更像是给框架开发者使用的,而非应用开发者。这导致应用开发者在阅读文档时很容易被其复杂性所震撼,在你使用它之前,你得看完全部的文档才行。
QuickStart
不需要关心官方文档的内容,要掌握asyncio库比想象中要容易。
PEP 492的作者、async Python的主要贡献者——Yury Selivanov——曾说过,asyncio的很多API都是给框架开发者提供的,应用开发人员需要掌握的只是所有API中的一小部分。
在本节我们将研究这些核心特性,并了解如何在Python中使用基于事件的编程,以此实现基本的事件循环。
要成为一个掌握asyncio的应用开发者,你需要知道的东西其实可以用一个小例子来展示。
import time
import asyncio
async def main():
print(f‘{time.ctime()} Hello‘)
await asyncio.sleep(1.0)
print(f‘{time.ctime()} Goodbye‘)
loop.stop() # 1
loop = asyncio.get_event_loop() # 2
loop.create_task(main()) # 3
loop.run_forever() # 4
pending = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*pending, return_exceptions=True) # 5
loop.run_until_complete(group) # 6
loop.close() # 7
λ python quickstart.py
Fri Sep 28 19:39:33 2018 Hello
Fri Sep 28 19:39:34 2018 Goodbye
- 通常用于信号控制,暂停循环,但循环可以重新启用不会消失;
- 在运行协程之前获得一个循环实例,只要是在单线程中,这个实例就是单例的;
- 只有调用这个方法,协程才会被执行,该调用返回的task对象可以用于获取任务状态、结果,或可通过task.cancel()取消任务;
- 实现循环运行的方法之一,会阻塞当前线程(通常是主线程);
- 通常习惯是在程序入口处执行loop.run_forever()方法,在收到进程信号时停止循环,然后收集那些还未完成的task,调用loop.run_until_complete()方法等待其执行完毕,但更多的是用这个方法收集协程任务,然后等待其执行完毕;
- 实现循环运行的方法之一,同样阻塞当前线程,其保持循环运行直到其上调度的协程完成;
- 通常在最后调用,必须在调用loop.stop()方法的基础上使用,会导致循环永久消失。
上述例子漏了一些东西,最重要的是如何运行阻塞函数,我们知道协程就是函数中使用了await
关键字进行切换,但在当前async def
还没获得广泛支持前,使用阻塞函数/库是不可避免的。
为此,asyncio提供了一个与concurrent.futures
包中的API类似的API,它提供了ThreadPoolExecutor
和ProcessPoolExecutor
,默认基于线程,但很容易用基于进程的替换,这之间有些特殊的地方要注意。
import time
import asyncio
async def main():
print(f‘{time.ctime()} Hello‘)
await asyncio.sleep(1.0)
print(f‘{time.ctime()} Goodbye‘)
loop.stop()
def blocking(): # 1
time.sleep(0.5) # 2
print(f‘{time.ctime()} Hello from a thread!‘)
loop = asyncio.get_event_loop()
loop. create_task(main())
loop.run_in_executor(None, blocking) # 3
loop.run_forever()
pending = asyncio.Task.all_tasks(loop=loop) # 4
group = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()
λ python quickstart_exe.py
Fri Sep 28 20:21:21 2018 Hello
Fri Sep 28 20:21:22 2018 Hello from a thread!
Fri Sep 28 20:21:22 2018 Goodbye
- 这个函数调用了常规的sleep(),这会阻塞主线程并阻止loop运行,我们不能使这个函数变成协程,更糟糕的是不能在主线程运行loop时调用它,解决办法是用一个executor来运行它;
- 注意一点,这个sleep运行时间比协程中的sleep运行时间要短,后文再讨论如果长的话会发生什么;
- 该方法帮助我们在事件循环外用额外的线程或进程执行函数,这个方法的返回值是一个Future对象,意味着可以用await来切换它;
- 挂起的task中不包含前面的阻塞函数,并且这个方法只返回task对象,绝对不会返回Future对象。
好了,通过上面的学习,已经掌握了应用开发者对于asyncio库需要的最重要的部分,接下来将拓展知识并对API进行层次理解,这会让你更容易理解如何从文档中获取信息。
Asyncio之顶
从前面一节我们发现,应用开发者只需几个命令就可以使用asyncio,但不幸的是官方文档巨量的API和扁平化的显示格式,让我们很难分清哪些命令更通用,哪些命令是向框架开发者提供的。
框架开发者通过文档寻找钩子并连接到其框架中。本节我们从框架开发者的角度来看他们如何构建新的异步兼容库。
Level | Concept | Implementation |
---|---|---|
9 | Network: streams | StreamReader & StreamWriter |
8 | Network: TCP & UDP | Protocol |
7 | Network: transports | BaseTransport |
6 | tools | asyncio.Queue |
5 | subprocesses & threads | run_in_executor(), asyncio.subprocess |
4 | tasks | asyncio.Task |
3 | futures | asyncio.Future |
2 | event loop | BaseEventLoop |
1 | coroutines | async def & await |
上表中加粗字体对于应用开发者最重要,级别分为9级,1级最基础。
- 一级,考虑设计第三方框架的最低级别,但在
Curio
和Trio
中并不流行,它们只依赖于Python中的本地协程,不依赖asyncio库模块; - 二级,事件循环被分离出来,因此可以对其进行替换,
uvloop
实现了比标准库更快的循环; - 三四级,带来了Future和Task对象,Task是Future的子类,可将其简单地视作一个等级,一个Future对象表示某种正在进行的动作,其将通过事件循环的notification返回结果,而Task对象表示运行在事件循环上的协程,简单理解为Future是“循环感知”的,Task是“循环感知”+“协程感知”的,应用开发更多地使用Task,而框架开发者的使用比例要看代码细节;
- 五级,代表启动工具,并等待运行在单独的线程或进程上的工作;
- 六级,代表附加的异步感知工具,比如
asyncio.Queue
,提供与Queue
模块类似的API,但原版的get和put方法会阻塞线程,因此这里提供的队列通过增加wait
关键字以支持异步; - 七到九级,网络IO层级,对应用开发者来说Stream十分方便,Protocol提供比Stream更细粒度的API,所有能使用Stream的地方都能用Protocol代替,除非要创建一个定制传输协议的框架供他人使用,否则几乎用不到Transport。
小结
在QuickStart中,掌握了快速上手的几个API;现在,对整个asyncio有了清晰的层级划分。这里再对上述知识进行强调:
- 第一级,知道如何写async def函数,如何使用await关键字来调用执行其它协程;
- 第二级,知道如何与事件循环开关、交互;
- 第五级,知道如何在事件循环外运行阻塞程序,由于大多数第三方库都不支持异步,比如ORM库;
- 第六级,协程间数据共享使用asyncio.Queue;
- 第九级,Stream的API提供了最简单的方式来使用socket进行网络通信。
如果使用提供异步兼容的第三方库,如aiohttp
,那么就不用直接使用asyncio的网络层,但这会导致对第三方库的依赖。
随着Python的发展,asyncio库可能会提供更多的API,以上也只是大致地分了几个层级。
以上是关于《asyncio 系列》8. 在 asyncio 中通过流(StreamReaderStreamWriter)来实现 TCP 请求的发送与接收的主要内容,如果未能解决你的问题,请参考以下文章
《asyncio 系列》10. 在微服务中集成 asyncio,以及超时控制自动重试服务降级