如何在 Python 3.8 中为 asyncio.gather 构建任务列表

Posted

技术标签:

【中文标题】如何在 Python 3.8 中为 asyncio.gather 构建任务列表【英文标题】:How to build list of tasks for asyncio.gather in Python 3.8 【发布时间】:2020-07-21 23:34:41 【问题描述】:

下面我附上了一个测试程序来演示我在 asyncio.gather 引发 TypeError 时遇到的问题。

我的目标: 进行多个并发异步调用,以从连接到我的计算机的 USB 摄像头阵列中将摄像头图像捕获到文件中。当所有相机都完成了它们的异步捕获后,我想恢复处理。

此处显示的异步协程 take_image()"ffmpeg" 应用程序进行系统调用,该应用程序将图像从指定相机捕获到指定文件。 p>

import asyncio
import os
import subprocess
import time

async def take_image(camera_id, camera_name, image_file_path, image_counter):
    image_capture_tic = time.perf_counter()
    try:
        run_cmd = subprocess.run( ["ffmpeg", '-y', '-hide_banner', '-f', 'avfoundation', '-i', camera_id,
                                   '-frames:v', '1', '-f', 'image2', image_file_path], universal_newlines=True,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)  # Note, ffmpeg writes to stderr, not stdout!
    except Exception as e:
        print("Error: Unable to capture image for", image_file_path)
        return "NO IMAGE!"

    image_capture_toc = time.perf_counter()
    print(f"image_counter: Captured camera_name image in: image_capture_toc - image_capture_tic:0.0f seconds")
    return camera_name

下面显示的 ma​​in() 例程获取多个摄像头的列表,并遍历列表中的每个摄像头,main() 使用 asyncio 为每个摄像头创建一个异步任务。创建任务()。每个任务都添加到任务列表中。

一旦所有图像捕获任务都已启动,我就使用 await asyncio.gather(tasks) 等待它们完成。

async def main():
    tic = time.perf_counter()
    camera_list = [('0', 'FHD Camera #1'),  ('1', 'FHD Camera #2'), ('2', 'FHD Camera #3'), ]
    image_counter = 1
    tasks = []
    for camera_pair in camera_list:
        camera_id, camera_name = camera_pair
        image_file_name = 'img' + str(image_counter) + "-cam" + str(camera_id)  + "-" + camera_name + '.jpg'
        image_file_path = os.path.join("/tmp/test1/img", image_file_name)

        # schedule all image captures calls *concurrently*:
        tasks.append(asyncio.create_task(take_image(camera_id, camera_name, image_file_path, image_counter),
                     name=image_file_name))
        image_counter = image_counter + 1

    await asyncio.gather(tasks) # <-- This line throws a TypeError!
    toc = time.perf_counter()
    print(f"Captured list of image_counter - 1 cameras in: toc - tic:0.0f seconds")

asyncio.run(main())

不幸的是,当我尝试运行这个程序时,我收到了这个错误:

TypeError: unhashable type: 'list'

以及以下 Traceback:

Traceback (most recent call last):
  File "scratch_10.py", line 41, in <module>
    asyncio.run(main())
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "scratch_10.py", line 36, in main
    await asyncio.gather(tasks)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 805, in gather
    if arg not in arg_to_fut:
TypeError: unhashable type: 'list'

我一直在尝试通过 asyncio 的 3.8 文档来解惑,但我不明白出了什么问题。

如何让每个 take_image 请求异步运行,然后在每个任务完成后在我的调用例程中恢复处理?

【问题讨论】:

【参考方案1】:

gather 采用位置参数,而不是单个可迭代参数。你需要打开你的清单。

await asyncio.gather(*tasks)

【讨论】:

这条线似乎有效。但是'asyncio.create_task()' 不是已经在事件循环中调度这些协程了吗?我希望,如果我得到一个任务/期货/协程列表,它们应该在我执行 'asyncio.gather(*task_list)' 时开始/被安排。 @zonk 我不确定我是否完全理解您的要求。是的create_task 将安排执行。但是,通常希望等到一组任务完成。等待gather 就是这样做的方法。 如果我创建一个任务并尝试再次收集它,会不会出现错误?示例:async def sleeper(): // await asyncio.sleep(1) 和现在主要的 g = asyncio.create_task(sleeper) // task = [g] // res = await asyncio.gather(*task) // print(res) 这不会导致错误吗?因为 g 已经安排好了?如果我想同时在gather启动它们呢?【参考方案2】:

您应该尝试: (任务 1、任务 2、任务 3) 这就是我为工作所做的任务

【讨论】:

虽然在我的示例中我给出了 3 个摄像机的固定列表,但在我的生产系统中,我将有一个可变长度的摄像机列表,其名称和 ID 直到运行时才知道。如此硬编码似乎不切实际。

以上是关于如何在 Python 3.8 中为 asyncio.gather 构建任务列表的主要内容,如果未能解决你的问题,请参考以下文章

Python中的协程与asyncio原理

Python中的协程与asyncio原理

Python中的协程与asyncio原理

如何在asyncio python中使用子进程模块限制并发进程数

如何在 Rails 2.3.8 中为 https 请求(即安全连接)查找客户端 IP

如何在 python asyncio 中等待 select.select 调用