使Boto3上传调用阻塞(单线程)

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使Boto3上传调用阻塞(单线程)相关的知识,希望对你有一定的参考价值。

编辑:我原来的假设被证明是错误的。我在这里添加了一个冗长的答案,我邀请其他人进行压力测试并纠正。


我正在寻找一种方法来以单线程方式利用Boto3 S3 API来模仿线程安全的键值存储。简而言之,我想使用调用线程而不是新线程来进行上传。

据我所知,boto3(或.upload_fileobj())中.upload_file()方法的默认行为是将任务踢到新线程并立即返回None

来自docs

这是一个托管传输,如有必要,它将在多个线程中执行分段上传。

(如果我对此的理解是错误的,那么对此进行修正也会有所帮助。这是在Boto3 1.9.134中。)

>>> import io
>>> import boto3
>>> bucket = boto3.resource('s3').Bucket('my-bucket-name')
>>> buf = io.BytesIO(b"test")
>>> res = bucket.upload_fileobj(buf, 'testobj')
>>> res is None
True

现在,让我们说buf不是一个短的4字节字符串,而是一个巨大的文本blob,它将花费不可忽略的时间来完全上传。

我还使用此函数来检查具有给定键的对象是否存在:

def key_exists_in_bucket(bucket_obj, key: str) -> bool:
    try:
        bucket_obj.Object(key).load()
    except botocore.exceptions.ClientError:
        return False
    else:
        return True

我的意图是如果对象存在名称,则不重写该对象。

这里的竞争条件相当明显:异步启动上传,然后使用key_exists_in_bucket()快速检查,如果对象仍在写入,则返回False,然后由于此而不必要地再次写入。

有没有办法确保当前线程调用bucket.upload_fileobj()而不是在该方法范围内创建的新线程?

我意识到这会减慢速度。在这种情况下,我愿意牺牲速度。

答案

upload_fileobj接受Config参数。这是一个boto3.s3.transfer.TransferConfig对象,后者又有一个名为use_threads的参数(默认为true) - 如果为True,则在执行S3传输时将使用线程。如果为False,则在执行传输时不会使用任何线程:所有逻辑都将在主线程中运行。

希望这对你有用。

另一答案

测试方法是否阻塞: 我自己经验性地测试了这种行为。首先,我生成了一个100MB的文件:

dd if=/dev/zero of=100mb.txt  bs=100M  count=1

然后我尝试以与您相同的方式上传文件并测量所花费的时间:

import boto3
import time
import io
file = open('100mb.txt', 'rb')
buf = io.BytesIO(file.read())
bucket = boto3.resource('s3').Bucket('testbucket')
start = time.time()
print("starting to upload...")
bucket.upload_fileobj(buf, '100mb')
print("finished uploading")
end = time.time()
print("time: ".format(end-start))

upload_fileobj()方法完成和下一个要读取的python行(1gb文件为50秒)花了8秒多,所以我假设这个方法是阻塞的。

使用线程测试:

使用多个线程时,即使使用use_threads = False选项,我也可以验证该方法是否同时支持多次传输。我开始上传一个200mb文件,然后是100mb,首先完成100mb文件。这证实了TransferConfig中的并发性与多部分传输有关。

码:

import boto3
import time
import io
from boto3.s3.transfer import TransferConfig
import threading

config = TransferConfig(use_threads=False)

bucket = boto3.resource('s3').Bucket('testbucket')
def upload(filename):
     file = open(filename, 'rb')
     buf = io.BytesIO(file.read())
     start = time.time()
     print("starting to upload file ".format(filename))
     bucket.upload_fileobj(buf,filename,Config=config)
     end = time.time()
     print("finished uploading file . time: ".format(filename,end-start))
x1 = threading.Thread(target=upload, args=('200mb.txt',))
x2 = threading.Thread(target=upload, args=('100mb.txt',))
x1.start()
time.sleep(2)
x2.start()

输出:

开始上传文件200mb.txt 开始上传文件100mb.txt 完成上传文件100mb.txt。时间:46.35254502296448 完成上传文件200mb.txt。时间:61.70564889907837

使用会话进行测试: 如果您希望上传方法按照调用顺序完成,那么这就是您所需要的。

码:

import boto3
import time
import io
from boto3.s3.transfer import TransferConfig
import threading

config = TransferConfig(use_threads=False)

session = boto3.session.Session()
s3 = session.resource('s3')
bucket = s3.Bucket('testbucket')
def upload(filename):
     file = open(filename, 'rb')
     buf = io.BytesIO(file.read())
     start = time.time()
     print("starting to upload file ".format(filename))
     bucket.upload_fileobj(buf,filename)
     end = time.time()
     print("finished uploading file . time: ".format(filename,end-start))
x1 = threading.Thread(target=upload, args=('200mb.txt',))
x2 = threading.Thread(target=upload, args=('100mb.txt',))
x1.start()
time.sleep(2)
x2.start()

输出:

开始上传文件200mb.txt 开始上传文件100mb.txt 完成上传文件200mb.txt。时间:46.62478971481323 完成上传文件100mb.txt。时间:50.515950202941895

我发现的一些资源: - This是一个问题,在这里询问关于阻塞或非阻塞的方法。这不是决定性因素,但可能存在相关信息。 - 在GitHub上有一个开放的issue,允许在boto3中进行同步传输。 - 还有像aiobotoaiobotocore这样的工具,专门用于允许异步下载和上传/到s3和其他aws服务。

关于我之前的回答: 您可以在boto3中阅读有关文件传输配置的here。特别是:

传输操作使用线程来实现并发。可以通过将use_threads属性设置为False来禁用线程使用。

最初我认为这与同时执行的多个传输有关。但是,在使用TransferConfig时,读取source code参数max_concurrency中的注释会解释并发性并不是指多次传输,而是指“将要求执行传输的线程数”。所以这是用来加速转移的东西。 use_threads属性仅用于允许多部分传输中的并发。

另一答案

我认为,既然这个问题和another similar question的答案似乎都存在直接冲突,那么最好直接使用pdb来源。

Summary

  • boto3默认使用多个线程(10)
  • 但是,它不是异步的,因为它在返回之前等待(加入)这些线程,而不是使用“即发即忘”技术
  • 因此,通过这种方式,如果您尝试与来自多个客户端的s3存储桶通信,则读/写线程安全就绪。

Detail

我在这里努力解决的一个方面是多个(子线程)并不意味着顶级方法本身是非阻塞的:如果调用线程启动上传到多个子线程,但是等待那些线程到完成并返回,我冒昧地说这仍然是一个阻塞的电话。另一方面,如果方法调用是在asyncio中说的话,那就是“即发即忘”。使用threading,这有效地归结为x.join()是否被称为。

以下是从Victor Val开始的初始代码,用于启动调试器:

import io
import pdb

import boto3

# From dd if=/dev/zero of=100mb.txt  bs=50M  count=1
buf = io.BytesIO(open('100mb.txt', 'rb').read())
bucket = boto3.resource('s3').Bucket('test-threads')
pdb.run("bucket.upload_fileobj(buf, '100mb')")

这个堆栈帧来自Boto 1.9.134。

现在跳进pdb

.upload_fileobj()首先调用一个嵌套方法 - 还没有多少看到。

(Pdb) s
--Call--
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(542)bucket_upload_fileobj()
-> def bucket_upload_fileobj(self, Fileobj, Key, ExtraArgs=None,
(Pdb) s

(Pdb) l
574     
575         :type Config: boto3.s3.transfer.TransferConfig
576         :param Config: The transfer configuration to be used when performing the
577             upload.
578         """
579  ->     return self.meta.client.upload_fileobj(
580             Fileobj=Fileobj, Bucket=self.name, Key=Key, ExtraArgs=ExtraArgs,
581             Callback=Callback, Config=Config)
582     
583     
584  

所以顶级方法确实会返回一些东西,但目前还不清楚这个东西最终会变成None

所以我们进入那个。

现在,.upload_fileobj()确实有一个config参数,默认为None:

(Pdb) l 531
526     
527         subscribers = None
528         if Callback is not None:
529             subscribers = [ProgressCallbackInvoker(Callback)]
530     
531         config = Config
532         if config is None:
533             config = TransferConfig()
534     
535         with create_transfer_manager(self, config) as manager:
536             future = manager.upload(

这意味着config成为默认的TransferConfig()

  • use_threads - 如果为True,则在执行S3传输时将使用线程。如果为False,则在执行传输时不会使用任何线程:所有逻辑都将在主线程中运行。
  • max_concurrency - 将要求执行传输的最大线程数。如果use_threads设置为False,则忽略提供的值,因为传输只会使用主线程。

而且,在这里,他们是:

(Pdb) unt 534
> /home/ubuntu/envs/py372/lib/python3.7/site-packages/boto3/s3/inject.py(535)upload_fileobj()
-> with create_transfer_manager(self, config) as manager:
(Pdb) config
<boto3.s3.transfer.TransferConfig object at 0x7f1790dc0cc0>
(Pdb) config.use_threads
True
(Pdb) config.max_concurrency
10

现在我们在调用堆栈中下降一个级别以使用TransferManager(上下文管理器)。在这一点上,max_concurrency被用作类似命名的max_request_concurrency的参数:

# https://github.com/boto/s3transfer/blob/2aead638c8385d8ae0b1756b2de17e8fad45fffa/s3transfer/manager.py#L223

    # The executor responsible for making S3 API transfer requests
    self._request_executor = BoundedExecutor(
        max_size=self._config.max_request_queue_size,
        max_num_threads=self._config.max_request_concurrency,
        tag_semaphores=
            IN_MEMORY_UPLOAD_TAG: TaskSemaphore(
                self._config.max_in_memory_upload_chunks),
            IN_MEMORY_DO

以上是关于使Boto3上传调用阻塞(单线程)的主要内容,如果未能解决你的问题,请参考以下文章

单线程 异步 同步 阻塞 非阻塞

一起来写web server 06 -- 单线程非阻塞IO版本

分布式|单线程的redis为何如此快?

NodeJS 真的是单线程的吗?

Python来实现并发的Web Server,其中采用了多进程多线程协程单进程单线程非阻塞的方式

异步/同步,阻塞/非阻塞,单线程/多线程概念梳理