使用 Python 并行下载多个 GCS 文件(到内存中)

Posted

技术标签:

【中文标题】使用 Python 并行下载多个 GCS 文件(到内存中)【英文标题】:Download multiple GCS files in parallel (into memory) using Python 【发布时间】:2021-12-16 16:58:21 【问题描述】:

我有一个存储桶,里面有很多大文件(每个 500mb)。有时我需要加载多个文件,按名称引用。我一直在使用blob.download_as_string()函数逐个下载文件,但是速度非常慢,所以我想尝试并行下载。

我找到了gcloud-aio-storage 包,但是文档有点稀疏,尤其是download 函数。

我更愿意将文件下载/存储在内存中,而不是下载到本地机器然后上传到脚本。

这是我拼凑起来的,虽然我似乎无法让它发挥作用。我不断收到超时错误。 我做错了什么?

注意:使用 python 3.7,以及所有其他软件包中的最新版本。

test_download.py


from gcloud.aio.storage import Storage
import aiohttp 
import asyncio

async def gcs_download(session, bucket_name, file, storage):
    async with session: 
        bucket = storage.get_bucket(bucket_name)
        blob = await bucket.get_blob(file)
        return  await blob.download()
    

async def get_gcsfiles_async(bucket_name, gcs_files):

    async with aiohttp.ClientSession() as session:
        storage = Storage(session=session)
        coros = (gcs_download(session, bucket_name, file, storage) for file in gcs_files)
        return await asyncio.gather(*coros)
        

那么我调用/传入值的方式如下:

import test_download as test
import asyncio

bucket_name = 'my_bucket_name'
project_name = 'my_project_name'  ### Where do I reference this???

gcs_files = ['bucket_folder/some-file-2020-10-06.txt', 
            'bucket_folder/some-file-2020-10-07.txt',
            'bucket_folder/some-file-2020-10-08.txt']

result = asyncio.run(test.get_gcsfiles_async(bucket_name, gcs_files))

我们将不胜感激!

这里是相关问题,虽然有两点需要注意:Google Storage python api download in parallel

    当我从批准的答案运行代码时,它最终会卡住并且永远不会下载 这是在 gcloud-aio-storage 包发布之前,可能没有利用“最佳”当前方法。

【问题讨论】:

【参考方案1】:

似乎缺少该库的文档,但我可以运行一些东西,并且它正在我的测试中工作。通过查看代码,我发现你不需要使用blob.download(),因为无论如何it calls storage.download()。我将下面的脚本基于usage 部分,该部分处理上传,但可以重写以供下载。 storage.download() 不会写入文件,因为这是由 storage.download_to_filename() 完成的。您可以查看可用的下载方式here。

async_download.py

import asyncio
from gcloud.aio.auth import Token
from gcloud.aio.storage import Storage

# Used a token from a service account for authentication
sa_token = Token(service_file="../resources/gcs-test-service-account.json", scopes=["https://www.googleapis.com/auth/devstorage.full_control"])

async def async_download(bucket, obj_names):
    async with Storage(token=sa_token) as client:
        tasks = (client.download(bucket, file) for file in obj_names) # Used the built in download method, with required args
        res = await asyncio.gather(*tasks)

    await sa_token.close()
    return res

ma​​in.py

import async_download as dl_test
import asyncio

bucket_name = "my-bucket-name"
obj_names = [
    "text1.txt",
    "text2.txt",
    "text3.txt"
]

res = asyncio.run(dl_test.async_download(bucket_name, obj_names))

print(res)

如果您想改用服务帐户Token,可以关注this guide 并使用相关的auth scopes。由于服务帐户是针对项目的,因此不需要指定项目,但我也没有看到任何 Session 的项目名称引用。虽然 GCS 的 GCP Python 库还不支持并行下载,但有一个 feature request 对此开放。目前还没有发布此版本的 ETA。

【讨论】:

谢谢埃内斯托!我稍后会尝试一下。感谢您提供服务帐户的代码,该代码将在部署时使用。 我仍然收到concurrent.futures._base.TimeoutError。我不能在我的本地机器上使用服务来测试它,所以这可能就是原因。但我还需要访问存储桶内文件夹中的文件。所以obj_names = [folder/text1.txt, folder/text2.txt],也不确定这是否是个问题。 @mrp 我可以确认在文件夹中包含对象并将它们引用为folder/file.txt 仍然适用于我的测试。您的问题可能来自未使用服务帐户,因为 example 使用服务帐户。您还可以简化代码并允许库自动managesession。我在代码中添加了一个缺失的 Token.close() 语句。让我知道这是否有助于参考未来的用户。

以上是关于使用 Python 并行下载多个 GCS 文件(到内存中)的主要内容,如果未能解决你的问题,请参考以下文章

apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python

使用Python SDK时如何防止GCS自动解压对象?

并行下载多个文件的库或工具[关闭]

使用 python 3.6 将多个文件并行加载到内存中的最佳方法是啥?

Apache Beam 数据流:从 Azure 到 GCS 的文件传输

将 GCS 文件加载到 BigQuery 的 Cloud Functions 的 Python 单元测试