如何在 cdef 中等待?

Posted

技术标签:

【中文标题】如何在 cdef 中等待?【英文标题】:How to await in cdef? 【发布时间】:2018-02-26 12:56:32 【问题描述】:

我有这个 Cython 代码(简化):

class Callback:
    async def foo(self):
        print('called')

cdef void call_foo(void* callback):
    print('call_foo')
    asyncio.wait_for(<object>callback.foo())

async def py_call_foo():
    call_foo(Callback())

async def example():
    loop.run_until_complete(py_call_foo())

但是会发生什么:我得到RuntimeWarning: coroutine Callback.foo was never awaited。而且,事实上,它从来没有被调用过。但是,call_foo 被调用。

知道发生了什么/如何让它真正等待Callback.foo 完成?


加长版

在上面的例子中,一些重要的细节被遗漏了:特别是,从call_foo 获取返回值真的很困难。真正的项目设置是这样的:

    具有规则的 Bison 解析器。规则是对特制结构的引用,我们称之为ParserState。此结构包含对回调的引用,当规则匹配时由解析器调用。

    在 Cython 代码中,有一个类,我们称之为 Parser,包的用户应该扩展它来制作他们的自定义解析器。此类具有然后需要从 ParserState 的回调中调用的方法。

    解析应该是这样发生的:

    async def parse_file(file, parser):
        cdef ParserState state = allocate_parser_state(
            rule_callbacks,
            parser,
            file,
        )
        parse_with_bison(state)
    

回调具有一般形式:

ctypedef void(callback*)(char* text, void* parser)

我不得不承认我不知道asyncio 究竟是如何实现await 的,所以我不知道一般情况下是否可以使用我拥有的设置来做到这一点。不过,我的最终目标是多个 Python 函数能够或多或少地同时迭代解析不同的文件。

【问题讨论】:

我认为你应该在py_call_fooawait call_foo(...)。此外,您应该在call_fooreturn asyncio.wait_for(...)。否则,事件循环在 Callback.foo 运行完成之前退出,并且 asyncio 会报错。 @user4815162342 问题是,实际上,call_foo 是用 C 代码调用的,我没有办法从该调用中获取返回值(更具体地说,它是由代码调用的由Bison生成)。 暂停不是基于一些解释器技巧的,完全有可能将其编译成高效的机器代码(这是 Cython 和 PyPy 使用异步函数所做的)。正是因为它不使用技巧,所以它不能神奇地将同步代码转换为异步代码。如果代码是非阻塞的并且基于回调,则可以这样做——想想 javascript 风格的“承诺”等。您使用的解析器是否支持“推送”接口? 使用push接口解析应该明显慢一点并不明显;例如,以效率着称的 expat XML 解析器支持推送接口。无论哪种方式,如果您的解析器希望“控制”解析过程(即在解析完成之前不返回调用者),您将需要使用线程或其仿真(例如 greenlets)来同时运行多个实例。 asyncio 旨在通过提供暂停原语使其看起来像是在命令式编程,从而使使用回调进行编程变得更容易。 这是一项令人印象深刻的工作,尽管它的代价是它使解析器从根本上与 asyncio(以及类似形式的协作多任务处理)不兼容。不过,我想知道,与函数完成的实际解析工作相比,one 堆分配(对于所有令牌)的成本是否真的有所不同? 【参考方案1】:

TLDR:

协程必须是await'ed 或由事件循环运行。 cdef 函数不能await,但它可以构造和返回协程。

您的实际问题是将同步代码与异步代码混合使用。举个例子:

async def example():
    loop.run_until_complete(py_call_foo())

这类似于将子例程放入线程中,但从不启动它。 即使在启动时,这也是一个死锁:同步部分会阻止异步部分运行。


异步代码必须是awaited

async def 协程类似于def ...: yield 生成器:调用它只会实例化它。您必须与其交互才能实际运行它:

def foo():
     print('running!')
     yield 1

bar = foo()  # no output!
print(next(bar))  # prints `running!` followed by `1`

同样,当您有一个 async def 协程时,您必须要么 await 它要么将其安排在事件循环中。由于asyncio.wait_for 产生一个协程,而您从未await 或调度它,它不会运行。这就是RuntimeWarning 的原因。

请注意,将协程放入asyncio.wait_for 的目的纯粹是为了添加超时。它产生一个异步包装器,它必须是await'ed。

async def call_foo(callback):
    print('call_foo')
    await asyncio.wait_for(callback.foo(), timeout=2)

asyncio.get_event_loop().run_until_complete(call_foo(Callback()))

异步函数需要异步指令

异步编程的关键在于它是合作的:只有一个协程执行直到它产生控制权。之后,另一个协程执行直到它产生控制。这意味着任何阻塞而不产生控制的协程也会阻塞所有其他协程。

一般来说,如果某些东西在没有await 上下文的情况下执行工作,它就是阻塞的。值得注意的是,loop.run_until_complete 正在阻塞。您必须从同步函数中调用它:

loop = asyncio.get_event_loop()

# async def function uses await
async def py_call_foo():
    await call_foo(Callback())

# non-await function is not async
def example():
    loop.run_until_complete(py_call_foo())

example()

协程返回值

协程可以return 像常规函数一样产生结果。

async def make_result():
    await asyncio.sleep(0)
    return 1

如果你从另一个协程await它,你直接得到返回值:

async def print_result():
    result = await make_result()
    print(result)  # prints 1

asyncio.get_event_loop().run_until_complete(print_result())

要从常规子程序中的协程获取值,请使用run_until_complete 运行协程:

def print_result():
    result = asyncio.get_event_loop().run_until_complete(make_result())
    print(result)

print_result()

cdef/cpdef 函数不能是协程

Cython 通过 yield fromawait 仅支持 Python 函数的协程。即使对于经典协程,cdef 也是不可能的:

Error compiling Cython file:
------------------------------------------------------------
cdef call_foo(callback):
    print('call_foo')
    yield from asyncio.wait_for(callback.foo(), timeout=2)
   ^
------------------------------------------------------------

testbed.pyx:10:4: 'yield from' not supported here

您完全可以调用协程中的同步cdef 函数。您可以很好地调度来自cdef 函数的协程。 但是你不能从 cdef 函数内部 await ,也不能从 await cdef 函数内部。如果您需要这样做,如您的示例中所示,请使用常规 def 函数。

但是,您可以在 cdef 函数中构造并返回协程。这允许您在外部协程中await 结果:

# inner coroutine
async def pingpong(what):
    print('pingpong', what)
    await asyncio.sleep(0)
    return what

# cdef layer to instantiate and return coroutine
cdef make_pingpong():
    print('make_pingpong')
    return pingpong('nananana')

# outer coroutine
async def play():
    for i in range(3):
        result = await make_pingpong()
        print(i, '=>', result)

asyncio.get_event_loop().run_until_complete(play())

请注意,尽管awaitmake_pingpong 不是协程。它只是协程的工厂。

【讨论】:

以上是关于如何在 cdef 中等待?的主要内容,如果未能解决你的问题,请参考以下文章

如何分析 cdef 函数?

在cdef类中混合使用cdef和常规python属性

windows8.1如何分盘

访问 cffi 枚举

(strcmp)两个字符串长度不同时如何比较?

有Thread1Thread2Thread3Thread4四条线程分别统计CDEF四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,应当如何实现?