如何从同步代码 Python 中调用异步函数
Posted
技术标签:
【中文标题】如何从同步代码 Python 中调用异步函数【英文标题】:How to call a async function from a synchronized code Python 【发布时间】:2019-01-16 15:16:06 【问题描述】:所以我被锁定在我的桌面应用程序之后的 python 3.6.2 解释器。
我想要的是从同步方法或函数中调用异步函数。
当从桌面应用程序调用 python 函数时,它必须是一个不能等待的普通函数。
我可以从桌面应用程序发送一个 url 列表,我想要的是在异步事务中从每个 url 发回响应。
这是我的尝试,我标记了我不知道如何绕过的 SyntaxError。
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
timeout = 10
class FeatureProcessor(object):
def __init__(self):
pass
def input(self, feature):
urls_and_coords = zip(feature.getAttribute('_list._wms'),\
feature.getAttribute('_list._xmin'),\
feature.getAttribute('_list._ymin'),\
feature.getAttribute('_list._xmax'),\
feature.getAttribute('_list._ymax'))
-> SyntaxError: newfeature = await main(urls_and_coords)
self.pyoutput(newfeature)
def close(self):
pass
async def main(urls):
loop = asyncio.get_event_loop()
async with aiohttp.ClientSession(loop=loop) as session:
feature = loop.run_until_complete(fetch_all(session, urls, loop))
return feature
async def fetch_all(session, urls, loop):
results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
return results
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url[0]) as response:
newFeature = fmeobjects.FMEFeature()
response_data = await response
newFeature.setAttribute('response', response_data)
newFeature.setAttribute('_xmin',url[1])
newFeature.setAttribute('_xmax',url[2])
newFeature.setAttribute('_ymin',url[3])
newFeature.setAttribute('_ymax',url[4])
return newFeature
我已尝试进行这些更改:
import fme
import fmeobjects
import asyncio
import aiohttp
import async_timeout
logger = fmeobjects.FMELogFile()
class FeatureProcessor(object):
def __init__(self):
pass
def input(self, feature):
urls_and_coords = zip(feature.getAttribute('_list._wms'),\
feature.getAttribute('_list._xmin'),\
feature.getAttribute('_list._ymin'),\
feature.getAttribute('_list._xmax'),\
feature.getAttribute('_list._ymax'))
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main(loop, urls_and_coords))
#feature.setAttribute('result',result)
self.pyoutput(feature)
def close(self):
pass
async def main(loop, urls):
async with aiohttp.ClientSession(loop=loop) as session:
return await fetch_all(session, urls, loop)
async def fetch_all(session, urls, loop):
results = await asyncio.gather(*[loop.create_task(fetch(session, url)) for url in urls])
return results
async def fetch(session, url):
with async_timeout.timeout(10):
async with session.get(url[0]) as response:
#newFeature = fmeobjects.FMEFeature()
response = await response
#newFeature.setAttribute('response', response_data)
#newFeature.setAttribute('_xmin',url[1])
#newFeature.setAttribute('_xmax',url[2])
#newFeature.setAttribute('_ymin',url[3])
#newFeature.setAttribute('_ymax',url[4])
return response, url[1], url[2], url[3], url[4]
但现在我遇到了这个错误:
Python Exception <TypeError>: object ClientResponse can't be used in 'await'
expression
Traceback (most recent call last):
File "<string>", line 20, in input
File "asyncio\base_events.py", line 467, in run_until_complete
File "<string>", line 29, in main
File "<string>", line 33, in fetch_all
File "<string>", line 41, in fetch
TypeError: object ClientResponse can't be used in 'await' expression
【问题讨论】:
您可能想看看trio
库。它的界面比asyncio
标准库要简单得多。
酷,看起来它在 python 3.7 中实现了类似于 asyncio 的运行。我会看看这个。
【参考方案1】:
您将使用事件循环来执行异步函数以完成:
newfeature = asyncio.get_event_loop().run_until_complete(main(urls_and_coords))
(这种技术已经在 main
内部使用。我不知道为什么,因为 main
是 async
你可以/应该在那里使用 await fetch_all(...)
。)
【讨论】:
但是我可能需要重写 main,因为它已经有一个 event_loop? 有趣的是,我实际上并不确定这是否会导致任何问题。但正如我所写,在async
函数中使用 run_until_complete
没有什么意义,你应该简单地 await
它。
它对我有用。请注意,如果您没有 event_loop,因此出现错误:“RuntimeError: There is no current event loop in thread 'Thread-n'”,您可以在函数中添加 asyncio.set_event_loop(asyncio.new_event_loop())
来设置事件循环。
【参考方案2】:
@deceze 答案可能是您在 Python 3.6 中可以做到的最好的答案。
但是在 Python 3.7 中,你可以直接使用asyncio.run
,方式如下:
newfeature = asyncio.run(main(urls))
它将正确创建、处理和关闭event_loop
。
【讨论】:
但是如果代码已经在asyncio.run
调用中运行怎么办?如果你在用asyncio.run
调用的函数中使用asyncio.run
,那么你会得到RuntimeError: asyncio.run() cannot be called from a running event loop
@birgersp 如果您已经在 event_loop 中,您可以简单地通过 result = await main(urls)
调用它。【参考方案3】:
另一个可能有用的选项是syncer
PyPI 包:
from syncer import sync
@sync
async def print_data():
print(await get_data())
print_data() # Can be called synchronously
【讨论】:
以上是关于如何从同步代码 Python 中调用异步函数的主要内容,如果未能解决你的问题,请参考以下文章
如何使用从 Swift 代码调用的线程在 C++ 上创建异步调用函数?