当我清楚地表明我只想完成第一个任务时,为什么所有任务都在asyncio.wait()中完成?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了当我清楚地表明我只想完成第一个任务时,为什么所有任务都在asyncio.wait()中完成?相关的知识,希望对你有一定的参考价值。

我正在尝试使用asyncio从OpenSubtitles中获取一些数据,然后下载该数据中包含信息的文件。我想使用asyncio同时获取该数据并下载该文件。

问题是我想等待列表tasks中的1个任务完成,然后开始列表中的其余任务或download_tasks。原因是在self._perform_query()我写信息到文件和self._download_and_save_file()我正在从该文件中读取相同的信息。换句话说,download_tasks需要等待tasks中的至少一项任务在开始之前完成。

我发现我可以用asyncio.wait(return_when=FIRST_COMPLETED)做到这一点,但由于某种原因它不能正常工作:

payloads = [create_payloads(entry) for entry in retreive(table_in_database)] 
tasks = [asyncio.create_task(self._perform_query(payload, proxy)) for payload in payloads]
download_tasks = [asyncio.create_task(self._download_and_save_file(url, proxy) for url in url_list]

done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(done)
print(len(done))
print(pending)
print(len(pending))
await asyncio.wait(download_tasks)

输出与预期完全不同。看来tasks列表中的3个任务中有3个正在完成,尽管我通过了asyncio.FIRST_COMPLETED。为什么会这样?

{<Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>, <Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>, <Task finished coro=<SubtitleDownloader._perform_query() done, defined at C:\Users\...\subtitles.py:71> result=None>}
3
set()
0
Exiting

据我所知,self._perform_query()中的代码不应该影响这个问题。无论如何,这只是为了确保:

async def _perform_query(self, payload, proxy):
    try:
        query_result = proxy.SearchSubtitles(self.opensubs_token, [payload], {"limit": 25})
    except Fault as e:
        raise "A fault has occurred:\n{}".format(e)
    except ProtocolError as e:
        raise "A ProtocolError has occurred:\n{}".format(e)
    else:
        if query_result["status"] == "200 OK":
            with open("dl_links.json", "w") as dl_links_json:
                result = query_result["data"][0]
                subtitle_name = result["SubFileName"]
                download_link = result["SubDownloadLink"]
                download_data = {"download link": download_link,
                                 "file name": subtitle_name}
                json.dump(download_data, dl_links_json)
        else:
            print("Wrong status code: {}".format(query_result["status"]))

现在,我一直在测试这个没有运行download_tasks,但我已经将它包含在这里用于上下文。也许我会以完全错误的方式解决这个问题。如果是这样,我将非常感谢您的投入!

编辑:

问题非常简单,如下所述。 _perform_query不是一个等待的功能,而是同步运行。我通过编辑_perform_query的文件写入与aiofiles异步来改变它:

def _perform_query(self, payload, proxy):
    query_result = proxy.SearchSubtitles(self.opensubs_token, [payload], {"limit": 25})
    if query_result["status"] == "200 OK":
        async with aiofiles.open("dl_links.json", mode="w") as dl_links_json:
            result = query_result["data"][0]
            download_link = result["SubDownloadLink"]
            await dl_links_json.write(download_link)
答案

return_when=FIRST_COMPLETED不保证只完成一项任务。它保证一旦任务完成,等待就会完成,但完全可能“同时”完成其他任务,这对于asyncio意味着在事件循环的同一次迭代中。例如,考虑以下代码:

async def noop():
    pass

async def main():
    done, pending = await asyncio.wait(
        [noop(), noop(), noop()], return_when=asyncio.FIRST_COMPLETED)
    print(len(done), len(pending))

asyncio.run(main())

这会打印3 0,就像你的代码一样。为什么?

asyncio.wait做了两件事:它将协同程序提交给事件循环,并设置回调以在任何一个完成时通知它。然而,noop协程不包含await,所以没有任何对noop()的调用暂停,每个只是做它的东西并立即返回。结果,所有三个协程实例都在事件循环的同一遍中完成。然后wait被告知所有三个协同程序已经完成,这是一个尽职尽责的报道。

如果你改变noop等待随机睡眠,例如将pass更改为await asyncio.sleep(0.1 * random.random()),即可获得预期的行为。使用await协同程序不再同时完成,wait会在检测到第一个时报告。

这揭示了您的代码的真正潜在问题:_perform_query没有等待。这表示您没有使用异步基础库,或者您使用不正确。对SearchSubtitles的调用可能只是阻止了事件循环,这似乎在琐碎的测试中起作用,但是打破了基本的asyncio功能,例如并发执行任务。

以上是关于当我清楚地表明我只想完成第一个任务时,为什么所有任务都在asyncio.wait()中完成?的主要内容,如果未能解决你的问题,请参考以下文章

当我只想在 JPG 和 PNG 之间转换时,为啥 ImageMagick 会改变图像亮度?

如果文件不匹配,则递归地将一个目录复制到另一个目录

我只想在我的表的电子邮件列更新时制作一个表审计/影子表

当我只想在挂载时运行一次时,useEffect 缺少依赖项

赏月斋源码共享计划 第二期

获取 403(禁止)React + Django