Python 异步 io 流

Posted

技术标签:

【中文标题】Python 异步 io 流【英文标题】:Python async io stream 【发布时间】:2018-10-02 17:39:18 【问题描述】:

我正在阅读 asyncio 文档中的以下代码。

import asyncio

async def tcp_echo_client(message):
  reader, writer = await asyncio.open_connection(
    '127.0.0.1', 8888)

  print(f'Send: message!r')
  writer.write(message.encode())

  data = await reader.read(100)
  print(f'Received: data.decode()!r')

  print('Close the connection')
  writer.close()
  await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

但是我现在能够理解为什么 reader.read 是可等待的但 writer.write 不是?既然它们都是 I/O 操作,写方法也应该是可等待的吧?

【问题讨论】:

【参考方案1】:

但是我现在能够理解为什么 reader.read 是可等待的但 writer.write 不是?既然都是 I/O 操作,write 方法也应该是 awaitable 吧?

不一定。 read()write() 之间的基本不对称是 read() 必须返回实际数据,而 write() 纯粹是通过副作用操作。所以read() 必须是可等待的,因为它需要在数据不可用时暂停调用协程。另一方面,write() 可以(并且在 asyncio 中)通过将数据存储在某个缓冲区中并安排在适当的时间将其写入来实现。

这种设计具有重要的后果,例如写入数据的速度比另一方读取数据的速度快导致缓冲区无限膨胀,并且write() 期间的异常实际上会丢失。这两个问题都可以通过调用writer.drain() 来解决,该writer.drain() 应用背压,即将缓冲区写出到操作系统,如有必要,暂停进程中的协程。这样做直到缓冲区大小低于threshold。 write() 文档建议“对 write() 的调用应该跟在 drain() 之后。”

write() 中缺少背压是因为异步流是在基于回调的层之上实现的,其中非异步 write() 比完全异步的替代方案更方便使用。有关该主题的详细处理,请参阅Nathaniel J Smith 的Nathaniel J Smith 的article。

【讨论】:

注意:Python 3.8 将有 awrite()aclose() 异步流方法以避免 write() / drain()close() / wait_closed() 混淆。 @AndrewSvetlov 太棒了!拥有一个 flush() 协程也非常有用,它是 file.flush 的 asyncio 等价物。这个想法是简单地将流缓冲区刷新到操作系统,就像 drain() 但没有水印。 我怀疑flush() 是否有用,因为刷新对分布式系统没有交付保证。 Peer 仍然无法接收 flushed 数据。文件系统不同:刷新的缓冲区位于内核内存中,如果不重新启动机器(非常不可能),它们将保存在磁盘上。 fsync 提供了更强大的保证。无论如何,如果你仍然想要flush() -- 请在 bugs.python.org 上创建一个问题以供讨论 @AndrewSvetlov 没错,flush() 不提供交付保证,但将数据传输到操作系统仍然有用。这与刷新常规文件相同,其中数据不会同步而是传递给内核。套接字也有一个内核缓冲区,Nathaniel 对此进行了讨论,他继续认为 asyncio 在内核缓冲区之上的水印系统是多余的,并导致缓冲区膨胀。 (我搜索了 async-sig 档案,但找不到对该主张的反驳。)但由于 flush()drain() 正交,我将只提交 BPO 问题。 我曾经在 aiohttp 服务器中禁用水印。结果,我得到了 10-15% 的 IIRC 减速。

以上是关于Python 异步 io 流的主要内容,如果未能解决你的问题,请参考以下文章

Python开发第九篇:协程异步IO

python 异步IO数据库队列缓存

Python异步IO之协程:使用asyncio的不同方法实现协程

Flink之基于Vertx的Mysql异步IO

事件 vs 流 vs Observables vs 异步迭代器

Python学习---IO的异步[自定义异步IO]