RuntimeError:异步+ apscheduler中的线程中没有当前事件循环

Posted

技术标签:

【中文标题】RuntimeError:异步+ apscheduler中的线程中没有当前事件循环【英文标题】:RuntimeError: There is no current event loop in thread in async + apscheduler 【发布时间】:2017-10-13 10:27:22 【问题描述】:

我有一个异步功能,需要每 N 分钟运行一次 apscheduller。 下面有一段python代码

URL_LIST = ['<url1>',
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # dictionary of start times for each url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a url, using specified ClientSession."""
    async with session.get(url) as response:
        resp = await response.read()
        print(resp)

if __name__ == '__main__':
    scheduler = Asyncioscheduler()
    scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
    scheduler.start()
    print('Press Ctrl+0 to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

但是当我尝试运行它时,我有下一个错误信息

Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.

你能帮我解决这个问题吗? Python 3.6,APScheduler 3.3.1,

【问题讨论】:

【参考方案1】:

在你的def demo_async(urls),尝试替换:

loop = asyncio.get_event_loop()

与:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

【讨论】:

不要不要这样做!这只会导致第二个事件循环运行。考虑到 APScheduler 对协程函数的原生支持,这完全没有意义。如果你试图让在一个事件循环中运行的东西与第一个事件循环交互,那么糟糕的事情就会发生。 有时您需要第二个事件循环。有时你不会 我发现在单独的线程中运行循环时这是必要的。 嗨@AlexGrönholm,你能解释一下为什么会这样吗?这将具有巨大的价值。 @AlexGrönholm,能否请您扩展您的第二条评论,您甚至可以将我重定向到任何博客或文档。我需要在生成的新进程中使用get_event_loop。但我得到这个错误。以上答案将解决我的问题。【参考方案2】:

没有提到的重要一点是错误发生的原因。就我个人而言,了解错误发生的原因与解决实际问题同样重要。

我们来看看BaseDefaultEventLoopPolicyget_event_loop的实现:

class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    ...

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

你可以看到self.set_event_loop(self.new_event_loop())只有在满足以下所有条件时才会执行:

self._local._loop is None - _local._loop 未设置 not self._local._set_called - set_event_loop 尚未被调用 isinstance(threading.current_thread(), threading._MainThread) - 当前线程是主要线程(在你的情况下这不是真的)

因此引发异常,因为当前线程中没有设置循环:

if self._local._loop is None:
    raise RuntimeError('There is no current event loop in thread %r.'
                       % threading.current_thread().name)

【讨论】:

你明白为什么isinstance... 条件很重要吗?为什么我们只在 _MainThread 中创建事件循环? 因为在 Linux 上,SIGCHLD 只传递给主线程,所以终止的子进程只能通过在主线程中运行的事件循环来确认。【参考方案3】:

只需将fetch_all 直接传递给scheduler.add_job()。 asyncio 调度器支持协程函数作为作业目标。

如果目标可调用对象不是协程函数,它将在工作线程中运行(由于历史原因),因此异常。

【讨论】:

您好,感谢您的回复,您能解释一下为什么调度程序添加作业比添加第二个事件循环更好吗? @radzak 在他的回复中提到问题存在的原因是他不在主线程中。 第二个事件循环只是增加了更多的复杂性,没有任何好处。 感谢您的评论,您能否详细说明一下添加第二个事件循环的理想场景?如果我理解正确,它会添加第二个进程,该进程可以处理并行请求,而 schehduler 则采用多线程解决方案(由于 GIL imho,它在 Cpython 中同时处理但不是并行处理)【参考方案4】:

使用asyncio.run() 而不是直接使用事件循环。 它创建一个新循环并在完成时关闭它。

这就是“跑步”的样子:

if events._get_running_loop() is not None:
    raise RuntimeError(
        "asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got !r".format(main))

loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()

【讨论】:

我正在使用asyncio.run(),但仍然有这个问题...【参考方案5】:

由于这个问题继续出现在第一页,我将我的问题和我的答案写在这里。

我在使用flask-socketio 和Bleak 时有一个RuntimeError: There is no current event loop in thread 'Thread-X'.


编辑:好吧,我重构了我的文件并创建了一个类。

我在构造函数中初始化了循环,现在一切正常:

class BLE:
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    # function example, improvement of
    # https://github.com/hbldh/bleak/blob/master/examples/discover.py :
    def list_bluetooth_low_energy(self) -> list:
        async def run() -> list:
            BLElist = []
            devices = await bleak.discover()
            for d in devices:
                BLElist.append(d.name)
            return 'success', BLElist
        return self.loop.run_until_complete(run())

用法:

ble = path.to.lib.BLE()
list = ble.list_bluetooth_low_energy()

原答案:

解决方案很愚蠢。我没有注意我做了什么,但是我把一些import移出了一个函数,像这样:

import asyncio, platform
from bleak import discover

def listBLE() -> dict:
    async def run() -> dict:
        # my code that keep throwing exceptions.

    loop = asyncio.get_event_loop()
    ble_list = loop.run_until_complete(run())
    return ble_list

所以我认为我需要更改我的代码中的某些内容,并在 get_event_loop() 行之前使用这段代码创建了一个新的事件循环:

loop = asyncio.new_event_loop()
loop = asyncio.set_event_loop()

此时我很高兴,因为我有一个循环在运行。

但没有响应。而且我的代码依赖于超时来返回一些值,所以这对我的应用程序来说非常糟糕。

我花了将近两个小时才发现问题出在import,这是我的(工作)代码:

def list() -> dict:
    import asyncio, platform
    from bleak import discover

    async def run() -> dict:
        # my code running perfectly

    loop = asyncio.get_event_loop()
    ble_list  = loop.run_until_complete(run())
    return ble_list

【讨论】:

RuntimeError: There is no current event loop in thread 'MainThread'.

以上是关于RuntimeError:异步+ apscheduler中的线程中没有当前事件循环的主要内容,如果未能解决你的问题,请参考以下文章

Python Asyncio - RuntimeError:无法关闭正在运行的事件循环

Python 成功解决报错 asyncio RuntimeError: This event loop is already running

Discord Python 异步事件循环已关闭

如何在没有等待的情况下调用异步函数?

RuntimeError:python 多处理错误

Python:循环数据时继续通过“RuntimeError”