为什么要显式调用asyncio.StreamWriter.drain?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为什么要显式调用asyncio.StreamWriter.drain?相关的知识,希望对你有一定的参考价值。
来自doc:https://docs.python.org/3/library/asyncio-stream.html#asyncio.StreamWriter.write
写(数据)
Write data to the stream. This method is not subject to flow control. Calls to write() should be followed by drain().
coroutine drain()
Wait until it is appropriate to resume writing to the stream. Example: writer.write(data) await writer.drain()
据我所知,
- 每次调用
drain
时你都需要调用write
。 - 如果不是,我猜,
write
将阻止循环线程
那为什么写不是自动调用它的协同程序?为什么一个人打电话给write
而不必排水?我可以想到两个案例
- 你想立即
write
和close
- 在消息完成之前,您必须缓冲一些数据。
第一个是特例,我想我们可以有一个不同的api。缓冲应该在写入功能内部处理,应用程序不应该在意。
让我以不同的方式提出问题。这样做有什么缺点? python3.8版本有效吗?
async def awrite(writer, data):
writer.write(data)
await writer.drain()
注意:drain
doc明确说明如下:
当没有什么可以等待时,drain()立即返回。
再次阅读答案和链接,我认为这些功能是这样的。注意:检查已接受的答案以获得更准确的版本
def write(data):
remaining = socket.try_write(data)
if remaining:
_pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data
async def drain():
if len(_pendingbuffer) < BUF_LIMIT:
return
await wait_until_other_side_is_up_to_speed()
assert len(_pendingbuffer) < BUF_LIMIT
async def awrite(writer, data):
writer.write(data)
await writer.drain()
那么何时使用:
- 当数据不连续时,就像响应HTTP请求一样。我们只需要发送一些数据而不关心何时到达且内存不是问题 - 只需使用
write
- 与上面相同,但记忆是一个问题,使用
awrite
- 将数据流式传输到大量客户端时(例如某些直播流或巨大文件)。如果数据在每个连接的缓冲区中重复,它肯定会溢出RAM。在这种情况下,编写一个循环,每次迭代需要一大块数据并调用
awrite
。如果文件很大,loop.sendfile
会更好。
据我所知,(1)每次调用时都需要调用drain。 (2)如果不是,我猜,写将阻止循环线程
两者都不正确,但混乱是可以理解的。 write()
的工作方式如下:
- 对
write()
的调用只是将数据存入缓冲区,将其留在事件循环中,以便稍后将其写出,而无需程序的进一步干预。就应用而言,数据在后台写入的速度与对方能够接收的速度一样快。换句话说,每个write()
将使用尽可能多的OS级写入来调度其数据,并在相应的文件描述符实际可写时发出这些写入。所有这一切都自动发生,即使没有等待drain()
。 write()
不是一个协程,它绝对不会阻止事件循环。
第二个属性听起来很方便,但它实际上是write()
的一个主要缺陷。写入与接受数据分离,因此如果您写入数据的速度比同伴读取数据的速度快,内部缓冲区将继续增长,您手上就会有memory leak。一旦缓冲区变得太大,等待drain()
就会停止协同程序。每次写入后都不需要等待drain()
,但是你需要偶尔等待它,通常是在循环迭代之间。例如:
while True:
response = await peer1.readline()
peer2.write(b'<response>')
peer2.write(response)
peer2.write(b'</response>')
await peer2.drain()
如果待处理的未写入数据量很小,drain()
会立即返回。如果数据超过高阈值,drain()
将暂停调用协程,直到待处理的未写入数据量低于低阈值。暂停将导致协程停止从peer1
读取,这将导致对等体减慢它向我们发送数据的速率。这种反馈被称为背压。
Python 3.8将support awaiting write
directly,这将删除显式调用drain()
的需要。
缓冲应该在写入功能内部处理,应用程序不应该在意。
这就是write()
现在的工作方式 - 它确实处理缓冲,它让应用程序不关心,无论好坏。另请参阅this answer了解更多信息。
Addressing the edited part of the question:
再次阅读答案和链接,我认为这些功能是这样的。
write()
仍然比那更聪明。它不会尝试只写一次,它实际上会安排数据继续写入,直到没有数据写入。即使您从未等待drain()
,也会发生这种情况 - 应用程序必须做的唯一事情是让事件循环运行足够长的时间来写出所有内容。
更正确的write
和drain
伪代码可能如下所示:
class ToyWriter:
def __init__(self):
self._buf = bytearray()
self._empty = asyncio.Event(True)
def write(self, data):
self._buf.extend(data)
loop.add_writer(self._fd, self._do_write)
self._empty.clear()
def _do_write(self):
# Automatically invoked by the event loop when the
# file descriptor is writable, regardless of whether
# anyone calls drain()
while self._buf:
try:
nwritten = os.write(self._fd, self._buf)
except OSError as e:
if e.errno == errno.EWOULDBLOCK:
return # continue once we're writable again
raise
self._buf = self._buf[nwritten:]
self._empty.set()
loop.remove_writer(self._fd, self._do_write)
async def drain(self):
if len(self._buf) > 64*1024:
await self._empty.wait()
实际实现更复杂,因为:
- 它写在Twisted风格的transport/protocol层上,有自己精致的flow control,而不是在
os.write
之上; - 因为
drain()
并没有真正等到缓冲区为空,但直到达到low watermark为止; - 在
EWOULDBLOCK
筹集的_do_write
以外的例外情况在drain()
中存储并重新提升。
最后一点是调用drain()
的另一个好理由,实际上注意到对等方已经通过写入失败而消失了。
以上是关于为什么要显式调用asyncio.StreamWriter.drain?的主要内容,如果未能解决你的问题,请参考以下文章