Asyncio 函数在从脚本调用而不是从 Flask 路由调用时有效

Posted

技术标签:

【中文标题】Asyncio 函数在从脚本调用而不是从 Flask 路由调用时有效【英文标题】:Asyncio function works when called from script but not from Flask route 【发布时间】:2019-08-23 15:25:19 【问题描述】:

我是 Python 和这些库/模块的新手。我正在编写一个简单的 ping-test 网络扫描仪作为学习项目。

我首先使用 asyncio 开发了一个脚本来 ping 网络上的地址

#ip_test.py
import asyncio
import ipaddress

async def ping(addr):
    proc = await asyncio.create_subprocess_exec(
        'ping','-W','1','-c','3',addr,
        stdout=asyncio.subprocess.PIPE
    )
    await proc.wait()
    return proc.returncode

async def pingMain(net):
    #hosts() returns list of Ipv4Address objects
    result = await asyncio.gather(*(ping(str(addr)) for addr in net.hosts()))
    return result

def getHosts(net_): #net_ is an Ipv4Network object
    return asyncio.run(pingMain(net_))
    #Returns list of response codes which I then zip with the list of searched ips

当我打开 python 并运行以下命令时,它按预期工作:

import ip_test as iptest
import ipaddress
print(iptest.getHosts(ipaddress.ip_network('192.168.1.0/29')))
#prints: [0, 0, 0, 1, 1, 1] as expected on this network

但是,最终目标是通过表单输入从用户那里获取输入(结果被记录到数据库中,这是一个用于说明目的的简化示例)。我通过烧瓶路线收集输入:

@app.route("/newscan",methods=['POST'])
def newScan():
    form = request.form
    networkstring = form.get('network') + "/" + form.get('mask')
    result = iptest.getHosts(ipaddress.ip_network(networkstring))
    return result

当我以这种方式调用模块时,出现错误:Runtime Error: Cannot add child handler, the child watcher does not have a loop attached.

为什么当我导入模块并从命令行运行函数时它会起作用,但当我使用来自烧瓶路由的相同输入调用它时却不起作用?

编辑:追溯:

Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2463, in __call__
  return self.wsgi_app(environ, start_response)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2449, in wsgi_app
  response = self.handle_exception(e)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1866, in handle_exception
  reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
  raise value
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 2446, in wsgi_app
  response = self.full_dispatch_request()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1951, in full_dispatch_request
  rv = self.handle_user_exception(e)
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1820, in handle_user_exception
  reraise(exc_type, exc_value, tb)
File "/usr/local/lib/python3.7/site-packages/flask/_compat.py", line 39, in reraise
  raise value
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1949, in full_dispatch_request
  rv = self.dispatch_request()
File "/usr/local/lib/python3.7/site-packages/flask/app.py", line 1935, in dispatch_request
  return self.view_functions[rule.endpoint](**req.view_args)
File "/app/app.py", line 41, in newScan
  result = iptest.getHosts(ipaddress.ip_network(networkstring))
File "/app/ip_test.py", line 22, in getHosts
  res = asyncio.run(pingMain(net_))
File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
  return loop.run_until_complete(main)
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
  return future.result()
File "/app/ip_test.py", line 15, in pingMain
  result = await asyncio.gather(*(ping(str(addr)) for addr in net.hosts()))
File "/app/ip_test.py", line 7, in ping
  stdout=asyncio.subprocess.PIPE
File "/usr/local/lib/python3.7/asyncio/subprocess.py", line 217, in create_subprocess_exec
  stderr=stderr, **kwds)
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1529, in subprocess_exec
  bufsize, **kwargs)
File "/usr/local/lib/python3.7/asyncio/unix_events.py", line 193, in _make_subprocess_transport
  self._child_watcher_callback, transp)
File "/usr/local/lib/python3.7/asyncio/unix_events.py", line 930, in add_child_handler
  "Cannot add child handler, "
RuntimeError: Cannot add child handler, the child watcher does not have a loop attached

【问题讨论】:

你为什么在这里完全使用asyncio?您也可以使用subprocess module。 请提供您遇到的异常的完整追溯。我可以猜测这里的回溯可能是什么,但情况并非总是如此。 已按要求添加回溯。我使用 asyncio 是因为在 255 个(或更多)地址上同步运行 ping -c 3 需要 3 * 255 秒。使用 asyncio 运行大约需要 3 秒。 实际上我认为子进程也是如此。我错了吗? 【参考方案1】:

您正试图从主线程以外的线程运行异步子进程。这需要从主线程进行一些初始设置,请参阅 asyncio Subprocesses 文档的 Subprocesses and Threads section:

标准异步事件循环支持从不同线程运行子进程,但有限制:

事件循环必须在主线程中运行。 在从其他线程执行子进程之前,必须在主线程中实例化子观察程序。在主线程中调用get_child_watcher()函数来实例化child watcher。

这里发生的情况是您的 WSGI 服务器正在使用多个线程来处理传入的请求,因此请求处理程序没有在 main 线程上运行。但是您的代码使用asyncio.run() 来启动一个新的事件循环,因此您的asyncio.create_subprocess_exec() 调用将失败,因为主线程上没有子观察者。

您必须从主线程开始一个循环(而不是停止它),并在该线程上调用asyncio.get_child_watcher(),以使您的代码不会失败:

# to be run on the main thread, set up a subprocess child watcher
assert threading.current_thread() is threading.main_thread()
asyncio.get_event_loop()
asyncio.get_child_watcher()

注意:此限制仅适用于 Python 3.7 以下的 Python 版本,限制为 has been lifted in Python 3.8。

但是,只是运行一堆子进程并等待这些完成,使用asyncio 是大材小用;你的操作系统可以并行运行子进程就好了。只需使用subprocess.Popen() 并通过Popen.poll() method 检查每个进程:

import subprocess

def ping_proc(addr):
    return subprocess.Popen(
        ['ping', '-W', '1', '-c', '3', addr],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL
    )

def get_hosts(net):
    # hosts() returns list of Ipv4Address objects
    procs = [ping_proc(str(addr)) for addr in net.hosts()]
    while any(p.poll() is None for p in procs):
        time.sleep(0.1)
    return [p.returncode for p in procs]

Popen.poll() 不阻塞;如果尚未设置Popen.returncode,它会使用waitpid([pid], WNOHANG) 检查操作系统的进程状态,如果进程仍在运行,则返回None,或者返回现在可用的returncode 值。以上只是在循环中检查这些状态,中间有短暂的睡眠以避免颠簸。

asyncio 子进程包装器(至少在 POSIX 上)要么使用 SIGCHLD 信号处理程序来通知子进程退出,要么(在 Python 3.8 中)为每个子进程使用单独的线程来使用阻塞 waitpid()调用创建的每个子进程。您可以实现相同的信号处理程序,但考虑到信号处理程序只能在主线程上注册,因此您必须跳过几个环节才能将传入的SIGCHLD 信号信息传递给正确的线程。

【讨论】:

谢谢!我对subprocess 做了一个不正确的假设。至少我学到了一些关于asyncio的东西。

以上是关于Asyncio 函数在从脚本调用而不是从 Flask 路由调用时有效的主要内容,如果未能解决你的问题,请参考以下文章

如何从页面模板上调用的函数而不是从ins获取当前页面永久链接url

对象 Promise 显示而不是从 API 调用中提取的数据

SQLAlchemy Asyncio ORM 在从元数据中检索表和列时无法查询数据库

当我从 Azure 函数而不是从控制台应用程序运行 LINQ 查询时,它会失败

多次调用一个函数而不等待它完成

使用 asyncio 等待子进程的结果