如何在 Twisted 的 asyncioreactor 之上运行 asyncio 库代码?

Posted

技术标签:

【中文标题】如何在 Twisted 的 asyncioreactor 之上运行 asyncio 库代码?【英文标题】:How can I run asyncio library code on top of Twisted's asyncioreactor? 【发布时间】:2017-02-18 16:08:56 【问题描述】:

我已经成功导入/安装了 Twisted 的 asyncioreactor 并执行了一个简单的异步函数:

from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import task
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import ensureDeferred

async def sleepy(reactor):
    print("SLEEPING")
    await task.deferLater(reactor, 3.0, lambda: None)
    print("done sleep")
    return 42

@task.react
def main(reactor):
    d = ensureDeferred(sleepy(reactor))
    d.addCallback(print)
    return d

我想在上述代码中混合一个 asyncio 库,例如 asyncio.sleep。我尝试了以下方法:

from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import task
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import ensureDeferred

import asyncio

async def sleepy(reactor):
    print("SLEEPING")
    await asyncio.sleep(3)
    print("done sleep")
    return 42

@task.react
def main(reactor):
    d = ensureDeferred(sleepy(reactor))
    d.addCallback(print)
    return d

这会产生以下错误:

 $ python test.py
SLEEPING
main function encountered error
Traceback (most recent call last):
  File "test.py", line 16, in <module>
    @task.react
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/task.py", line 908, in react
    finished = main(_reactor, *argv)
  File "test.py", line 18, in main
    d = ensureDeferred(sleepy(reactor))
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/defer.py", line 823, in ensureDeferred
    return _inlineCallbacks(None, coro, Deferred())
--- <exception caught here> ---
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/site-packages/twisted/internet/defer.py", line 1301, in _inlineCallbacks
    result = g.send(result)
  File "test.py", line 11, in sleepy
    await asyncio.sleep(3)
  File "/Users/blz/.pyenv/versions/3.6.0/lib/python3.6/asyncio/tasks.py", line 476, in sleep
    return (yield from future)
builtins.AssertionError: yield from wasn't used with future

很公平,我想,所以我尝试用await ensureDeferred(asyncio.sleep(3))await asyncio.ensure_future(asyncio.sleep(3)) 交换await asyncio.sleep(3),但我得到了完全相同的错误。

如何安排 aio 协程(和/或 Future)在 asyncioreactor 使用的同一事件循环上运行?

【问题讨论】:

这方面有进一步的更新吗? 【参考方案1】:

所以我尝试用 await 交换 await asyncio.sleep(3) ensureDeferred(asyncio.sleep(3)) 并等待 asyncio.ensure_future(asyncio.sleep(3))

你快到了,你应该把两者结合起来,用Deferred.fromFuture代替ensureDeferred

await Deferred.fromFuture(asyncio.ensure_future(asyncio.sleep(3)))

规则是,在 Twisted 上下文中运行的 async def 函数(带有 ensureDeferred)只能在 Deferred 上等待,而在 asyncio 上下文中运行的 async def 函数(带有 ensure_future)只能在 asyncio Future 上运行(总是可以在其他协程对象上等待(来自async def 函数调用的结果),但链最终会导致Deferred/Future)。要从 asyncio Future 转换为 Deferred 使用 Deferred.fromFuture 并转换为 asyncio Future 使用 Deferred.asFuture

人们可以将上下文从一个切换到另一个并返回。在这个(人为的)示例中,我们从 sleepy_twisted 开始运行 Twisted context,执行 Twisted sleep,然后切换到 asyncio context 运行 sleepy_asyncio,执行 asyncio sleep,然后再次切换到 Twisted context 执行 Twisted sleep:

from twisted.internet import asyncioreactor, task
from twisted.internet.defer import inlineCallbacks, ensureDeferred, Deferred
import asyncio
asyncioreactor.install()

async def sleepy_asyncio(reactor):
   print("Sleep 2")
   await asyncio.sleep(1)
   print("Sleep 3")
   await Deferred.asFuture(task.deferLater(reactor, 1, lambda: None), loop=asyncio.get_running_loop())
   
async def sleepy_twisted(reactor):
   print("Sleep 1")
   await task.deferLater(reactor, 1, lambda: None)
   await Deferred.fromFuture(asyncio.ensure_future(sleepy_asyncio(reactor)))
   print("done")

@task.react
def main(reactor):
    return ensureDeferred(sleepy_twisted(reactor))

【讨论】:

【参考方案2】:

哇,您遇到了一个有趣的角落案例! 通过使用 asyncio.sleep() 你触发了一些有趣的行为。

我想你可能已经发现了一个错误 Twisted 与 Python 3 asyncioreactor 和 async/await 的集成。 您可能希望在 Twisted 邮件列表中跟进 Twisted 开发人员。

我不是 100% 确定,但这是我的想法。

asyncio.sleep() 的实现与 Python 3 紧密耦合 异步实现。它使用 asyncio.Future(类似于 Twisted 的 deferred),它使用 get_event_loop()(类似于 Twisted 的 reactor)。

asyncio.sleep 是这样实现的:

@coroutine
def sleep(delay, result=None, *, loop=None):
    """Coroutine that completes after a given time (in seconds)."""
    if delay == 0:
        yield
        return result

    if loop is None:
        loop = events.get_event_loop()
    future = loop.create_future()
    h = future._loop.call_later(delay,
                                futures._set_result_unless_cancelled,
                                future, result)
    try:
        return (yield from future)
    finally:
        h.cancel()

我稍微更改了您的代码示例以通过 Twisted 的 asyncioreactor 事件循环进入 asyncio.sleep():

from twisted.internet import asyncioreactor
asyncioreactor.install()
from twisted.internet import reactor
from twisted.internet import task
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import ensureDeferred

import asyncio

async def sleepy(reactor):
    print("SLEEPING")
    await asyncio.sleep(3, loop=reactor._asyncioEventloop)
    print("done sleep")
    return 42

@task.react
def main(reactor):
    d = ensureDeferred(sleepy(reactor))
    d.addCallback(print)
    return d

我仍然遇到和你一样的错误:builtins.AssertionError: yield from was not used with future

堆栈跟踪如下所示:

main function encountered error
Traceback (most recent call last):
  File "b.py", line 16, in <module>
    @task.react
  File "/Users/crodrigues/twisted8/src/twisted/internet/task.py", line 908, in react
    finished = main(_reactor, *argv)
  File "b.py", line 19, in main
    d = ensureDeferred(sleepy(reactor))
  File "/Users/crodrigues/twisted8/src/twisted/internet/defer.py", line 823, in ensureDeferred
    return _inlineCallbacks(None, coro, Deferred())
--- <exception caught here> ---
  File "/Users/crodrigues/twisted8/src/twisted/internet/defer.py", line 1301, in _inlineCallbacks
    result = g.send(result)
  File "b.py", line 12, in sleepy
    await asyncio.sleep(3, loop=reactor._asyncioEventloop)
File "/usr/local/Cellar/python3/3.6.0/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py", line 478, in sleep
    return (yield from future)
builtins.AssertionError: yield from wasn't used with future

我认为 asyncio.sleep() 是一个应该运行完成的协程 在 asyncio 循环上,但这里没有发生,因此断言。

我认为问题是由 result = g.send(result) 引起的。 我不确定您是否可以send() 到这样的协程并期望它能够正常工作。

我建议您在 Twisted 邮件列表中询问以获得更详细的反馈。

【讨论】:

以上是关于如何在 Twisted 的 asyncioreactor 之上运行 asyncio 库代码?的主要内容,如果未能解决你的问题,请参考以下文章

实现发布-订阅模式时如何在 Django 和 Twisted 之间进行通信?

您如何通过 Python(而不是通过 Twisted)运行 Twisted 应用程序?

您如何通过Python(而不是通过Twisted)运行Twisted应用程序?

如何将两个整数与 Twisted 相加?

如何在Buildozer中构建kivy和Twisted

Twisted - 如何创建多协议进程并在协议之间发送数据