在 asyncio 中批量处理任务
Posted
技术标签:
【中文标题】在 asyncio 中批量处理任务【英文标题】:Process tasks in batchs in asyncio 【发布时间】:2019-03-03 08:53:51 【问题描述】:我有一个生成任务的函数(io 绑定任务):
def get_task():
while True:
new_task = _get_task()
if new_task is not None:
yield new_task
else:
sleep(1)
我正在尝试在 asyncio 中编写一个消费者,该消费者将同时处理最多 10 个任务,一个任务完成然后将执行新的任务。 我不确定我是否应该使用信号量或者是否有任何类型的 asycio 池执行器?我开始用线程编写伪代码:
def run(self)
while True:
self.semaphore.acquire() # first acquire, then get task
t = get_task()
self.process_task(t)
def process_task(self, task):
try:
self.execute_task(task)
self.mark_as_done(task)
except:
self.mark_as_failed(task)
self.semaphore.release()
谁能帮帮我?我不知道在哪里放置 async/await 关键字
【问题讨论】:
顺便说一句,一次最多运行10个任务的要求从何而来? 现有问题以及可以在此处应用的解决方案:***.com/a/48486557/705086 【参考方案1】:使用 asyncio.Semaphore
的简单任务上限async def max10(task_generator):
semaphore = asyncio.Semaphore(10)
async def bounded(task):
async with semaphore:
return await task
async for task in task_generator:
asyncio.ensure_future(bounded(task))
这个解决方案的问题在于,任务是从生成器中贪婪地抽取出来的。例如,如果生成器从大型数据库中读取,程序可能会耗尽内存。
除此之外,它是惯用的和乖巧的。
一种使用异步生成器协议按需拉取新任务的解决方案:
async def max10(task_generator):
tasks = set()
gen = task_generator.__aiter__()
try:
while True:
while len(tasks) < 10:
tasks.add(await gen.__anext__())
_done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
except StopAsyncIteration:
await asyncio.gather(*tasks)
它可能被认为是次优的,因为它直到 10 个可用任务才开始执行。
这是使用 worker 模式的简洁而神奇的解决方案:
async def max10(task_generator):
async def worker():
async for task in task_generator:
await task
await asyncio.gather(*[worker() for i in range(10)])
它依赖于一个有点违反直觉的属性,即能够在同一个异步生成器上拥有多个异步迭代器,在这种情况下,每个生成的项目只能被一个迭代器看到。
我的直觉告诉我,这些解决方案都不能在 cancellation 上正常运行。
【讨论】:
我喜欢这个答案的教学方法,但最后一个 sn-p 可能要简单得多。由于您有固定数量的工人,您可以摆脱信号量。如果没有信号量,worker 可以使用普通的async for
循环。
谢谢,已编辑。还添加了解释为什么它甚至可以工作:)
请注意ensure_future
was soft-deprecated in 3.7 支持更友好的asyncio.create_task
。【参考方案2】:
异步不是线程。例如,如果您有文件 IO 绑定的任务,则 write them async using aiofiles
async with aiofiles.open('filename', mode='r') as f:
contents = await f.read()
然后用你的任务替换任务。如果您想一次只运行 10 个,请等待 asyncio.gather 每 10 个任务。
import asyncio
async def task(x):
await asyncio.sleep(0.5)
print( x, "is done" )
async def run(loop):
futs = []
for x in range(50):
futs.append( task(x) )
await asyncio.gather( *futs )
loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()
如果您不能异步编写任务并且需要线程,这是使用 asyncio 的 ThreadPoolExecutor 的基本示例。请注意,当 max_workers=5 时,一次只能运行 5 个任务。
import time
from concurrent.futures import ThreadPoolExecutor
import asyncio
def blocking(x):
time.sleep(1)
print( x, "is done" )
async def run(loop):
futs = []
executor = ThreadPoolExecutor(max_workers=5)
for x in range(15):
future = loop.run_in_executor(executor, blocking, x)
futs.append( future )
await asyncio.sleep(4)
res = await asyncio.gather( *futs )
loop = asyncio.get_event_loop()
loop.run_until_complete( run(loop) )
loop.close()
【讨论】:
感谢您的回复。问题是我不想做 10 到 10,而是在任何一项任务完成后开始下一个任务。所以一开始我开始了 10 个任务,其中一个完成了,然后我将下一个添加到池中。这样一直有10个任务在处理 但您的解决方案中的问题是您在尚未获得信号量时获取新任务,在我的情况下是什么导致问题【参考方案3】:正如Dima Tismek 所指出的,使用信号量来限制并发很容易耗尽task_generator
,因为在获取任务和将它们提交到事件循环之间没有背压。另一个答案也探讨了一个更好的选择,不是在生成器生成项目后立即生成任务,而是创建固定数量的工人同时耗尽生成器。
代码有两个地方可以改进:
不需要信号量 - 当任务数量一开始就固定时,它是多余的; 处理生成任务和限制任务的取消。这是一个解决这两个问题的实现:
async def throttle(task_generator, max_tasks):
it = task_generator.__aiter__()
cancelled = False
async def worker():
async for task in it:
try:
await task
except asyncio.CancelledError:
# If a generated task is canceled, let its worker
# proceed with other tasks - except if it's the
# outer coroutine that is cancelling us.
if cancelled:
raise
# other exceptions are propagated to the caller
worker_tasks = [asyncio.create_task(worker())
for i in range(max_tasks)]
try:
await asyncio.gather(*worker_tasks)
except:
# In case of exception in one worker, or in case we're
# being cancelled, cancel all workers and propagate the
# exception.
cancelled = True
for t in worker_tasks:
t.cancel()
raise
一个简单的测试用例:
async def mock_task(num):
print('running', num)
await asyncio.sleep(random.uniform(1, 5))
print('done', num)
async def mock_gen():
tnum = 0
while True:
await asyncio.sleep(.1 * random.random())
print('generating', tnum)
yield asyncio.create_task(mock_task(tnum))
tnum += 1
if __name__ == '__main__':
asyncio.run(throttle(mock_gen(), 3))
【讨论】:
以上是关于在 asyncio 中批量处理任务的主要内容,如果未能解决你的问题,请参考以下文章