在 asyncio 中批量处理任务

Posted

技术标签:

【中文标题】在 asyncio 中批量处理任务【英文标题】:Process tasks in batchs in asyncio 【发布时间】:2019-03-03 08:53:51 【问题描述】:

我有一个生成任务的函数(io 绑定任务):

def get_task():
    while True:
        new_task = _get_task()
        if new_task is not None:
            yield new_task
        else:
            sleep(1)

我正在尝试在 asyncio 中编写一个消费者,该消费者将同时处理最多 10 个任务,一个任务完成然后将执行新的任务。 我不确定我是否应该使用信号量或者是否有任何类型的 asycio 池执行器?我开始用线程编写伪代码:

def run(self)
   while True:
       self.semaphore.acquire() # first acquire, then get task
       t = get_task()
       self.process_task(t)

def process_task(self, task):
   try:
       self.execute_task(task)
       self.mark_as_done(task)
   except:
       self.mark_as_failed(task)
   self.semaphore.release()

谁能帮帮我?我不知道在哪里放置 async/await 关键字

【问题讨论】:

顺便说一句,一次最多运行10个任务的要求从何而来? 现有问题以及可以在此处应用的解决方案:***.com/a/48486557/705086 【参考方案1】:

使用 asyncio.Semaphore

的简单任务上限
async def max10(task_generator):
    semaphore = asyncio.Semaphore(10)

    async def bounded(task):
        async with semaphore:
            return await task

    async for task in task_generator:
        asyncio.ensure_future(bounded(task))

这个解决方案的问题在于,任务是从生成器中贪婪地抽取出来的。例如,如果生成器从大型数据库中读取,程序可能会耗尽内存。

除此之外,它是惯用的和乖巧的。

一种使用异步生成器协议按需拉取新任务的解决方案:

async def max10(task_generator):
    tasks = set()
    gen = task_generator.__aiter__()
    try:
        while True:
            while len(tasks) < 10:
                tasks.add(await gen.__anext__())
            _done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    except StopAsyncIteration:
        await asyncio.gather(*tasks)

它可能被认为是次优的,因为它直到 10 个可用任务才开始执行。

这是使用 worker 模式的简洁而神奇的解决方案:

async def max10(task_generator):
    async def worker():
        async for task in task_generator:
            await task

    await asyncio.gather(*[worker() for i in range(10)])

它依赖于一个有点违反直觉的属性,即能够在同一个异步生成器上拥有多个异步迭代器,在这种情况下,每个生成的项目只能被一个迭代器看到。

我的直觉告诉我,这些解决方案都不能在 cancellation 上正常运行。

【讨论】:

我喜欢这个答案的教学方法,但最后一个 sn-p 可能要简单得多。由于您有固定数量的工人,您可以摆脱信号量。如果没有信号量,worker 可以使用普通的async for 循环。 谢谢,已编辑。还添加了解释为什么它甚至可以工作:) 请注意ensure_future was soft-deprecated in 3.7 支持更友好的asyncio.create_task【参考方案2】:

异步不是线程。例如,如果您有文件 IO 绑定的任务,则 write them async using aiofiles

async with aiofiles.open('filename', mode='r') as f:
    contents = await f.read()

然后用你的任务替换任务。如果您想一次只运行 10 个,请等待 asyncio.gather 每 10 个任务。

import asyncio

async def task(x):
  await asyncio.sleep(0.5)
  print( x, "is done" )

async def run(loop):
  futs = []
  for x in range(50):
    futs.append( task(x) )

  await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

如果您不能异步编写任务并且需要线程,这是使用 asyncio 的 ThreadPoolExecutor 的基本示例。请注意,当 max_workers=5 时,一次只能运行 5 个任务。

import time
from concurrent.futures import ThreadPoolExecutor
import asyncio

def blocking(x):
  time.sleep(1)
  print( x, "is done" )

async def run(loop):
  futs = []
  executor = ThreadPoolExecutor(max_workers=5)
  for x in range(15):
    future = loop.run_in_executor(executor, blocking, x)
    futs.append( future )

  await asyncio.sleep(4)
  res = await asyncio.gather( *futs )

loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()

【讨论】:

感谢您的回复。问题是我不想做 10 到 10,而是在任何一项任务完成后开始下一个任务。所以一开始我开始了 10 个任务,其中一个完成了,然后我将下一个添加到池中。这样一直有10个任务在处理 但您的解决方案中的问题是您在尚未获得信号量时获取新任务,在我的情况下是什么导致问题【参考方案3】:

正如Dima Tismek 所指出的,使用信号量来限制并发很容易耗尽task_generator,因为在获取任务和将它们提交到事件循环之间没有背压。另一个答案也探讨了一个更好的选择,不是在生成器生成项目后立即生成任务,而是创建固定数量的工人同时耗尽生成器。

代码有两个地方可以改进:

不需要信号量 - 当任务数量一开始就固定时,它是多余的; 处理生成任务和限制任务的取消。

这是一个解决这两个问题的实现:

async def throttle(task_generator, max_tasks):
    it = task_generator.__aiter__()
    cancelled = False
    async def worker():
        async for task in it:
            try:
                await task
            except asyncio.CancelledError:
                # If a generated task is canceled, let its worker
                # proceed with other tasks - except if it's the
                # outer coroutine that is cancelling us.
                if cancelled:
                    raise
            # other exceptions are propagated to the caller
    worker_tasks = [asyncio.create_task(worker())
                    for i in range(max_tasks)]
    try:
        await asyncio.gather(*worker_tasks)
    except:
        # In case of exception in one worker, or in case we're
        # being cancelled, cancel all workers and propagate the
        # exception.
        cancelled = True
        for t in worker_tasks:
            t.cancel()
        raise

一个简单的测试用例:

async def mock_task(num):
    print('running', num)
    await asyncio.sleep(random.uniform(1, 5))
    print('done', num)

async def mock_gen():
    tnum = 0
    while True:
        await asyncio.sleep(.1 * random.random())
        print('generating', tnum)
        yield asyncio.create_task(mock_task(tnum))
        tnum += 1

if __name__ == '__main__':
    asyncio.run(throttle(mock_gen(), 3))

【讨论】:

以上是关于在 asyncio 中批量处理任务的主要内容,如果未能解决你的问题,请参考以下文章

高效跑批设计思路——针对系统中的批量日终任务

批量任务调优

华为机试真题 C++ 实现批量处理任务

华为机试真题 C++ 实现批量处理任务

GCP Pub Sub:批量处理消息

利用python处理自动化任务之同时批量修改word里面的内容