在asyncio.Protocol.data_received中调用协同程序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在asyncio.Protocol.data_received中调用协同程序相关的知识,希望对你有一定的参考价值。

我在新的Python asyncio模块的asyncio.Protocol.data_received回调中执行异步操作时遇到问题。

考虑以下服务器:

class MathServer(asyncio.Protocol):

   @asyncio.coroutine
   def slow_sqrt(self, x):
      yield from asyncio.sleep(1)
      return math.sqrt(x)

   def fast_sqrt(self, x):
      return math.sqrt(x)

   def connection_made(self, transport):
      self.transport = transport

   #@asyncio.coroutine
   def data_received(self, data):
      print('data received: {}'.format(data.decode()))
      x = json.loads(data.decode())
      #res = self.fast_sqrt(x)
      res = yield from self.slow_sqrt(x)
      self.transport.write(json.dumps(res).encode('utf8'))
      self.transport.close()

与以下客户一起使用:

class MathClient(asyncio.Protocol):

   def connection_made(self, transport):
      transport.write(json.dumps(2.).encode('utf8'))

   def data_received(self, data):
      print('data received: {}'.format(data.decode()))

   def connection_lost(self, exc):
      asyncio.get_event_loop().stop()

随着self.fast_sqrt被调用,一切都按预期工作。

使用self.slow_sqrt,它不起作用。

它也不适用于self.fast_sqrt@asyncio.coroutine上的data_received装饰。

我觉得我在这里缺少一些基本的东西。

完整的代码在这里:

经测试:

  • Python 3.4.0b1(Windows)
  • Python 3.3.3 + asyncio-0.2.1(FreeBSD)

两个问题都是一样的:使用slow_sqrt,客户端/服务器只会挂起什么都不做。

答案

看来,这需要通过Future解耦 - 尽管我仍然不确定这是否是正确的方法。

class MathServer(asyncio.Protocol):

   @asyncio.coroutine
   def slow_sqrt(self, x):
      yield from asyncio.sleep(2)
      return math.sqrt(x)

   def fast_sqrt(self, x):
      return math.sqrt(x)

   def consume(self):
      while True:
         self.waiter = asyncio.Future()
         yield from self.waiter
         while len(self.receive_queue):
            data = self.receive_queue.popleft()
            if self.transport:
               try:
                  res = self.process(data)
                  if isinstance(res, asyncio.Future) or 
                     inspect.isgenerator(res):
                     res = yield from res
               except Exception as e:
                  print(e)

   def connection_made(self, transport):
      self.transport = transport
      self.receive_queue = deque()
      asyncio.Task(self.consume())

   def data_received(self, data):
      self.receive_queue.append(data)
      if not self.waiter.done():
         self.waiter.set_result(None)
      print("data_received {} {}".format(len(data), len(self.receive_queue)))

   def process(self, data):
      x = json.loads(data.decode())
      #res = self.fast_sqrt(x)
      res = yield from self.slow_sqrt(x)
      self.transport.write(json.dumps(res).encode('utf8'))
      #self.transport.close()

   def connection_lost(self, exc):
      self.transport = None

这是Guido van Rossum的answer

解决方案很简单:将该逻辑写为使用@coroutine标记的单独方法,并使用data_received()(在本例中为async())在== Task()中将其关闭。之所以没有内置到协议中的原因是,如果是,它将需要备用事件循环实现来处理协同程序。

def data_received(self, data):
    asyncio.ensure_future(self.process_data(data))

@asyncio.coroutine
def process_data(self, data):
    # ...stuff using yield from...

完整代码在这里: - Client - Server

另一答案

当我的MyProtocol.connection_made被调用时,我有一个类似的问题,我想运行一个协同程序。我的解决方案非常相似,只是我的协议可以访问循环。对于那些使用更新版本的python的人来说,下面的代码对我有用(我使用的是python 3.6.8):

class MyProtocol(asyncio.Protocol):
    def __init__(self, loop):
        self.loop = loop

    async def do_async_thing(self):
        await asyncio.sleep(1)

    def connection_made(self, transport):
        self.transport = transport
        self.loop.create_task(self.do_async_thing())

    # Other member functions left out for brevity.

这是有道理的 - 循环需要调度需要具有独立上下文的任务,即可以独立于任何其他调用堆栈运行。这就是为什么你给循环一个它可以运行的协程,do_async_thing()以及在这种情况下的类实例,它会在它可以调用时调用。当它被调用时,它与connection_made成员函数无关。

值得注意的是,这也可以通过使用asyncio.ensure_future(coro, loop=None)而不是self.loop.create_task(coro)来实现,但后者可能会使用默认循环。事实上,它确实 - 我刚检查了source code

以上是关于在asyncio.Protocol.data_received中调用协同程序的主要内容,如果未能解决你的问题,请参考以下文章

秋的潇洒在啥?在啥在啥?

上传的数据在云端的怎么查看,保存在啥位置?

在 React 应用程序中在哪里转换数据 - 在 Express 中还是在前端使用 React?

存储在 plist 中的数据在模拟器中有效,但在设备中无效

如何在保存在 Mongoose (ExpressJS) 之前在模型中格式化数据

如何在保存在 Mongoose (ExpressJS) 之前在模型中格式化数据