使用 Python 并行下载多个 GCS 文件(到内存中)
Posted
技术标签:
【中文标题】使用 Python 并行下载多个 GCS 文件(到内存中)【英文标题】:Download multiple GCS files in parallel (into memory) using Python 【发布时间】:2021-12-16 16:58:21 【问题描述】:我有一个存储桶,里面有很多大文件(每个 500mb)。有时我需要加载多个文件,按名称引用。我一直在使用blob.download_as_string()
函数逐个下载文件,但是速度非常慢,所以我想尝试并行下载。
我找到了gcloud-aio-storage
包,但是文档有点稀疏,尤其是download
函数。
我更愿意将文件下载/存储在内存中,而不是下载到本地机器然后上传到脚本。
这是我拼凑起来的,虽然我似乎无法让它发挥作用。我不断收到超时错误。 我做错了什么?
注意:使用 python 3.7,以及所有其他软件包中的最新版本。
test_download.py
from gcloud.aio.storage import Storage
import aiohttp
import asyncio
async def gcs_download(session, bucket_name, file, storage):
async with session:
bucket = storage.get_bucket(bucket_name)
blob = await bucket.get_blob(file)
return await blob.download()
async def get_gcsfiles_async(bucket_name, gcs_files):
async with aiohttp.ClientSession() as session:
storage = Storage(session=session)
coros = (gcs_download(session, bucket_name, file, storage) for file in gcs_files)
return await asyncio.gather(*coros)
那么我调用/传入值的方式如下:
import test_download as test
import asyncio
bucket_name = 'my_bucket_name'
project_name = 'my_project_name' ### Where do I reference this???
gcs_files = ['bucket_folder/some-file-2020-10-06.txt',
'bucket_folder/some-file-2020-10-07.txt',
'bucket_folder/some-file-2020-10-08.txt']
result = asyncio.run(test.get_gcsfiles_async(bucket_name, gcs_files))
我们将不胜感激!
这里是相关问题,虽然有两点需要注意:Google Storage python api download in parallel
-
当我从批准的答案运行代码时,它最终会卡住并且永远不会下载
这是在
gcloud-aio-storage
包发布之前,可能没有利用“最佳”当前方法。
【问题讨论】:
【参考方案1】:似乎缺少该库的文档,但我可以运行一些东西,并且它正在我的测试中工作。通过查看代码,我发现你不需要使用blob.download()
,因为无论如何it calls storage.download()
。我将下面的脚本基于usage 部分,该部分处理上传,但可以重写以供下载。 storage.download()
不会写入文件,因为这是由 storage.download_to_filename()
完成的。您可以查看可用的下载方式here。
async_download.py
import asyncio
from gcloud.aio.auth import Token
from gcloud.aio.storage import Storage
# Used a token from a service account for authentication
sa_token = Token(service_file="../resources/gcs-test-service-account.json", scopes=["https://www.googleapis.com/auth/devstorage.full_control"])
async def async_download(bucket, obj_names):
async with Storage(token=sa_token) as client:
tasks = (client.download(bucket, file) for file in obj_names) # Used the built in download method, with required args
res = await asyncio.gather(*tasks)
await sa_token.close()
return res
main.py
import async_download as dl_test
import asyncio
bucket_name = "my-bucket-name"
obj_names = [
"text1.txt",
"text2.txt",
"text3.txt"
]
res = asyncio.run(dl_test.async_download(bucket_name, obj_names))
print(res)
如果您想改用服务帐户Token
,可以关注this guide 并使用相关的auth scopes。由于服务帐户是针对项目的,因此不需要指定项目,但我也没有看到任何 Session
的项目名称引用。虽然 GCS 的 GCP Python 库还不支持并行下载,但有一个 feature request 对此开放。目前还没有发布此版本的 ETA。
【讨论】:
谢谢埃内斯托!我稍后会尝试一下。感谢您提供服务帐户的代码,该代码将在部署时使用。 我仍然收到concurrent.futures._base.TimeoutError
。我不能在我的本地机器上使用服务来测试它,所以这可能就是原因。但我还需要访问存储桶内文件夹中的文件。所以obj_names = [folder/text1.txt, folder/text2.txt]
,也不确定这是否是个问题。
@mrp 我可以确认在文件夹中包含对象并将它们引用为folder/file.txt
仍然适用于我的测试。您的问题可能来自未使用服务帐户,因为 example 使用服务帐户。您还可以简化代码并允许库自动managesession
。我在代码中添加了一个缺失的 Token.close()
语句。让我知道这是否有助于参考未来的用户。以上是关于使用 Python 并行下载多个 GCS 文件(到内存中)的主要内容,如果未能解决你的问题,请参考以下文章
apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python
使用 python 3.6 将多个文件并行加载到内存中的最佳方法是啥?