如何在 cdef 中等待?
Posted
技术标签:
【中文标题】如何在 cdef 中等待?【英文标题】:How to await in cdef? 【发布时间】:2018-02-26 12:56:32 【问题描述】:我有这个 Cython 代码(简化):
class Callback:
async def foo(self):
print('called')
cdef void call_foo(void* callback):
print('call_foo')
asyncio.wait_for(<object>callback.foo())
async def py_call_foo():
call_foo(Callback())
async def example():
loop.run_until_complete(py_call_foo())
但是会发生什么:我得到RuntimeWarning: coroutine Callback.foo was never awaited
。而且,事实上,它从来没有被调用过。但是,call_foo
被调用。
知道发生了什么/如何让它真正等待Callback.foo
完成?
加长版
在上面的例子中,一些重要的细节被遗漏了:特别是,从call_foo
获取返回值真的很困难。真正的项目设置是这样的:
具有规则的 Bison 解析器。规则是对特制结构的引用,我们称之为ParserState
。此结构包含对回调的引用,当规则匹配时由解析器调用。
在 Cython 代码中,有一个类,我们称之为 Parser
,包的用户应该扩展它来制作他们的自定义解析器。此类具有然后需要从 ParserState
的回调中调用的方法。
解析应该是这样发生的:
async def parse_file(file, parser):
cdef ParserState state = allocate_parser_state(
rule_callbacks,
parser,
file,
)
parse_with_bison(state)
回调具有一般形式:
ctypedef void(callback*)(char* text, void* parser)
我不得不承认我不知道asyncio
究竟是如何实现await
的,所以我不知道一般情况下是否可以使用我拥有的设置来做到这一点。不过,我的最终目标是多个 Python 函数能够或多或少地同时迭代解析不同的文件。
【问题讨论】:
我认为你应该在py_call_foo
中await call_foo(...)
。此外,您应该在call_foo
中return asyncio.wait_for(...)
。否则,事件循环在 Callback.foo
运行完成之前退出,并且 asyncio 会报错。
@user4815162342 问题是,实际上,call_foo
是用 C 代码调用的,我没有办法从该调用中获取返回值(更具体地说,它是由代码调用的由Bison
生成)。
暂停不是基于一些解释器技巧的,完全有可能将其编译成高效的机器代码(这是 Cython 和 PyPy 使用异步函数所做的)。正是因为它不使用技巧,所以它不能神奇地将同步代码转换为异步代码。如果代码是非阻塞的并且基于回调,则可以这样做——想想 javascript 风格的“承诺”等。您使用的解析器是否支持“推送”接口?
使用push接口解析应该明显慢一点并不明显;例如,以效率着称的 expat XML 解析器支持推送接口。无论哪种方式,如果您的解析器希望“控制”解析过程(即在解析完成之前不返回调用者),您将需要使用线程或其仿真(例如 greenlets)来同时运行多个实例。 asyncio 旨在通过提供暂停原语使其看起来像是在命令式编程,从而使使用回调进行编程变得更容易。
这是一项令人印象深刻的工作,尽管它的代价是它使解析器从根本上与 asyncio(以及类似形式的协作多任务处理)不兼容。不过,我想知道,与函数完成的实际解析工作相比,one 堆分配(对于所有令牌)的成本是否真的有所不同?
【参考方案1】:
TLDR:
协程必须是await
'ed 或由事件循环运行。 cdef
函数不能await
,但它可以构造和返回协程。
您的实际问题是将同步代码与异步代码混合使用。举个例子:
async def example():
loop.run_until_complete(py_call_foo())
这类似于将子例程放入线程中,但从不启动它。 即使在启动时,这也是一个死锁:同步部分会阻止异步部分运行。
异步代码必须是await
ed
async def
协程类似于def ...: yield
生成器:调用它只会实例化它。您必须与其交互才能实际运行它:
def foo():
print('running!')
yield 1
bar = foo() # no output!
print(next(bar)) # prints `running!` followed by `1`
同样,当您有一个 async def
协程时,您必须要么 await
它要么将其安排在事件循环中。由于asyncio.wait_for
产生一个协程,而您从未await
或调度它,它不会运行。这就是RuntimeWarning
的原因。
请注意,将协程放入asyncio.wait_for
的目的纯粹是为了添加超时。它产生一个异步包装器,它必须是await
'ed。
async def call_foo(callback):
print('call_foo')
await asyncio.wait_for(callback.foo(), timeout=2)
asyncio.get_event_loop().run_until_complete(call_foo(Callback()))
异步函数需要异步指令
异步编程的关键在于它是合作的:只有一个协程执行直到它产生控制权。之后,另一个协程执行直到它产生控制。这意味着任何阻塞而不产生控制的协程也会阻塞所有其他协程。
一般来说,如果某些东西在没有await
上下文的情况下执行工作,它就是阻塞的。值得注意的是,loop.run_until_complete
正在阻塞。您必须从同步函数中调用它:
loop = asyncio.get_event_loop()
# async def function uses await
async def py_call_foo():
await call_foo(Callback())
# non-await function is not async
def example():
loop.run_until_complete(py_call_foo())
example()
协程返回值
协程可以return
像常规函数一样产生结果。
async def make_result():
await asyncio.sleep(0)
return 1
如果你从另一个协程await
它,你直接得到返回值:
async def print_result():
result = await make_result()
print(result) # prints 1
asyncio.get_event_loop().run_until_complete(print_result())
要从常规子程序中的协程获取值,请使用run_until_complete
运行协程:
def print_result():
result = asyncio.get_event_loop().run_until_complete(make_result())
print(result)
print_result()
cdef/cpdef
函数不能是协程
Cython 通过 yield from
和 await
仅支持 Python 函数的协程。即使对于经典协程,cdef
也是不可能的:
Error compiling Cython file:
------------------------------------------------------------
cdef call_foo(callback):
print('call_foo')
yield from asyncio.wait_for(callback.foo(), timeout=2)
^
------------------------------------------------------------
testbed.pyx:10:4: 'yield from' not supported here
您完全可以调用协程中的同步cdef
函数。您可以很好地调度来自cdef
函数的协程。
但是你不能从 cdef
函数内部 await
,也不能从 await
cdef
函数内部。如果您需要这样做,如您的示例中所示,请使用常规 def
函数。
但是,您可以在 cdef
函数中构造并返回协程。这允许您在外部协程中await
结果:
# inner coroutine
async def pingpong(what):
print('pingpong', what)
await asyncio.sleep(0)
return what
# cdef layer to instantiate and return coroutine
cdef make_pingpong():
print('make_pingpong')
return pingpong('nananana')
# outer coroutine
async def play():
for i in range(3):
result = await make_pingpong()
print(i, '=>', result)
asyncio.get_event_loop().run_until_complete(play())
请注意,尽管await
,make_pingpong
不是协程。它只是协程的工厂。
【讨论】:
以上是关于如何在 cdef 中等待?的主要内容,如果未能解决你的问题,请参考以下文章
有Thread1Thread2Thread3Thread4四条线程分别统计CDEF四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,应当如何实现?