为啥要显式调用 asyncio.StreamWriter.drain?

Posted

技术标签:

【中文标题】为啥要显式调用 asyncio.StreamWriter.drain?【英文标题】:Why should asyncio.StreamWriter.drain be explicitly called?为什么要显式调用 asyncio.StreamWriter.drain? 【发布时间】:2019-05-15 17:43:03 【问题描述】:

来自doc:

写入(数据)

Write data to the stream.

This method is not subject to flow control. Calls to write() should be followed by drain().

协程 drain()

Wait until it is appropriate to resume writing to the stream. Example:

writer.write(data)
await writer.drain()

据我了解,

每次调用write时,您都需要调用drain。 如果不是我猜,write 会阻塞循环线程

那为什么 write 不是自动调用的协程呢?为什么一个人打电话write 而不用耗尽?我能想到两种情况

    你想立即writeclose 您必须在消息完成之前缓冲一些数据。

第一个是特殊情况,我认为我们可以有不同的 API。缓冲应该在写入函数内部处理,应用程序不应该关心。


让我换个方式提出问题。这样做的缺点是什么? python3.8版本能有效做到这一点吗?

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()

注意:drain doc 明确指出以下内容:

当没有等待时,drain() 立即返回。


再次阅读答案和链接,我认为这些功能是这样工作的。 注意:检查接受的答案以获得更准确的版本。

def write(data):
    remaining = socket.try_write(data)
    if remaining:
        _pendingbuffer.append(remaining) # Buffer will keep growing if other side is slow and we have a lot of data

async def drain():
    if len(_pendingbuffer) < BUF_LIMIT:
        return
    await wait_until_other_side_is_up_to_speed()
    assert len(_pendingbuffer) < BUF_LIMIT

async def awrite(writer, data):
    writer.write(data)
    await writer.drain()        

那么什么时候用什么:

    当数据不连续时,就像响应一个HTTP请求一样。我们只需要发送一些数据,而不关心何时到达并且内存不是问题 - 只需使用write 同上,但内存是个问题,使用awrite 将数据流式传输到大量客户端时(例如,一些实时流或大文件)。如果数据在每个连接的缓冲区中重复,它肯定会溢出 RAM。在这种情况下,编写一个循环,每次迭代都会获取一大块数据并调用awrite。如果文件很大,loop.sendfile 会更好。

【问题讨论】:

trio async 库的作者写了一篇有趣的文章,涵盖了这一点:vorpus.org/blog/…...搜索drain。如果您认为相关,我可以在此处发布其中的一部分作为答案 【参考方案1】:

据我了解,(1)每次调用 write 时都需要调用 drain。 (2) 如果不是我猜,write 会阻塞循环线程

两者都不正确,但混淆是可以理解的。 write() 的工作方式如下:

write() 的调用只是将数据存储到缓冲区中,然后将其留给事件循环以便稍后实际将其写出,而无需程序进一步干预。就应用程序而言,数据在后台写入的速度与对方能够接收到的速度一样快。换句话说,每个write() 将调度其数据以使用尽可能多的操作系统级写入来传输,这些写入在相应的文件描述符实际可写时发出。这一切都是自动发生的,甚至无需等待drain()

write() 不是协程,它绝对从不阻塞事件循环。

第二个属性听起来很方便 - 您可以在任何需要的地方调用 write(),甚至可以使用不是 async def 的函数 - 但它实际上是 write() 的主要缺陷。流 API 公开的写入与接受数据的操作系统完全分离,因此,如果您写入数据的速度快于网络对等方读取数据的速度,则内部缓冲区将不断增长,您将拥有memory leak。 drain() 解决了这个问题:如果写入缓冲区变得太大,等待它会暂停协程,并在后台执行 os.write() 成功并且缓冲区缩小后再次恢复。

您不需要在每次写入之后等待drain(),但您确实需要偶尔等待它,通常是在调用write() 的循环的迭代之间。例如:

while True:
    response = await peer1.readline()
    peer2.write(b'<response>')
    peer2.write(response)
    peer2.write(b'</response>')
    await peer2.drain()

drain() 如果待处理的未写入数据量很小,则立即返回。如果数据超过高阈值,drain() 将暂停调用协程,直到待处理的未写入数据量降至低阈值以下。暂停将导致协程停止从peer1 读取数据,这反过来又会导致对等方减慢它向我们发送数据的速率。这种反馈称为背压。

缓冲应该在写入函数内部处理,应用程序不应该关心。

这就是write() 现在的工作方式——它确实处理缓冲,它让应用程序不管好坏。另请参阅this answer 了解更多信息。


解决问题的已编辑部分:

再次阅读答案和链接,我认为这些功能是这样工作的。

write() 仍然比这更聪明一些。它不会只尝试写入一次,它实际上会安排数据继续写入,直到没有数据可以写入。即使您从不等待drain(),这也会发生 - 应用程序必须做的唯一事情就是让事件循环运行足够长的时间以写出所有内容。

writedrain 的更正确伪代码可能如下所示:

class ToyWriter:
    def __init__(self):
        self._buf = bytearray()
        self._empty = asyncio.Event(True)

    def write(self, data):
        self._buf.extend(data)
        loop.add_writer(self._fd, self._do_write)
        self._empty.clear()

    def _do_write(self):
        # Automatically invoked by the event loop when the
        # file descriptor is writable, regardless of whether
        # anyone calls drain()
        while self._buf:
            try:
                nwritten = os.write(self._fd, self._buf)
            except OSError as e:
                if e.errno == errno.EWOULDBLOCK:
                    return  # continue once we're writable again
                raise
            self._buf = self._buf[nwritten:]
        self._empty.set()
        loop.remove_writer(self._fd, self._do_write)

    async def drain(self):
        if len(self._buf) > 64*1024:
            await self._empty.wait()

实际的实现更复杂,因为:

它写在 Twisted 样式的 transport/protocol 层之上,并带有自己复杂的 flow control,而不是在 os.write 之上; drain() 并没有真正等到缓冲区为空,而是等到它到达 low watermark; _do_write 中引发的 EWOULDBLOCK 以外的异常将存储并在 drain() 中重新引发。

最后一点是另一个很好的理由来调用drain() - 实际上注意到对等体已经因为写入失败而消失了。

【讨论】:

谢谢。 “你应该偶尔等待 drain()”。我明白你的意思了。但实际上会使代码混乱。我也更新了问题。 @balki 这里没有争论,write() 从一开始就应该是一个协程。当前的设计在 Twisted 中是有意义的,它是在协程可用之前设计的,今天它既令人困惑又是一个 bug 吸引器。我已更新答案以解决已编辑的问题。 您写了“Python 3.8 将支持直接等待写入”并链接到 3.8 文档,但它没有提到能够等待 write()。尽管如此,这种说法仍然正确吗? @PeterHansen 在线文档曾经引用了一个等待的write(),但似乎已经改变了!确实,检查 git 日志显示在 3.8 开发周期中更改为 reverted。新功能是 asyncio 流的大规模重新实现的一部分,已被推迟,请参阅this issue 了解整个讨论。我现在已经编辑了答案以删除过时的句子

以上是关于为啥要显式调用 asyncio.StreamWriter.drain?的主要内容,如果未能解决你的问题,请参考以下文章

为啥要显式抛出 NullPointerException 而不是让它自然发生?

为什么要显式调用asyncio.StreamWriter.drain?

为啥我们在使用 HTTP 协议时需要指定端口号?

为啥我们需要显式调用 zero_grad()? [复制]

为啥我必须显式调用我的库?

为啥代码会通过空指针显式调用静态方法?