Asyncio 两个循环用于不同的 I/O 任务?

Posted

技术标签:

【中文标题】Asyncio 两个循环用于不同的 I/O 任务?【英文标题】:Asyncio two loops for different I/O tasks? 【发布时间】:2015-07-25 05:12:20 【问题描述】:

我正在使用 Python3 Asyncio 模块来创建负载平衡应用程序。我有两个繁重的 IO 任务:

一个 SNMP 轮询模块,用于确定可能的最佳服务器 一个“类似代理”的模块,用于平衡对所选服务器的请求。

两个进程将永远运行,彼此独立,不应被另一个进程阻塞。

我不能使用 1 个事件循环,因为它们会互相阻塞,有没有办法拥有 2 个事件循环或者我必须使用多线程/处理?

我尝试使用 asyncio.new_event_loop() 但没有成功。

【问题讨论】:

如果设计得当,即使 Asyncio 协程运行在同一个循环上,它们也不会相互阻塞。 Asyncio 有效地在多个协程/任务之间来回切换,以提供并发的效果,即使使用单个线程也是如此。 @shongololo 但是如果我有一个使用“loop.run_forever()”运行的循环,它会阻塞循环,除非我停止它,否则我不能做任何其他事情。还是我理解错了?这就是我看到的行为...... 不确定我是否完全理解这种困境。有什么东西阻止你在同一个循环中运行吗?当您的代码中遇到“yield from”点时,asyncio 将自动来回切换(在同一个循环中)。这基本上是 asyncio 的重点,它允许您在同一个循环内运行多个可能阻塞的协程,而不会阻塞另一个。 如果代理服务器一直在运行,它不能来回切换。代理侦听客户端请求并使它们异步,但另一个任务无法执行,因为这个任务永远服务。 asyncio 的全部意义在于您可以同时运行数千个 I/O 繁重的任务,因此您根本不需要线程,这正是 asyncio 的用途。只需在同一个循环中运行两个协程(SNMP 和代理)即可。在技​​术方面:在调用 loop.run_forever() 之前,您必须使它们都可用于事件循环 【参考方案1】:

asyncio 的全部意义在于您可以同时运行数千个 I/O 繁重的任务,因此您根本不需要Threads,这正是asyncio 的用途。只需在同一个循环中运行两个协程(SNMP 和代理)即可。 在调用loop.run_forever() 之前,您必须使它们都可用于事件循环。像这样的:

import asyncio

async def snmp():
    print("Doing the snmp thing")
    await asyncio.sleep(1)

async def proxy():
    print("Doing the proxy thing")
    await asyncio.sleep(2)

async def main():
    while True:
        await snmp()
        await proxy()

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()

我不知道你的代码结构,所以不同的模块可能有自己的无限循环什么的,在这种情况下你可以这样运行:

import asyncio

async def snmp():
    while True:
        print("Doing the snmp thing")
        await asyncio.sleep(1)

async def proxy():
    while True:
        print("Doing the proxy thing")
        await asyncio.sleep(2)

loop = asyncio.get_event_loop()
loop.create_task(snmp())
loop.create_task(proxy())
loop.run_forever()

请记住,snmpproxy 都需要以异步感知方式编写的协程 (async def)。 asyncio 不会让简单的阻塞 Python 函数突然“异步”。

在您的具体情况下,我怀疑您有点困惑(无意冒犯!),因为编写良好的异步模块永远不会在同一个循环中相互阻塞。如果是这种情况,您根本不需要asyncio,只需在单独的Thread 中运行其中一个,而无需处理任何asyncio 内容。

【讨论】:

如果不知道需要运行所有任务怎么办? @Jako 这就像问“我如何调用一个我不知道名称的函数?”你不能。 "...你根本不需要线程..." ThreadPoolExecutor呢? @kissgyorgy 从技术上讲,你可以调用你没有名字的函数——特别是如果它们没有名字。最简单的例子:(lambda x: x)(42)。 ;) @MateenUlhaq %s/知道/有参考的名字/"【参考方案2】:

回答我自己的问题以发布我的解决方案:

我最终做的是在线程内为轮询模块创建一个线程和一个新的事件循环,所以现在每个模块都在不同的循环中运行。这不是一个完美的解决方案,但它是唯一对我有意义的解决方案(我想避免线程,但因为它只是一个......)。示例:

import asyncio
import threading


def worker():
    second_loop = asyncio.new_event_loop()
    execute_polling_coroutines_forever(second_loop)
    return

threads = []
t = threading.Thread(target=worker)
threads.append(t)
t.start()

loop = asyncio.get_event_loop()
execute_proxy_coroutines_forever(loop)

Asyncio 要求每个循环都在同一个线程中运行其协程。使用这种方法,每个线程都有一个事件循环,它们是完全独立的:每个循环都会在自己的线程上执行它的协程,所以这不是问题。 正如我所说,它可能不是最好的解决方案,但它对我有用。

【讨论】:

在工作线程中使用asyncio.set_event_loop() 可以让您手动省略传递循环实例。请在下面查看我的回答:***.com/a/62631135/1592410【参考方案3】:

尽管在大多数情况下,您在使用 asyncio 时不需要运行多个事件循环,但人们不应该假设他们的假设适用于所有情况,或者只是给您他们认为更好的东西而不直接针对您的原始问题.

这是一个演示,展示了您可以在线程中创建新的事件循环。与您自己的 answer 相比,set_event_loop 可以让您在每次执行基于 asyncio 的操作时不传递 loop 对象。

import asyncio
import threading


async def print_env_info_async():
    # As you can see each work thread has its own asyncio event loop.
    print(f"Thread: threading.get_ident(), event loop: id(asyncio.get_running_loop())")


async def work():
    while True:
        await print_env_info_async()
        await asyncio.sleep(1)


def worker():
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    new_loop.run_until_complete(work())
    return


number_of_threads = 2
for _ in range(number_of_threads):
    threading.Thread(target=worker).start()

理想情况下,您应该将繁重的工作放在工作线程中,并让 asncyio 线程尽可能轻松地运行。将 asyncio 线程视为桌面或移动应用程序的 GUI 线程,您不想阻止它。工作线程通常非常繁忙,这是您不想在工作线程中创建单独的异步事件循环的原因之一。这是一个如何使用单个异步事件循环管理繁重的工作线程的示例。这是此类用例中最常见的做法:

import asyncio
import concurrent.futures
import threading
import time


def print_env_info(source_thread_id):
    # This will be called in the main thread where the default asyncio event loop lives.
    print(f"Thread: threading.get_ident(), event loop: id(asyncio.get_running_loop()), source thread: source_thread_id")


def work(event_loop):
    while True:
        # The following line will fail because there's no asyncio event loop running in this worker thread.
        # print(f"Thread: threading.get_ident(), event loop: id(asyncio.get_running_loop())")
        event_loop.call_soon_threadsafe(print_env_info, threading.get_ident())
        time.sleep(1)


async def worker():
    print(f"Thread: threading.get_ident(), event loop: id(asyncio.get_running_loop())")
    loop = asyncio.get_running_loop()
    number_of_threads = 2
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=number_of_threads)
    for _ in range(number_of_threads):
        asyncio.ensure_future(loop.run_in_executor(executor, work, loop))


loop = asyncio.get_event_loop()
loop.create_task(worker())
loop.run_forever()

【讨论】:

您可能希望将 ensure_future 更新为 create_task,因为 asyncio documentation 表明 create_task 的可读性较低。顺便说一句,答案很好!这非常有帮助!【参考方案4】:

我知道这是一个旧线程,但它可能对某人仍有帮助。 我不擅长 asyncio 但这里是@kissgyorgy 答案的一些改进解决方案。我们不是单独等待每个闭包,而是创建任务列表并稍后触发它们(python 3.9):

import asyncio

async def snmp():
    while True:
        print("Doing the snmp thing")
        await asyncio.sleep(0.4)

async def proxy():
    while True:
        print("Doing the proxy thing")
        await asyncio.sleep(2)

async def main():
        tasks = []
        tasks.append(asyncio.create_task(snmp()))
        tasks.append(asyncio.create_task(proxy()))

        await asyncio.gather(*tasks)

asyncio.run(main())

结果:

Doing the snmp thing
Doing the proxy thing
Doing the snmp thing
Doing the snmp thing
Doing the snmp thing
Doing the snmp thing
Doing the proxy thing

【讨论】:

【参考方案5】:

Asyncio 事件循环是一个单线程运行,它不会并行运行任何东西,这就是它的设计方式。我能想到的最接近的方法是使用asyncio.wait

from asyncio import coroutine
import asyncio

@coroutine
def some_work(x, y):
    print("Going to do some heavy work")
    yield from asyncio.sleep(1.0)
    print(x + y)

@coroutine
def some_other_work(x, y):
    print("Going to do some other heavy work")
    yield from asyncio.sleep(3.0)
    print(x * y)



if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([asyncio.async(some_work(3, 4)), 
                            asyncio.async(some_other_work(3, 4))]))
    loop.close()

另一种方法是使用asyncio.gather() - 它从给定的期货列表中返回未来的结果。

tasks = [asyncio.Task(some_work(3, 4)), asyncio.Task(some_other_work(3, 4))]
loop.run_until_complete(asyncio.gather(*tasks))

【讨论】:

我知道事件循环是单线程的,这就是为什么我问是否有任何方法可以使用 2 个不同的循环来实现它。我不能使用这种方法,因为正如我所说,每个协程都将永远运行,它们不会结束。当我弄清楚时会尝试发布我的解决方案,谢谢。 @brunoop 你仍然可以使用这种方法——你的任务没有结束并不重要。只需使用asyncio.async(your_coroutine()) 安排您需要的协程,然后在它们全部安排好后调用loop.run_forever()。无需使用两个线程。【参考方案6】:

如果代理服务器一直在运行,它就不能来回切换。代理侦听客户端请求并使它们异步,但另一个任务无法执行,因为这个任务永远服务。

如果代理是一个协程并且正在饿死 SNMP 轮询器(从不等待),那么客户端请求是不是也被饿死了?

每个协程都将永远运行,它们不会结束

这应该没问题,只要他们这样做await/yield from。 echo server 也将永远运行,这并不意味着您不能在同一个循环中运行多个服务器(尽管在不同的端口上)。

【讨论】:

以上是关于Asyncio 两个循环用于不同的 I/O 任务?的主要内容,如果未能解决你的问题,请参考以下文章

“GIL”如何影响具有 i/o 绑定任务的 Python asyncio `run_in_executor`?

#yyds干货盘点# 听说过python协程没?听说过 asyncio 库没?都在这一篇博客了

python asyncio 异步 I/O - 实现并发http请求(asyncio + aiohttp)

asyncio 是不是支持文件操作的异步 I/O?

asyncio时间循环中运行阻塞任务

使用 asyncio 的非阻塞 I/O