Python async/await 下载 url 列表

Posted

技术标签:

【中文标题】Python async/await 下载 url 列表【英文标题】:Python async/await downloading a list of urls 【发布时间】:2018-01-14 22:32:52 【问题描述】:

我正在尝试从 FTP 服务器下载超过 30,000 个文件,经过一番谷歌搜索后,使用异步 IO 似乎是个好主意。但是,下面的代码无法下载任何文件并返回超时错误。我真的很感激任何帮助!谢谢!

class pdb:
    def __init__(self):
        self.ids = []
        self.dl_id = []
        self.err_id = []


    async def download_file(self, session, url):
        try:
            with async_timeout.timeout(10):
                async with session.get(url) as remotefile:
                    if remotefile.status == 200:
                        data = await remotefile.read()
                        return "error": "", "data": data
                    else:
                        return "error": remotefile.status, "data": ""
        except Exception as e:
            return "error": e, "data": ""

    async def unzip(self, session, work_queue):
        while not work_queue.empty():
            queue_url = await work_queue.get()
            print(queue_url)
            data = await self.download_file(session, queue_url)
            id = queue_url[-11:-7]
            ID = id.upper()
            if not data["error"]:
                saved_pdb = os.path.join("./pdb", ID, f'ID.pdb')
                if ID not in self.dl_id:
                    self.dl_id.append(ID)
                with open(f"id.ent.gz", 'wb') as f:
                    f.write(data["data"].read())
                with gzip.open(f"id.ent.gz", "rb") as inFile, open(saved_pdb, "wb") as outFile:
                    shutil.copyfileobj(inFile, outFile)
                os.remove(f"id.ent.gz")
            else:
                self.err_id.append(ID)

    def download_queue(self, urls):
        loop = asyncio.get_event_loop()
        q = asyncio.Queue(loop=loop)
        [q.put_nowait(url) for url in urls]
        con = aiohttp.TCPConnector(limit=10)
        with aiohttp.ClientSession(loop=loop, connector=con) as session:
            tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))]
            loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()

如果我删除了try 部分,则会出现错误消息:

回溯(最近一次通话最后一次): 文件“test.py”,第 111 行,在 x.download_queue(urls) 文件“test.py”,第 99 行,在 download_queue loop.run_until_complete(asyncio.gather(*tasks)) 文件“/home/yz/miniconda3/lib/python3.6/asyncio/base_events.py”,第 467 行,在 run_until_complete 返回 future.result() 文件“test.py”,第 73 行,解压缩 数据 = 等待 self.download_file(session, queue_url) 文件“test.py”,第 65 行,在 download_file 返回“错误”:remotefile.status,“数据”:“” 文件“/home/yz/miniconda3/lib/python3.6/site-packages/async_timeout/init.py”,第 46 行,exit 从无引发 asyncio.TimeoutError concurrent.futures._base.TimeoutError

【问题讨论】:

没有错误信息?你已经做过什么诊断了吗? 可能是因为您将超时设置为 10 秒:async_timeout.timeout(10) @KlausD。很抱歉没有包括那部分。刚刚编辑。 @Matej 是的,但是如果我删除该行,代码就会冻结,所以我想其他地方还是有问题。 您可能只是创建值 - 10 秒可能不足以下载文件 【参考方案1】:
tasks = [asyncio.ensure_future(self.unzip(session, q)) for _ in range(len(urls))]
loop.run_until_complete(asyncio.gather(*tasks))

在这里,您开始同时下载所有网址的过程。这意味着您也开始计算所有这些超时。一旦它是一个很大的数字,例如 30,000,由于网络/ram/cpu 容量,它无法在 10 秒内完成。

为避免这种情况,您应该保证同时启动的协程的限制。通常可以使用像asyncio.Semaphore 这样的同步原语来实现这一点。

看起来像这样:

sem = asyncio.Semaphore(10)

# ...

async def download_file(self, session, url):
    try:
        async with sem:  # Don't start next download until 10 other currently running
            with async_timeout.timeout(10):

【讨论】:

我发现了问题所在...我猜 aiohttp 无法处理 ftp url。不过感谢您的建议!【参考方案2】:

作为@MikhailGerasimov 的信号量方法的替代方案,您可以考虑使用aiostream.stream.map 运算符:

from aiostream import stream, pipe

async def main(urls):
    async with aiohttp.ClientSession() as session:
        ws = stream.repeat(session)
        xs = stream.zip(ws, stream.iterate(urls))
        ys = stream.starmap(xs, fetch, ordered=False, task_limit=10)
        zs = stream.map(ys, process)
        await zs

这是一个使用管道的等效实现:

async def main3(urls):
    async with aiohttp.ClientSession() as session:
        await (stream.repeat(session)
               | pipe.zip(stream.iterate(urls))
               | pipe.starmap(fetch, ordered=False, task_limit=10)
               | pipe.map(process))

您可以使用以下协程对其进行测试:

async def fetch(session, url):
    await asyncio.sleep(random.random())
    return url

async def process(data):
    print(data)

在 demonstration 和 documentation 中查看更多 aiostream 示例。

免责声明:我是项目维护者。

【讨论】:

流如何限制同时下载的文件数量? @wolfdawn 它只是限制了可以同时运行的fetch 任务的数量。达到限制后,map 流会等待任务完成,然后再向zip 流询问新参数。 谢谢!我错过了那部分。非常感谢。

以上是关于Python async/await 下载 url 列表的主要内容,如果未能解决你的问题,请参考以下文章

python 3.5 中的 async/await 关键字是不是受到 C# 中的 async/await 的启发? [关闭]

对python async与await的理解

异步方法 async/await

Python Async/Await入门指南

Python 3.5 async/await 与真实代码示例

JS async/await 的 CORS 错误,但当我将 URL 直接粘贴到浏览器中时没有 [重复]