如何在 Python 3.8 中为 asyncio.gather 构建任务列表
Posted
技术标签:
【中文标题】如何在 Python 3.8 中为 asyncio.gather 构建任务列表【英文标题】:How to build list of tasks for asyncio.gather in Python 3.8 【发布时间】:2020-07-21 23:34:41 【问题描述】:下面我附上了一个测试程序来演示我在 asyncio.gather 引发 TypeError 时遇到的问题。
我的目标: 进行多个并发异步调用,以从连接到我的计算机的 USB 摄像头阵列中将摄像头图像捕获到文件中。当所有相机都完成了它们的异步捕获后,我想恢复处理。
此处显示的异步协程 take_image() 对 "ffmpeg" 应用程序进行系统调用,该应用程序将图像从指定相机捕获到指定文件。 p>
import asyncio
import os
import subprocess
import time
async def take_image(camera_id, camera_name, image_file_path, image_counter):
image_capture_tic = time.perf_counter()
try:
run_cmd = subprocess.run( ["ffmpeg", '-y', '-hide_banner', '-f', 'avfoundation', '-i', camera_id,
'-frames:v', '1', '-f', 'image2', image_file_path], universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE) # Note, ffmpeg writes to stderr, not stdout!
except Exception as e:
print("Error: Unable to capture image for", image_file_path)
return "NO IMAGE!"
image_capture_toc = time.perf_counter()
print(f"image_counter: Captured camera_name image in: image_capture_toc - image_capture_tic:0.0f seconds")
return camera_name
下面显示的 main() 例程获取多个摄像头的列表,并遍历列表中的每个摄像头,main() 使用 asyncio 为每个摄像头创建一个异步任务。创建任务()。每个任务都添加到任务列表中。
一旦所有图像捕获任务都已启动,我就使用 await asyncio.gather(tasks) 等待它们完成。
async def main():
tic = time.perf_counter()
camera_list = [('0', 'FHD Camera #1'), ('1', 'FHD Camera #2'), ('2', 'FHD Camera #3'), ]
image_counter = 1
tasks = []
for camera_pair in camera_list:
camera_id, camera_name = camera_pair
image_file_name = 'img' + str(image_counter) + "-cam" + str(camera_id) + "-" + camera_name + '.jpg'
image_file_path = os.path.join("/tmp/test1/img", image_file_name)
# schedule all image captures calls *concurrently*:
tasks.append(asyncio.create_task(take_image(camera_id, camera_name, image_file_path, image_counter),
name=image_file_name))
image_counter = image_counter + 1
await asyncio.gather(tasks) # <-- This line throws a TypeError!
toc = time.perf_counter()
print(f"Captured list of image_counter - 1 cameras in: toc - tic:0.0f seconds")
asyncio.run(main())
不幸的是,当我尝试运行这个程序时,我收到了这个错误:
TypeError: unhashable type: 'list'
以及以下 Traceback:
Traceback (most recent call last):
File "scratch_10.py", line 41, in <module>
asyncio.run(main())
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
return future.result()
File "scratch_10.py", line 36, in main
await asyncio.gather(tasks)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 805, in gather
if arg not in arg_to_fut:
TypeError: unhashable type: 'list'
我一直在尝试通过 asyncio 的 3.8 文档来解惑,但我不明白出了什么问题。
如何让每个 take_image 请求异步运行,然后在每个任务完成后在我的调用例程中恢复处理?
【问题讨论】:
【参考方案1】:gather
采用位置参数,而不是单个可迭代参数。你需要打开你的清单。
await asyncio.gather(*tasks)
【讨论】:
这条线似乎有效。但是'asyncio.create_task()' 不是已经在事件循环中调度这些协程了吗?我希望,如果我得到一个任务/期货/协程列表,它们应该在我执行 'asyncio.gather(*task_list)' 时开始/被安排。 @zonk 我不确定我是否完全理解您的要求。是的create_task
将安排执行。但是,通常希望等到一组任务完成。等待gather
就是这样做的方法。
如果我创建一个任务并尝试再次收集它,会不会出现错误?示例:async def sleeper(): // await asyncio.sleep(1)
和现在主要的 g = asyncio.create_task(sleeper) // task = [g] // res = await asyncio.gather(*task) // print(res)
这不会导致错误吗?因为 g 已经安排好了?如果我想同时在gather
启动它们呢?【参考方案2】:
您应该尝试: (任务 1、任务 2、任务 3) 这就是我为工作所做的任务
【讨论】:
虽然在我的示例中我给出了 3 个摄像机的固定列表,但在我的生产系统中,我将有一个可变长度的摄像机列表,其名称和 ID 直到运行时才知道。如此硬编码似乎不切实际。以上是关于如何在 Python 3.8 中为 asyncio.gather 构建任务列表的主要内容,如果未能解决你的问题,请参考以下文章
如何在asyncio python中使用子进程模块限制并发进程数