异步延迟 StreamReader.read

Posted

技术标签:

【中文标题】异步延迟 StreamReader.read【英文标题】:asyncio delayed StreamReader.read 【发布时间】:2016-02-02 16:39:51 【问题描述】:

我对 asyncio 和 async def/await 语法有点陌生,所以我想问一下我应该如何做这样的事情:

import asyncio
import pygame
import logging
from pygame import *
log = logging.getLogger('')


class Client:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.loop = asyncio.get_event_loop()
        self.loop.run_until_complete(self.create_client())

    async def create_client(self):
        self.reader, self.writer = await asyncio.open_connection(self.host, 
                                                                 self.port, 
                                                                 loop=self.loop)
        asyncio.ensure_future(self._handle_packets(), loop=self.loop)

    async def _handle_packets(self):
        while True:
            data = await self.reader.read(4096)
            if not data:
                continue
            message = data.decode()
            log.debug("(NET) Received "+message)

    def send(self, data):
        self.loop.run_until_complete(asyncio.ensure_future(self._send(data),
                                                           loop=self.loop))

    async def _send(self, data):
        self.writer.write(data)
        await self.writer.drain()
        print("_send done")

    def disconnect(self):
        print("DC")
        self.loop.close()


def main():
    pygame.init()
    screen = pygame.display.set_mode((640, 480))
    pygame.display.set_caption("Pyond client")
    bg = Surface((640, 480))
    bg.fill(Color("#004400"))
    client = Client('127.0.0.1', 2508)
    while True:
        pygame.event.pump()
        for e in pygame.event.get():
            if e.type == QUIT:
                raise SystemExit
            elif e.type == KEYUP:
                if e.key == K_UP:
                    client.send(b"'Hello':'World'")
        screen.blit(bg, (0, 0))
        pygame.display.update()
    client.disconnect()


if __name__ == "__main__":
    main()

此代码使用 pygame 创建 640x480 窗口,然后读取传入的 K_UP(向上箭头)键。按下后,将类似 json 的字符串发送到服务器。 _handle_packets 应该从服务器读取任何传入数据并打印出来。 我正在测试此代码并且发送工作正常,但接收延迟很大。我确定我需要将处理程序放在其他地方,那么到底在哪里? 顺便说一句,发送只工作一次。在这方面也需要帮助。

【问题讨论】:

【参考方案1】:

这里有几个问题。

第一个非常基本asycnio 事件循环在 create_client() 完成后停止运行,并且只会再次运行 send() 数据。所以,它唯一能够运行_handle_packets 的时间是你send()ing 的时候。理想情况下,您应该在更高的范围内启动一次事件循环,并在完成所有操作后关闭它。

第二个问题是,每当您client.send(b"'Hello':'World'") 时,您将阻塞外部pygame while True 循环,阻止任何其他事件被处理,直到前一个事件被发送。您应该使用asyncio.Queue 对事件进行排队并从Client 类发送它们。

以下是我要做的一些更改(抱歉,未经测试;我没有安装pygame ATM):

# vim: tabstop=4 expandtab

import asyncio
import pygame
import logging
from pygame import *
log = logging.getLogger('')


class Client:
    def __init__(self, host, port, loop):
        self.host = host
        self.port = port
        self.loop = loop
        self.send_q = asyncio.Queue()

    async def connect(self):
        self.reader, self.writer = await asyncio.open_connection(self.host, 
                                                                 self.port, 
                                                                 loop=self.loop)
        self.loop.create_task(self._handle_packets())
        self.loop.create_task(self._send())

    async def _handle_packets(self):
        while True:
            data = await self.reader.read(4096)
            if not data:
                continue
            message = data.decode()
            log.debug("(NET) Received "+message)

    def send(self, data):
        self.send_q.put_nowait(data)

    async def _send(self):
        while True:
            data = await self.send_q.get()
            self.writer.write(data)
            await self.writer.drain()

    def disconnect(self):
        print("DC")
        self.writer.close()


async def main(loop):
    pygame.init()
    screen = pygame.display.set_mode((640, 480))
    pygame.display.set_caption("Pyond client")
    bg = Surface((640, 480))
    bg.fill(Color("#004400"))
    client = Client('127.0.0.1', 2508, loop)
    await client.connect()
    while True:
        pygame.event.pump()
        for e in pygame.event.get():
            if e.type == QUIT:
                raise SystemExit
            elif e.type == KEYUP:
                if e.key == K_UP:
                    client.send(b"'Hello':'World'")
        screen.blit(bg, (0, 0))
        pygame.display.update()
    client.disconnect()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

另一个重要要记住的是,你永远不应该用pygame 阻塞asyncio 事件循环,否则后台的Client 网络处理将停止。我从未使用过pygame,所以我不熟悉哪个pygame 函数可能是“阻塞”的,但应该使用result = await loop.run_in_executor(None, blocking_func, *func_args) 调用它们。这将在另一个线程中调用阻塞函数。

【讨论】:

以上是关于异步延迟 StreamReader.read的主要内容,如果未能解决你的问题,请参考以下文章

异步加载和延迟加载

延迟显示异步获取的项目

“异步延迟”一起使用时有啥作用? [复制]

异步延迟 StreamReader.read

你遇到过哪些原因造成MySQL异步复制延迟?

常用异步延迟方法