自定义异步执行器
Posted
技术标签:
【中文标题】自定义异步执行器【英文标题】:Custom asyncio executor 【发布时间】:2021-11-26 02:57:53 【问题描述】:我需要在以下条件(行为)下使用 asyncio 来实现算法:
-
检查参数列表是否为空,如果为空则结束执行
从参数列表中弹出下一个参数创建协程
这个参数并安排它“同时”执行是不可能的
协程完成时执行的协程不超过 'async_level' 个
执行 -> 转到步骤 1
不必计划一次完成所有任务(如 asyncio.gather),而是分部分完成。当下一个任务完成执行时,一个新的任务会取代它。
我尝试使用 asyncio.as_completed() 来实现,但实际上并没有按预期工作:
async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]
tasks = asyncio.create_task(job(param)) for param in params[0: async_level]
params = iter(params[async_level:])
while True:
# NOTE: It wont work, because you can't add task in 'tasks' after 'as_completed' is invoked, so execution actually ends when the last coroutine in the 'as_completed' ends
for task in asyncio.as_completed(tasks):
print(f"len(tasks) = len(tasks)")
await task
try:
param = next(params)
tasks.add(asyncio.create_task(job(param)))
except StopIteration:
print("StopIteration")
break
另外,我尝试使用 asyncio.BoundedSemaphore 来实现它,但不满足前两个条件:
async_level = 4
params_count = 10
params = [i for i in range(1, params_count + 1)]
async def semaphore_job(name, _asyncio_semaphore):
async with _asyncio_semaphore:
await job(name)
asyncio_semaphore = asyncio.BoundedSemaphore(async_level)
jobs = []
# NOTE: This variant schedule all jobs at ones and it's significant drawback because the count of jobs can be overwhelmed
for param in params:
jobs.append(asyncio.ensure_future(semaphore_job(param, asyncio_semaphore)))
await asyncio.gather(*jobs)
如果您能提供任何帮助,我将不胜感激。
【问题讨论】:
我不明白您提出的信号量解决方案的问题究竟出在哪里。我知道它会提前安排所有工作,但大多数工作会在开始实际工作之前立即挂起信号量。任务是一个相对较小的 Python 对象,因此除非您拥有数百万个任务,否则它们不应该压倒 asyncio。您是否对该版本有特定问题,或者您是否试图通过不提前创建作业来优化内存使用? 不是数百万,而是数十万。我知道我可以使用 BoundedSemaphore 版本,但我不想提前安排所有任务,但只有在必要时才可以。使用数百兆字节的 RAM(任务 == 200 字节)来存储任务似乎很疯狂,我寻找了一个更优雅的解决方案。 很公平,我只是想检查一下“明显”的解决方案出了什么问题。 【参考方案1】:看来我自己找到了解决方案:
import asyncio
from typing import Callable
from random import randrange
from asyncio import Semaphore, ensure_future, get_event_loop
async def job(name, time_range=10):
timeout = randrange(time_range)
print(f"Task 'name' started with timeout timeout")
await asyncio.sleep(timeout)
print(f"Task 'name' finished")
return name
async def custom_executor(func: Callable, args: list, async_level: int = 4):
""" Asynchronously executes no more that 'async_level' callables specified by 'func' with corresponding 'args' """
loop = get_event_loop()
sync = Semaphore()
todo = set(args)
doing = set()
def _schedule_task():
if todo:
arg = todo.pop()
fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
f = ensure_future(fr, loop=loop)
f.add_done_callback(_on_completion)
doing.add(f)
def _on_completion(f):
doing.remove(f)
sync.release()
_schedule_task()
for _ in range(min(async_level, len(todo))):
_schedule_task()
while True:
if not doing:
break
await sync.acquire()
async def main():
await custom_executor(job, [(1, 3), 7, (8, 2), 12, 5])
if __name__ == '__main__':
asyncio.run(main())
但如果你知道更好的方法,请分享!
【讨论】:
【参考方案2】:您可以创建固定数量的工作人员并使用队列为他们分配任务。它有点短,我发现它比使用回调的代码更容易推理。但是YMMV。
async def custom_executor(func, args, async_level=4):
queue = asyncio.Queue(1)
async def worker():
while True:
arg = await queue.get()
fr = func(*arg) if isinstance(arg, (tuple, list, set)) else func(arg)
await fr
queue.task_done()
# create the workers
workers = [asyncio.create_task(worker()) for _ in range(async_level)]
# Feed the workers tasks. Since the queue is bounded, this will also
# wait for previous tasks to finish, similar to what you wanted to
# achieve with as_completed().
for x in args:
await queue.put(x)
await queue.join() # wait for the remaining tasks to finish
# cancel the now-idle workers
for w in workers:
w.cancel()
【讨论】:
以上是关于自定义异步执行器的主要内容,如果未能解决你的问题,请参考以下文章