当脚本在多处理工作人员中运行异步事件循环时,通过子进程运行脚本会挂起

Posted

技术标签:

【中文标题】当脚本在多处理工作人员中运行异步事件循环时,通过子进程运行脚本会挂起【英文标题】:Running a script through subprocess hangs when script runs an asyncio eventloop inside multiprocessing workers 【发布时间】:2019-10-16 18:34:28 【问题描述】:

在 Windows 上运行 Python 3.7.3,

我有一种情况,asyncio 事件循环永远不会从多处理产生的进程中中断。我不能显示所有的代码,但它是这样的:

    我使用multiprocessing 来加快使用第三方的查询 API。 此 API thirdparty.api 支持服务器-客户端架构并使用 asyncio 内部事件循环。它在一个单独的线程中运行一个事件循环;在该线程中,它调用event_loop.run_forever() 并仅在KeyboardInterrupt 上中断。 使用多处理运行工作脚本,API 总是返回,无论是成功还是失败。以前我遇到了 Py3.7.2 回归,其中在 Windows 上 venv Python 可执行文件以一种糟糕的方式工作https://bugs.python.org/issue35797。但现在这在 Py3.7.3 中得到了修复,我的问题仍然存在。 使用 subprocess 从另一个 Py27 脚本运行此脚本。在我的多处理工作进程中,如果查询失败,调用将永远不会返回,并且它无法自然地脱离工作进程,即使是通用异常处理程序也不会捕获任何内容并会卡住。

我的调用者脚本的代码 sn-ps:

#!/usr/bin/env python2

def main()
    try:
        cmd = ['C:\\Python\\Python37\\pythonw.exe', 'worker.py']
        print(' '.join(cmd))
        proc = subprocess.Popen(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out, err = proc.communicate()
    except subprocess.CalledProcessError as err:
        print(err)
    except Exception:
        traceback.print_exc()
    else:
        print('try popen finished with else.')
    print('stdout: '.format(out))
    print('stderr: '.format(err))


if __name__ == '__main__':
    main()

我的工人 worker.py 函数的伪代码 sn-ps 如下所示:

#!/usr/bin/env python3

args = [
   ...
]


def worker(*mpargs):
    with thirdparty.api() as myclient:
        try:
            myclient.query(*args)
        except Exception:
            traceback.print_exc()


def exception_worker(*mpargs)
    raise RuntimeError('Making trouble!')


def main():
    logging.info('STARTED!')
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(worker, args)
        # results = pool.map(exception_worker, args)
        pool.close()
        pool.join()
    logging.info('ALL DONE!')


if __name__ == '__main__':
    main()

thirdparty.api 在其构造函数中启动事件循环:

self.loop = asyncio.get_event_loop()
if self.loop.is_closed():
    self.loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self.loop)

然后在其单独的线程中:

try:
    self._loop.run_forever()
except KeyboardInterrupt:
    pass
self.loop.close()

我尝试了另一个工人 exception_worker,它只是抛出异常,而这个返回没有问题。

我应该如何解决这个问题?

【问题讨论】:

更多细节,请。你如何使用multiprocessingmyclient.query 看起来像一个普通函数,而不是一个异步函数。 @Sraw 感谢您的提示。我添加了更多细节。 【参考方案1】:

详细说明问题后,我终于在这篇帖子中找到了解决方案: Why am I getting NotImplementedError with async and await on Windows?

thirdparty.api 需要处理这个细节,在修复它之后,我的问题就消失了。

【讨论】:

以上是关于当脚本在多处理工作人员中运行异步事件循环时,通过子进程运行脚本会挂起的主要内容,如果未能解决你的问题,请参考以下文章

事件循环

为啥在异步事件循环运行时我无法捕获 SIGINT?

异步等待子处理?

进阶学习5:JavaScript异步编程——同步模式异步模式调用栈工作线程消息队列事件循环回调函数

libevent

在异步事件循环中运行 .render()(来自 requests_html)时,我收到错误“此事件循环已在运行”