Boto3 从 S3 存储桶下载所有文件

Posted

技术标签:

【中文标题】Boto3 从 S3 存储桶下载所有文件【英文标题】:Boto3 to download all files from a S3 Bucket 【发布时间】:2015-11-02 07:58:24 【问题描述】:

我正在使用 boto3 从 s3 存储桶中获取文件。我需要类似aws s3 sync的功能

我当前的代码是

#!/usr/bin/python
import boto3
s3=boto3.client('s3')
list=s3.list_objects(Bucket='my_bucket_name')['Contents']
for key in list:
    s3.download_file('my_bucket_name', key['Key'], key['Key'])

只要存储桶只有文件,这就可以正常工作。 如果存储桶中存在文件夹,则会引发错误

Traceback (most recent call last):
  File "./test", line 6, in <module>
    s3.download_file('my_bucket_name', key['Key'], key['Key'])
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/inject.py", line 58, in download_file
    extra_args=ExtraArgs, callback=Callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 651, in download_file
    extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 666, in _download_file
    self._get_object(bucket, key, filename, extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 690, in _get_object
    extra_args, callback)
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 707, in _do_get_object
    with self._osutil.open(filename, 'wb') as f:
  File "/usr/local/lib/python2.7/dist-packages/boto3/s3/transfer.py", line 323, in open
    return open(filename, mode)
IOError: [Errno 2] No such file or directory: 'my_folder/.8Df54234'

这是使用 boto3 下载完整 s3 存储桶的正确方法吗?如何下载文件夹。

【问题讨论】:

见***.com/a/54622791/3140992 【参考方案1】:

我有同样的需求,创建了以下递归下载文件的函数。

仅当目录包含文件时才会在本地创建。

import boto3
import os

def download_dir(client, resource, dist, local='/tmp', bucket='your_bucket'):
    paginator = client.get_paginator('list_objects')
    for result in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=dist):
        if result.get('CommonPrefixes') is not None:
            for subdir in result.get('CommonPrefixes'):
                download_dir(client, resource, subdir.get('Prefix'), local, bucket)
        for file in result.get('Contents', []):
            dest_pathname = os.path.join(local, file.get('Key'))
            if not os.path.exists(os.path.dirname(dest_pathname)):
                os.makedirs(os.path.dirname(dest_pathname))
            if not file.get('Key').endswith('/'):
                resource.meta.client.download_file(bucket, file.get('Key'), dest_pathname)

函数是这样调用的:

def _start():
    client = boto3.client('s3')
    resource = boto3.resource('s3')
    download_dir(client, resource, 'clientconf/', '/tmp', bucket='my-bucket')

【讨论】:

我认为您不需要创建资源和客户端。我相信客户总是在资源上可用。你可以使用resource.meta.client 我认为应该是“download_dir(client, resource, subdir.get('Prefix'), local, bucket)” 我收到了一个OSError: [Errno 21] Is a directory,所以我用if not file.get('Key').endswith('/') 包装了对download_file 的调用来解决。谢谢@glefait 和@Shan boto3 库中没有等效的 aws-cli 命令aws s3 sync 吗? 这里的dist 是什么?【参考方案2】:

使用包含 1000 多个对象的存储桶时,有必要实现一个解决方案,该解决方案在最多 1000 个键的顺序集上使用 NextContinuationToken。该解决方案首先编译对象列表,然后迭代创建指定目录并下载现有对象。

import boto3
import os

s3_client = boto3.client('s3')

def download_dir(prefix, local, bucket, client=s3_client):
    """
    params:
    - prefix: pattern to match in s3
    - local: local path to folder in which to place files
    - bucket: s3 bucket with target contents
    - client: initialized s3 client object
    """
    keys = []
    dirs = []
    next_token = ''
    base_kwargs = 
        'Bucket':bucket,
        'Prefix':prefix,
    
    while next_token is not None:
        kwargs = base_kwargs.copy()
        if next_token != '':
            kwargs.update('ContinuationToken': next_token)
        results = client.list_objects_v2(**kwargs)
        contents = results.get('Contents')
        for i in contents:
            k = i.get('Key')
            if k[-1] != '/':
                keys.append(k)
            else:
                dirs.append(k)
        next_token = results.get('NextContinuationToken')
    for d in dirs:
        dest_pathname = os.path.join(local, d)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
    for k in keys:
        dest_pathname = os.path.join(local, k)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
        client.download_file(bucket, k, dest_pathname)

【讨论】:

将其更改为可接受的答案,因为它可以处理更广泛的用例。谢谢格兰特 我的代码在while next_token is not None:进入无限循环 @gpd 这不应该发生,因为 boto3 客户端将在到达最后一页时返回一个没有 NextContinuationToken 的页面,退出 while 语句。如果您粘贴使用 boto3 API 获得的最后一个响应(无论存储在响应变量中),那么我认为在您的特定情况下会更清楚发生了什么。尝试打印出“结果”变量来进行测试。我的猜测是您给出的前缀对象与您的存储桶中的任何内容都不匹配。你检查了吗? 请注意,您需要进行一些小的更改才能使其与 Digital Ocean 一起使用。如解释here 使用此代码我收到此错误:'NoneType' 对象不可迭代:TypeError【参考方案3】:
import os
import boto3

#initiate s3 resource
s3 = boto3.resource('s3')

# select bucket
my_bucket = s3.Bucket('my_bucket_name')

# download file into current directory
for s3_object in my_bucket.objects.all():
    # Need to split s3_object.key into path and file name, else it will give error file not found.
    path, filename = os.path.split(s3_object.key)
    my_bucket.download_file(s3_object.key, filename)

【讨论】:

简洁明了,有什么理由不用这个?它比所有其他解决方案更容易理解。集合似乎在后台为您做了很多事情。 我想你应该先创建所有子文件夹才能正常工作。 无论在 S3 中嵌套多深,此代码都会将所有内容放在***输出目录中。如果多个文件在不同的目录中具有相同的名称,它会一个接一个。我想你需要多一行:os.makedirs(path),然后下载目的地应该是object.key【参考方案4】:

Amazon S3 没有文件夹/目录。它是一个平面文件结构

为了保持目录的外观,路径名存储为对象 Key(文件名)的一部分。例如:

images/foo.jpg

在这种情况下,整个 Key 是 images/foo.jpg,而不仅仅是 foo.jpg

我怀疑您的问题是 boto 正在返回一个名为 my_folder/.8Df54234 的文件并试图将其保存到本地文件系统。但是,您的本地文件系统将 my_folder/ 部分解释为目录名称,并且该目录在您的本地文件系统上不存在

您可以截断文件名以仅保存.8Df54234 部分,或者您必须在写入文件之前创建必要的目录。请注意,它可以是多级嵌套目录。

更简单的方法是使用AWS Command-Line Interface (CLI),它将为您完成所有这些工作,例如:

aws s3 cp --recursive s3://my_bucket_name local_folder

还有一个sync 选项,它只会复制新的和修改过的文件。

【讨论】:

@j 我明白这一点。但我需要创建文件夹,就像aws s3 sync 一样自动创建。在boto3中是否可以。 您必须将目录的创建作为 Python 代码的一部分。如果密钥包含目录(例如foo/bar.txt),您将负责在调用s3.download_file 之前创建目录(foo)。这不是boto 的自动功能。 这里,S3 存储桶的内容是动态的,所以我必须检查 s3.list_objects(Bucket='my_bucket_name')['Contents'] 并过滤文件夹键并创建它们。 在玩了一段时间 Boto3 之后,这里列出的 AWS CLI 命令绝对是最简单的方法。 @Ben 请开始一个新问题,而不是提出一个问题作为对旧(2015 年)问题的评论。【参考方案5】:

我目前正在通过以下方式完成任务

#!/usr/bin/python
import boto3
s3=boto3.client('s3')
list=s3.list_objects(Bucket='bucket')['Contents']
for s3_key in list:
    s3_object = s3_key['Key']
    if not s3_object.endswith("/"):
        s3.download_file('bucket', s3_object, s3_object)
    else:
        import os
        if not os.path.exists(s3_object):
            os.makedirs(s3_object)

虽然它可以完成工作,但我不确定这样做是否好。 我把它留在这里是为了帮助其他用户和进一步的答案,以更好的方式实现这一目标

【讨论】:

【参考方案6】:

迟到总比没有好:) 以前使用 paginator 的答案非常好。然而它是递归的,你最终可能会达到 Python 的递归限制。这是另一种方法,有几个额外的检查。

import os
import errno
import boto3


def assert_dir_exists(path):
    """
    Checks if directory tree in path exists. If not it created them.
    :param path: the path to check if it exists
    """
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:
            raise


def download_dir(client, bucket, path, target):
    """
    Downloads recursively the given S3 path to the target directory.
    :param client: S3 client to use.
    :param bucket: the name of the bucket to download from
    :param path: The S3 directory to download.
    :param target: the local directory to download the files to.
    """

    # Handle missing / at end of prefix
    if not path.endswith('/'):
        path += '/'

    paginator = client.get_paginator('list_objects_v2')
    for result in paginator.paginate(Bucket=bucket, Prefix=path):
        # Download each file individually
        for key in result['Contents']:
            # Calculate relative path
            rel_path = key['Key'][len(path):]
            # Skip paths ending in /
            if not key['Key'].endswith('/'):
                local_file_path = os.path.join(target, rel_path)
                # Make sure directories exist
                local_file_dir = os.path.dirname(local_file_path)
                assert_dir_exists(local_file_dir)
                client.download_file(bucket, key['Key'], local_file_path)


client = boto3.client('s3')

download_dir(client, 'bucket-name', 'path/to/data', 'downloads')

【讨论】:

得到了KeyError: 'Contents'。输入路径'/arch/R/storeincomelogs/,完整路径/arch/R/storeincomelogs/201901/01/xxx.parquet > Got KeyError: 'Contents' Contents 当提供的前缀/路径没有任何文件时将不存在。添加if 'Contents' not in result: continue 应该可以解决问题,但我会在进行更改之前检查用例。【参考方案7】:

我有一个在同一进程中运行 AWS CLI 的解决方法。

awscli 安装为python 库:

pip install awscli

然后定义这个函数:

from awscli.clidriver import create_clidriver

def aws_cli(*cmd):
    old_env = dict(os.environ)
    try:

        # Environment
        env = os.environ.copy()
        env['LC_CTYPE'] = u'en_US.UTF'
        os.environ.update(env)

        # Run awscli in the same process
        exit_code = create_clidriver().main(*cmd)

        # Deal with problems
        if exit_code > 0:
            raise RuntimeError('AWS CLI exited with code '.format(exit_code))
    finally:
        os.environ.clear()
        os.environ.update(old_env)

执行:

aws_cli('s3', 'sync', '/path/to/source', 's3://bucket/destination', '--delete')

【讨论】:

我使用了相同的想法,但没有使用sync 命令,而是简单地执行命令aws s3 cp s3://bucket/folder local_folder --recursive。时间从几分钟(几乎 1 小时)减少到几秒钟 我正在使用此代码,但在显示所有调试日志时遇到问题。我在全球范围内声明了这个:logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARNING) logger = logging.getLogger(),并且只希望从根目录输出日志。有什么想法吗?【参考方案8】:

这里的很多解决方案都变得相当复杂。如果您正在寻找更简单的东西,cloudpathlib 为这个下载目录或文件的用例提供了一种很好的包装方式。

from cloudpathlib import CloudPath

cp = CloudPath("s3://bucket/product/myproject/2021-02-15/")
cp.download_to("local_folder")

注意:对于包含大量文件的大型文件夹,命令行中的awscli 可能更快。

【讨论】:

这真的很简单。只是为了完成这个答案。安装 cloudpathlib pip install cloudpathlib[s3]【参考方案9】:
import boto3, os

s3 = boto3.client('s3')

def download_bucket(bucket):
    paginator = s3.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket)
    for page in pages:
      if 'Contents' in page:
        for obj in page['Contents']:
            os.path.dirname(obj['Key']) and os.makedirs(os.path.dirname(obj['Key']), exist_ok=True) 
            try:
                s3.download_file(bucket, obj['Key'], obj['Key'])
            except NotADirectoryError:
                pass

# Change bucket_name to name of bucket that you want to download
download_bucket(bucket_name)

这应该适用于所有数量的对象(也适用于超过 1000 个的对象)。每个分页器页面最多可以包含 1000 个对象。请注意 os.makedirs 函数中的额外参数 - exist_ok=True 导致路径存在时它不会抛出错误)

【讨论】:

【参考方案10】:

我已将格兰特的答案更新为并行运行,如果有人感兴趣,它会更快:

from concurrent import futures
import os
import boto3

def download_dir(prefix, local, bucket):

    client = boto3.client('s3')

    def create_folder_and_download_file(k):
        dest_pathname = os.path.join(local, k)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
        print(f'downloading k to dest_pathname')
        client.download_file(bucket, k, dest_pathname)

    keys = []
    dirs = []
    next_token = ''
    base_kwargs = 
        'Bucket': bucket,
        'Prefix': prefix,
    
    while next_token is not None:
        kwargs = base_kwargs.copy()
        if next_token != '':
            kwargs.update('ContinuationToken': next_token)
        results = client.list_objects_v2(**kwargs)
        contents = results.get('Contents')
        for i in contents:
            k = i.get('Key')
            if k[-1] != '/':
                keys.append(k)
            else:
                dirs.append(k)
        next_token = results.get('NextContinuationToken')
    for d in dirs:
        dest_pathname = os.path.join(local, d)
        if not os.path.exists(os.path.dirname(dest_pathname)):
            os.makedirs(os.path.dirname(dest_pathname))
    with futures.ThreadPoolExecutor() as executor:
        futures.wait(
            [executor.submit(create_folder_and_download_file, k) for k in keys],
            return_when=futures.FIRST_EXCEPTION,
        )

【讨论】:

【参考方案11】:

另一个使用 asyncio/aioboto 的并行下载器

import os, time
import asyncio
from itertools import chain
import json
from typing import List
from json.decoder import WHITESPACE
import logging
from functools import partial
from pprint import pprint as pp

# Third Party
import asyncpool
import aiobotocore.session
import aiobotocore.config

_NUM_WORKERS = 50


bucket_name= 'test-data'
bucket_prefix= 'etl2/test/20210330/f_api'


async def save_to_file(s3_client, bucket: str, key: str):
    
    response = await s3_client.get_object(Bucket=bucket, Key=key)
    async with response['Body'] as stream:
        content = await stream.read()
    
    if 1:
        fn =f'out/downloaded/bucket_name/key'

        dn= os.path.dirname(fn)
        if not isdir(dn):
            os.makedirs(dn,exist_ok=True)
        if 1:
            with open(fn, 'wb') as fh:
                fh.write(content)
                print(f'Downloaded to: fn')
   
    return [0]


async def go(bucket: str, prefix: str) -> List[dict]:
    """
    Returns list of dicts of object contents

    :param bucket: s3 bucket
    :param prefix: s3 bucket prefix
    :return: list of download statuses
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    session = aiobotocore.session.Aiosession()
    config = aiobotocore.config.AioConfig(max_pool_connections=_NUM_WORKERS)
    contents = []
    async with session.create_client('s3', config=config) as client:
        worker_co = partial(save_to_file, client, bucket)
        async with asyncpool.AsyncPool(None, _NUM_WORKERS, 's3_work_queue', logger, worker_co,
                                       return_futures=True, raise_on_join=True, log_every_n=10) as work_pool:
            # list s3 objects using paginator
            paginator = client.get_paginator('list_objects')
            async for result in paginator.paginate(Bucket=bucket, Prefix=prefix):
                for c in result.get('Contents', []):
                    contents.append(await work_pool.push(c['Key'], client))

    # retrieve results from futures
    contents = [c.result() for c in contents]
    return list(chain.from_iterable(contents))


def S3_download_bucket_files():
    s = time.perf_counter()
    _loop = asyncio.get_event_loop()
    _result = _loop.run_until_complete(go(bucket_name, bucket_prefix))
    assert sum(_result)==0, _result
    print(_result)
    elapsed = time.perf_counter() - s
    print(f"__file__ executed in elapsed:0.2f seconds.")

它将首先从 S3 中获取文件列表,然后使用 aioboto 下载,其中 _NUM_WORKERS=50 从网络并行读取数据。

【讨论】:

【参考方案12】:

一次性获取所有文件是一个非常糟糕的主意,您应该分批获取它。

我用来从 S3 获取特定文件夹(目录)的一种实现是,

def get_directory(directory_path, download_path, exclude_file_names):
    # prepare session
    session = Session(aws_access_key_id, aws_secret_access_key, region_name)

    # get instances for resource and bucket
    resource = session.resource('s3')
    bucket = resource.Bucket(bucket_name)

    for s3_key in self.client.list_objects(Bucket=self.bucket_name, Prefix=directory_path)['Contents']:
        s3_object = s3_key['Key']
        if s3_object not in exclude_file_names:
            bucket.download_file(file_path, download_path + str(s3_object.split('/')[-1])

如果您想获得整个存储桶,请通过 CIL 将其用作 @John Rotenstein mentioned,如下所示,

aws s3 cp --recursive s3://bucket_name download_path

【讨论】:

【参考方案13】:

如果您想使用 python 调用 bash 脚本,这里有一个简单的方法,可以将文件从 S3 存储桶中的文件夹加载到本地文件夹(在 Linux 机器中):

import boto3
import subprocess
import os

###TOEDIT###
my_bucket_name = "your_my_bucket_name"
bucket_folder_name = "your_bucket_folder_name"
local_folder_path = "your_local_folder_path"
###TOEDIT###

# 1.Load thes list of files existing in the bucket folder
FILES_NAMES = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(''.format(my_bucket_name))
for object_summary in my_bucket.objects.filter(Prefix="/".format(bucket_folder_name)):
#     print(object_summary.key)
    FILES_NAMES.append(object_summary.key)

# 2.List only new files that do not exist in local folder (to not copy everything!)
new_filenames = list(set(FILES_NAMES )-set(os.listdir(local_folder_path)))

# 3.Time to load files in your destination folder 
for new_filename in new_filenames:
    upload_S3files_CMD = """aws s3 cp s3://// """.format(my_bucket_name,bucket_folder_name,new_filename ,local_folder_path)

    subprocess_call = subprocess.call([upload_S3files_CMD], shell=True)
    if subprocess_call != 0:
        print("ALERT: loading files not working correctly, please re-check new loaded files")

【讨论】:

【参考方案14】:

From AWS S3 Docs (How do I use folders in an S3 bucket?):

在 Amazon S3 中,存储桶和对象是主要资源,对象存储在存储桶中。 Amazon S3 具有平面结构,而不是您在文件系统中看到的层次结构。但是,为了组织简单,Amazon S3 控制台支持将文件夹概念作为对对象进行分组的一种方式。 Amazon S3 通过为对象使用共享名称前缀(即,对象的名称以通用字符串开头)来实现这一点。对象名也称为键名。

例如,您可以在控制台上创建一个名为 photos 的文件夹,并在其中存储一个名为 myphoto.jpg 的对象。然后该对象以键名 photos/myphoto.jpg 存储,其中 photos/ 是前缀。

将“mybucket”中的所有文件下载到当前目录尊重存储桶的模拟目录结构(如果它们在本地不存在,则从存储桶创建文件夹):

import boto3
import os

bucket_name = "mybucket"
s3 = boto3.client("s3")
objects = s3.list_objects(Bucket = bucket_name)["Contents"]
for s3_object in objects:
    s3_key = s3_object["Key"]
    path, filename = os.path.split(s3_key)
    if len(path) != 0 and not os.path.exists(path):
        os.makedirs(path)
    if not s3_key.endswith("/"):
        download_to = path + '/' + filename if path else filename
        s3.download_file(bucket_name, s3_key, download_to)

【讨论】:

如果你能对你的代码做一些解释就更好了。 @johan,感谢您的反馈!我添加了相关解释【参考方案15】:
for objs in my_bucket.objects.all():
    print(objs.key)
    path='/tmp/'+os.sep.join(objs.key.split(os.sep)[:-1])
    try:
        if not os.path.exists(path):
            os.makedirs(path)
        my_bucket.download_file(objs.key, '/tmp/'+objs.key)
    except FileExistsError as fe:                          
        print(objs.key+' exists')

此代码将下载/tmp/ 目录中的内容。如果你愿意,你可以改变目录。

【讨论】:

【参考方案16】:

我得到了类似的要求,并通过阅读上述几个解决方案和其他网站获得了帮助,我想出了下面的脚本,只是想分享它是否可以帮助任何人。

from boto3.session import Session
import os

def sync_s3_folder(access_key_id,secret_access_key,bucket_name,folder,destination_path):    
    session = Session(aws_access_key_id=access_key_id,aws_secret_access_key=secret_access_key)
    s3 = session.resource('s3')
    your_bucket = s3.Bucket(bucket_name)
    for s3_file in your_bucket.objects.all():
        if folder in s3_file.key:
            file=os.path.join(destination_path,s3_file.key.replace('/','\\'))
            if not os.path.exists(os.path.dirname(file)):
                os.makedirs(os.path.dirname(file))
            your_bucket.download_file(s3_file.key,file)
sync_s3_folder(access_key_id,secret_access_key,bucket_name,folder,destination_path)

【讨论】:

【参考方案17】:

重新发布@glefait 的答案并在末尾加上 if 条件以避免 os 错误 20。它获得的第一个键是文件夹名称本身,它不能写入目标路径。

def download_dir(client, resource, dist, local='/tmp', bucket='your_bucket'):
    paginator = client.get_paginator('list_objects')
    for result in paginator.paginate(Bucket=bucket, Delimiter='/', Prefix=dist):
        if result.get('CommonPrefixes') is not None:
            for subdir in result.get('CommonPrefixes'):
                download_dir(client, resource, subdir.get('Prefix'), local, bucket)
        for file in result.get('Contents', []):
            print("Content: ",result)
            dest_pathname = os.path.join(local, file.get('Key'))
            print("Dest path: ",dest_pathname)
            if not os.path.exists(os.path.dirname(dest_pathname)):
                print("here last if")
                os.makedirs(os.path.dirname(dest_pathname))
            print("else file key: ", file.get('Key'))
            if not file.get('Key') == dist:
                print("Key not equal? ",file.get('Key'))
                resource.meta.client.download_file(bucket, file.get('Key'), dest_pathname)enter code here

【讨论】:

【参考方案18】:

我遇到这个问题已经有一段时间了,在我浏览过的所有不同的论坛中,我还没有看到完整的端到端的片段。所以,我继续完成所有部分(我自己添加一些东西)并创建了一个完整的端到端 S3 下载器!

这不仅会自动下载文件,而且如果 S3 文件位于子目录中,它会在本地存储上创建它们。在我的应用程序实例中,我需要设置权限和所有者,所以我也添加了(如果不需要,可以注释掉)。

这已经过测试并且可以在 Docker 环境 (K8) 中运行,但是我已经在脚本中添加了环境变量,以防您想在本地测试/运行它。

我希望这对寻求 S3 下载自动化的人有所帮助。我也欢迎任何关于如何在需要时更好地优化它的建议、信息等。

#!/usr/bin/python3
import gc
import logging
import os
import signal
import sys
import time
from datetime import datetime

import boto
from boto.exception import S3ResponseError
from pythonjsonlogger import jsonlogger

formatter = jsonlogger.JsonFormatter('%(message)%(levelname)%(name)%(asctime)%(filename)%(lineno)%(funcName)')

json_handler_out = logging.StreamHandler()
json_handler_out.setFormatter(formatter)

#Manual Testing Variables If Needed
#os.environ["DOWNLOAD_LOCATION_PATH"] = "some_path"
#os.environ["BUCKET_NAME"] = "some_bucket"
#os.environ["AWS_ACCESS_KEY"] = "some_access_key"
#os.environ["AWS_SECRET_KEY"] = "some_secret"
#os.environ["LOG_LEVEL_SELECTOR"] = "DEBUG, INFO, or ERROR"

#Setting Log Level Test
logger = logging.getLogger('json')
logger.addHandler(json_handler_out)
logger_levels = 
    'ERROR' : logging.ERROR,
    'INFO' : logging.INFO,
    'DEBUG' : logging.DEBUG

logger_level_selector = os.environ["LOG_LEVEL_SELECTOR"]
logger.setLevel(logger_level_selector)

#Getting Date/Time
now = datetime.now()
logger.info("Current date and time : ")
logger.info(now.strftime("%Y-%m-%d %H:%M:%S"))

#Establishing S3 Variables and Download Location
download_location_path = os.environ["DOWNLOAD_LOCATION_PATH"]
bucket_name = os.environ["BUCKET_NAME"]
aws_access_key_id = os.environ["AWS_ACCESS_KEY"]
aws_access_secret_key = os.environ["AWS_SECRET_KEY"]
logger.debug("Bucket: %s" % bucket_name)
logger.debug("Key: %s" % aws_access_key_id)
logger.debug("Secret: %s" % aws_access_secret_key)
logger.debug("Download location path: %s" % download_location_path)

#Creating Download Directory
if not os.path.exists(download_location_path):
    logger.info("Making download directory")
    os.makedirs(download_location_path)

#Signal Hooks are fun
class GracefulKiller:
    kill_now = False
    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)
    def exit_gracefully(self, signum, frame):
        self.kill_now = True

#Downloading from S3 Bucket
def download_s3_bucket():
    conn = boto.connect_s3(aws_access_key_id, aws_access_secret_key)
    logger.debug("Connection established: ")
    bucket = conn.get_bucket(bucket_name)
    logger.debug("Bucket: %s" % str(bucket))
    bucket_list = bucket.list()
#    logger.info("Number of items to download: 0".format(len(bucket_list)))

    for s3_item in bucket_list:
        key_string = str(s3_item.key)
        logger.debug("S3 Bucket Item to download: %s" % key_string)
        s3_path = download_location_path + "/" + key_string
        logger.debug("Downloading to: %s" % s3_path)
        local_dir = os.path.dirname(s3_path)

        if not os.path.exists(local_dir):
            logger.info("Local directory doesn't exist, creating it... %s" % local_dir)
            os.makedirs(local_dir)
            logger.info("Updating local directory permissions to %s" % local_dir)
#Comment or Uncomment Permissions based on Local Usage
            os.chmod(local_dir, 0o775)
            os.chown(local_dir, 60001, 60001)
        logger.debug("Local directory for download: %s" % local_dir)
        try:
            logger.info("Downloading File: %s" % key_string)
            s3_item.get_contents_to_filename(s3_path)
            logger.info("Successfully downloaded File: %s" % s3_path)
            #Updating Permissions
            logger.info("Updating Permissions for %s" % str(s3_path))
#Comment or Uncomment Permissions based on Local Usage
            os.chmod(s3_path, 0o664)
            os.chown(s3_path, 60001, 60001)
        except (OSError, S3ResponseError) as e:
            logger.error("Fatal error in s3_item.get_contents_to_filename", exc_info=True)
            # logger.error("Exception in file download from S3: ".format(e))
            continue
        logger.info("Deleting %s from S3 Bucket" % str(s3_item.key))
        s3_item.delete()

def main():
    killer = GracefulKiller()
    while not killer.kill_now:
        logger.info("Checking for new files on S3 to download...")
        download_s3_bucket()
        logger.info("Done checking for new files, will check in 120s...")
        gc.collect()
        sys.stdout.flush()
        time.sleep(120)
if __name__ == '__main__':
    main()

【讨论】:

以上是关于Boto3 从 S3 存储桶下载所有文件的主要内容,如果未能解决你的问题,请参考以下文章

使用 python boto3 将文件从一个 S3 存储桶传输到另一个 S3 存储桶

使用 boto3 lib 和 AWS Lambda 从 S3 存储桶中的压缩文件中获取数据流

使用boto3,从整个文件夹或文件从一个s3存储桶复制到同一区域的另一个文件夹时,如何提供访问密钥和秘密访问密钥?

我们可以使用 boto3 Python 在 aws s3 存储桶之间递归复制文件和文件夹吗?

从 S3 存储桶中读取大量 CSV 文件

Pyspark 从 S3 存储桶的子目录中读取所有 JSON 文件