使用 Python 向 Google Cloud Storage 写入流式传输
Posted
技术标签:
【中文标题】使用 Python 向 Google Cloud Storage 写入流式传输【英文标题】:Write-streaming to Google Cloud Storage in Python 【发布时间】:2019-04-03 18:27:18 【问题描述】:我正在尝试将用Python
编写的AWS Lambda
函数迁移到 CF
-
即时解压缩并逐行读取
对每一行执行一些光变换
将未压缩的输出(一次一行或多个块)写入 GCS
输出大于 2GB - 但略小于 3GB,因此它适合 Lambda
,正好。
嗯,这似乎是不可能的,或者更多地涉及GCP
:
/tmp
- 在撰写本文时限制为 2048MB - 因此无法使用 Python 客户端库 upload_from_file
(或 _filename
)
有this 官方文件,但令我惊讶的是,它指的是boto
,一个最初为AWS S3
设计的库,并且由于boto3
已经存在一段时间了,所以它已经过时了。没有真正的GCP
方法来流式写入或读取
Node.js 有一个简单的 createWriteStream()
- 不错的文章 here 顺便说一句 - 但在 Python 中没有等效的单行代码
Resumable media upload 听起来很像,但是在 Node 中处理的很多代码更容易
AppEngine 有 cloudstorage,但在它之外不可用 - 并且已过时
在工作包装器上几乎没有示例,用于逐行写入文本/纯数据,就好像GCS
是本地文件系统一样。这不仅限于Cloud Functions
和 Python 客户端库的缺失功能,但由于资源限制,它在 CF 中更为严重。顺便说一句,我是 discussion 的一部分,添加了一个可写的 IOBase 函数,但它没有任何吸引力。
显然使用虚拟机或DataFlow
对手头的任务来说是不可能的。
在我看来,从基于云的存储中读取/写入的流(或类似流)甚至应该包含在 Python 标准库中。
按照当时的建议,您仍然可以使用GCSFS,它会在您将内容写入 FileObj 时在后台为您分块提交上传。
同一个团队写了s3fs
。我不知道 Azure。
AFAIC,我会坚持使用AWS Lambda
,因为输出可以容纳在内存中 - 目前 - 但分段上传是支持任何输出大小且内存最少的方法。
想法或替代方案?
【问题讨论】:
upload_from_file 使用类似文件的对象,所以也许您可以使用生成器来完成您想要的工作? 不幸的是,它要求文件处理程序以只读模式打开,而不是混合(读/写)。换句话说,该文件必须已经完整存在。目标是读取(写入 GCS/S3)作为写入内存中的处理程序。 【参考方案1】:我对 multipart
与 resumable
上传感到困惑。后者是您“流式传输”所需要的——它实际上更像是上传缓冲流的块。
Multipart
上传是在同一个 API 调用中一次加载数据和自定义元数据。
虽然我非常喜欢 GCSFS - Martin,但他的主要贡献者非常敏感 - 我最近发现 an alternative 使用了 google-resumable-media
库。
GCSFS
建立在核心 http API 之上,而 Seth 的解决方案使用由 Google 维护的低级库,与 API 更改更加同步,其中包括指数备份。后者对于大/长流来说确实是必须的,因为连接可能会中断,即使在 GCP
内 - 我们遇到了 GCF
的问题。
最后,我仍然相信Google Cloud Library 是添加类似流的功能的正确位置,基本的write
和read
。它有core code already。
如果您也对核心库中的该功能感兴趣,请点赞here - 假设优先级基于此。
【讨论】:
【参考方案2】:smart_open 现在支持 GCS,还支持动态解压。
import lzma
from smart_open import open, register_compressor
def _handle_xz(file_obj, mode):
return lzma.LZMAFile(filename=file_obj, mode=mode, format=lzma.FORMAT_XZ)
register_compressor('.xz', _handle_xz)
# stream from GCS
with open('gs://my_bucket/my_file.txt.xz') as fin:
for line in fin:
print(line)
# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt.xz', 'wb') as fout:
fout.write(b'hello world')
【讨论】:
以上是关于使用 Python 向 Google Cloud Storage 写入流式传输的主要内容,如果未能解决你的问题,请参考以下文章
使用 Google Cloud PubSub 不断收到“向 Cloud PubSub 发送测试消息时出错...”
在Google Cloud中部署Python App时如何处理打开文件的路径?
Google Cloud Tasks 无法向 Cloud Run 进行身份验证
如何在Python和Java中访问Google Cloud Endpoints请求标头