自定义异步执行器

Posted

技术标签:

【中文标题】自定义异步执行器【英文标题】:Custom asyncio executor 【发布时间】:2021-11-26 02:57:53 【问题描述】:

我需要在以下条件(行为)下使用 asyncio 来实现算法:

    检查参数列表是否为空,如果为空则结束执行 从参数列表中弹出下一个参数创建协程 这个参数并安排它“同时”执行是不可能的 协程完成时执行的协程不超过 'async_level' 个 执行 -> 转到步骤 1

不必计划一次完成所有任务(如 asyncio.gather),而是分部分完成。当下一个任务完成执行时,一个新的任务会取代它。

我尝试使用 asyncio.as_completed() 来实现,但实际上并没有按预期工作:

async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]

tasks = asyncio.create_task(job(param)) for param in params[0: async_level]
params = iter(params[async_level:])

while True:
    # NOTE: It wont work, because you can't add task in 'tasks' after 'as_completed' is invoked, so execution actually ends when the last coroutine in the 'as_completed' ends
    for task in asyncio.as_completed(tasks):
        print(f"len(tasks) = len(tasks)")
        await task

        try:
            param = next(params)
            tasks.add(asyncio.create_task(job(param)))
        except StopIteration:
            print("StopIteration")

    break

另外,我尝试使用 asyncio.BoundedSemaphore 来实现它,但不满足前两个条件:

async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]

async def semaphore_job(name, _asyncio_semaphore):
    async with _asyncio_semaphore:
        await job(name)

asyncio_semaphore = asyncio.BoundedSemaphore(async_level)
jobs = []
# NOTE: This variant schedule all jobs at ones and it's significant drawback because the count of jobs can be overwhelmed
for param in params:
    jobs.append(asyncio.ensure_future(semaphore_job(param, asyncio_semaphore)))
await asyncio.gather(*jobs)

如果您能提供任何帮助,我将不胜感激。

【问题讨论】:

我不明白您提出的信号量解决方案的问题究竟出在哪里。我知道它会提前安排所有工作,但大多数工作会在开始实际工作之前立即挂起信号量。任务是一个相对较小的 Python 对象,因此除非您拥有数百万个任务,否则它们不应该压倒 asyncio。您是否对该版本有特定问题,或者您是否试图通过不提前创建作业来优化内存使用? 不是数百万,而是数十万。我知道我可以使用 BoundedSemaphore 版本,但我不想提前安排所有任务,但只有在必要时才可以。使用数百兆字节的 RAM(任务 == 200 字节)来存储任务似乎很疯狂,我寻找了一个更优雅的解决方案。 很公平,我只是想检查一下“明显”的解决方案出了什么问题。 【参考方案1】:

看来我自己找到了解决方案:

import asyncio
from typing import Callable
from random import randrange
from asyncio import Semaphore, ensure_future, get_event_loop


async def job(name, time_range=10):
    timeout = randrange(time_range)
    print(f"Task 'name' started with timeout timeout")
    await asyncio.sleep(timeout)
    print(f"Task 'name' finished")
    return name


async def custom_executor(func: Callable, args: list, async_level: int = 4):
    """ Asynchronously executes no more that 'async_level' callables specified by 'func' with corresponding 'args' """
    loop = get_event_loop()
    sync = Semaphore()
    todo = set(args)
    doing = set()

    def _schedule_task():
        if todo:
            arg = todo.pop()
            fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
            f = ensure_future(fr, loop=loop)
            f.add_done_callback(_on_completion)
            doing.add(f)

    def _on_completion(f):
        doing.remove(f)
        sync.release()
        _schedule_task()

    for _ in range(min(async_level, len(todo))):
        _schedule_task()

    while True:
        if not doing:
            break

        await sync.acquire()


async def main():
    await custom_executor(job, [(1, 3), 7, (8, 2), 12, 5])

if __name__ == '__main__':
    asyncio.run(main())

但如果你知道更好的方法,请分享!

【讨论】:

【参考方案2】:

您可以创建固定数量的工作人员并使用队列为他们分配任务。它有点短,我发现它比使用回调的代码更容易推理。但是YMMV。

async def custom_executor(func, args, async_level=4):
    queue = asyncio.Queue(1)
    async def worker():
        while True:
            arg = await queue.get()
            fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
            await fr
            queue.task_done()

    # create the workers
    workers = [asyncio.create_task(worker()) for _ in range(async_level)]

    # Feed the workers tasks. Since the queue is bounded, this will also
    # wait for previous tasks to finish, similar to what you wanted to
    # achieve with as_completed().
    for x in args:
        await queue.put(x)
    await queue.join()  # wait for the remaining tasks to finish

    # cancel the now-idle workers
    for w in workers:
        w.cancel()

【讨论】:

以上是关于自定义异步执行器的主要内容,如果未能解决你的问题,请参考以下文章

Spark Streaming REST 自定义接收器

Promise前期准备---同步回调与异步回调

不知道怎么解耦业务?Spring Event 了解一下!

不知道怎么解耦业务?Spring Event 了解一下!

Android中的自定义最大异步任务

Spring Boot使用@Async实现异步调用:自定义线程池