Asyncio in coroutine: RuntimeError: no running event loop

Posted: 2019-11-08 22:54:22

Question:

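I'm writing multiprocessing code, and it runs perfectly in Python 3.7. However, I want one of the parallel processes to do its long-running IO work with AsyncIO to get better performance, but I haven't been able to get it to run.

Ubuntu 18.04, Python 3.7, AsyncIO, pipenv (all pip libraries installed)

Specifically, this method already runs as expected using multithreading, and that is what I want to replace with AsyncIO.

I have googled and tried creating the loop in the main() function, and now only inside the intended coroutine. I have looked at examples and read about this new async approach, but so far without results.

Below is the stripped-down app.py code, run with: python app.py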

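import sys
import traceback
import logging
import logging.config  # needed for logging.config.dictConfig() below
import pdb  # used by pdb.post_mortem() in main()
import asyncio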

from config import DEBUG
from config import log_config
from <some-module> import <some-class>

if DEBUG:
    logging.config.dictConfig(log_config())
else:
    logging.basicConfig(
        level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
logger = logging.getLogger(__name__)


def main():
    try:
        <some> = <some-class>([
            'some-data1.csv',
            'some-data2.csv'
            ])
        <some>.run()

    except:

        traceback.print_exc()
        pdb.post_mortem()

    sys.exit(0)


if __name__ == '__main__':

    asyncio.run(main())

This is the code where I define the class:

    _sql_client = SQLServer()
    _blob_client = BlockBlobStore()
    _keys = KeyVault()
    _data_source = _keys.fetch('some-data')
    #  Multiprocessing
    _manager = mp.Manager()
    _ns = _manager.Namespace()

    def __init__(self, list_of_collateral_files: list) -> None:

    @timeit
    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _load_original_bids(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _merge_bids_with_hours(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _get_collaterial_per_month(self, ns: mp.managers.NamespaceProxy) -> None:

    @timeit
    def _calc_bid_per_path(self) -> None:

    @timeit
    def run(self) -> None:

The method containing the async code is here:

    def _get_filter_collateral(self, ns: mp.managers.NamespaceProxy) -> None:

        all_files = self._blob_client.download_blobs(self._list_of_blob_files)

        _all_dfs = pd.DataFrame()
        async def read_task(file_: str) -> None:
            nonlocal _all_dfs
            df = pd.read_csv(StringIO(file_.content))
            _all_dfs = _all_dfs.append(df, sort=False)

        tasks = []
        loop = asyncio.new_event_loop()

        for file_ in all_files:
            tasks.append(asyncio.create_task(read_task(file_)))

        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

        _all_dfs['TOU'] = _all_dfs['TOU'].map(lambda x: 'OFFPEAK' if x == 'OFF' else 'ONPEAK')
        ns.dfs = _all_dfs

The method that calls the other methods in a specific order, including this async one, is:

    def run(self) -> None:
        extract = []
        extract.append(mp.Process(target=self._get_filter_collateral, args=(self._ns, )))
        extract.append(mp.Process(target=self._get_hours, args=(self._ns, )))
        extract.append(mp.Process(target=self._load_original_bids, args=(self._ns, )))

        #  Start the parallel processes
        for process in extract:
            process.start()

        #  Await for database process to end
        extract[1].join()
        extract[2].join()

        #  Merge both database results
        self._merge_bids_with_hours(self._ns)

        extract[0].join()

        self._get_collaterial_per_month(self._ns)
        self._calc_bid_per_path()
        self._save_reports()
        self._upload_data()

These are the errors I get:

Process Process-2:
Traceback (most recent call last):
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 104, in _get_filter_collateral
    tasks.append(asyncio.create_task(read_task(file_)))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/asyncio/tasks.py", line 350, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited
  traceback.print_exc()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
DEBUG Calculating monthly collateral...
Traceback (most recent call last):
  File "app.py", line 25, in main
    caiso.run()
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 425, in run
    self._get_collaterial_per_month(self._ns)
  File "<some-path>/src/azure/application/utils/lib.py", line 10, in timed
    result = method(*args, **kwargs)
  File "<some-path>/src/azure/application/caiso/main.py", line 196, in _get_collaterial_per_month
    credit_margin = ns.dfs
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 1122, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "<some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py", line 834, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'Namespace' object has no attribute 'dfs'
> <some-path>/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/managers.py(834)_callmethod()
-> raise convert_to_error(kind, result)
(Pdb)


Answer 1:

From the traceback, it looks like you are trying to add tasks to an event loop that is not running:

/.pyenv/versions/3.7.4/lib/python3.7/multiprocessing/process.py:313: RuntimeWarning: coroutine '<some-class>._get_filter_collateral.<locals>.read_task' was never awaited

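The loop has just been created and is not running yet, so asyncio.create_task(), which looks up the currently running loop (events.get_running_loop() in the traceback), has no loop to attach the tasks to. The later AttributeError: 'Namespace' object has no attribute 'dfs' follows from this: the child process crashed before it reached ns.dfs = _all_dfs.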
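A minimal sketch (Python 3.7+) of the distinction: asyncio.get_running_loop(), which asyncio.create_task() calls internally, raises RuntimeError whenever no loop is currently running in the thread, even if a loop object already exists. The helper names has_running_loop and check_inside_coroutine are hypothetical, chosen only for this illustration.

```python
import asyncio


def has_running_loop() -> bool:
    """Return True if an event loop is currently running in this thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # The same "no running event loop" error that asyncio.create_task() hits
        return False
    return True


async def check_inside_coroutine() -> bool:
    # While a coroutine executes, the loop driving it is the running loop.
    return has_running_loop()


# A newly created loop is not "running" until run_until_complete()/run_forever() drives it.
loop = asyncio.new_event_loop()
outside = has_running_loop()
inside = loop.run_until_complete(check_inside_coroutine())
loop.close()

print(outside, inside)  # False True
```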

The following example reproduces the same result: it adds the tasks, then tries to wait for all of them to complete:

import asyncio
async def func(num):
    print('My name is func {0}...'.format(num))

loop = asyncio.get_event_loop()
tasks = list()
for i in range(5):
    tasks.append(asyncio.create_task(func(i)))
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Result:

Traceback (most recent call last):
  File "C:/tmp/stack_overflow.py", line 42, in <module>
    tasks.append(asyncio.create_task(func(i)))
  File "C:\Users\Amiram\AppData\Local\Programs\Python\Python37-32\lib\asyncio\tasks.py", line 324, in create_task
    loop = events.get_running_loop()
RuntimeError: no running event loop
sys:1: RuntimeWarning: coroutine 'func' was never awaited

Nevertheless, the solution is quite simple: add the tasks to the loop you created instead of asking asyncio to find one. The only line that needs to change is:

tasks.append(asyncio.create_task(func(i)))

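Move the task creation from asyncio to the newly created loop. You can do that because it is your own loop object, whereas asyncio.create_task() looks for a running one.

So the new line should look like this: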

tasks.append(loop.create_task(func(i)))
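Put together, the first fix looks roughly like the sketch below (Python 3.7+). It is an illustration under assumptions, not the asker's code: func is a stand-in for read_task, and the loop comes from asyncio.new_event_loop() as in the question. Since asyncio.wait() returns sets of done and pending tasks, the results are read back from the tasks list so their order is preserved.

```python
import asyncio


async def func(num):
    # Stand-in for read_task(); the await lets the tasks interleave.
    await asyncio.sleep(0.1)
    return 'func {0} done'.format(num)


loop = asyncio.new_event_loop()
try:
    # loop.create_task() does not need the loop to be running yet:
    # the tasks are scheduled and start once run_until_complete() drives the loop.
    tasks = [loop.create_task(func(i)) for i in range(5)]
    loop.run_until_complete(asyncio.wait(tasks))
    # asyncio.wait() returns (done, pending) sets, so read results in list order.
    results = [task.result() for task in tasks]
finally:
    loop.close()

print(results)
```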

Another solution is to run an async function and create the tasks inside it; since the loop is already running at that point, asyncio can attach the tasks to it:

import asyncio
async def func(num):
    print('Starting func {0}...'.format(num))
    await asyncio.sleep(0.1)
    print('Ending func {0}...'.format(num))

loop = asyncio.get_event_loop()
async def create_tasks_func():
    tasks = list()
    for i in range(5):
        tasks.append(asyncio.create_task(func(i)))
    await asyncio.wait(tasks)
loop.run_until_complete(create_tasks_func())
loop.close()

This simple change produces:

Starting func 0...
Starting func 1...
Starting func 2...
Starting func 3...
Starting func 4...
Ending func 0...
Ending func 2...
Ending func 4...
Ending func 1...
Ending func 3...
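On Python 3.7+ the second approach can also be written with asyncio.run(), which creates a new loop, runs the given coroutine to completion and closes the loop, so no loop object has to be managed by hand. This is a minimal sketch reusing func from the example above; main() is a hypothetical name, and asyncio.gather() is used instead of asyncio.wait() because it returns the results in the order the tasks were passed.

```python
import asyncio


async def func(num):
    print('Starting func {0}...'.format(num))
    await asyncio.sleep(0.1)
    print('Ending func {0}...'.format(num))
    return num


async def main():
    # Inside a coroutine the loop is running, so asyncio.create_task() works.
    tasks = [asyncio.create_task(func(i)) for i in range(5)]
    # gather() waits for all tasks and returns their results in order.
    return await asyncio.gather(*tasks)


results = asyncio.run(main())
print(results)
```

In the question's setup, the asyncio.run() call would presumably go inside _get_filter_collateral, the target of the child process, so that process creates its own loop. Keep in mind that pd.read_csv() is an ordinary blocking call: wrapping it in an async def does not by itself make the reads overlap, because a coroutine only yields to other tasks at an await.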
