使用 Python 向 Google Cloud Storage 写入流式传输

Posted

技术标签:

【中文标题】使用 Python 向 Google Cloud Storage 写入流式传输【英文标题】:Write-streaming to Google Cloud Storage in Python 【发布时间】:2019-04-03 18:27:18 【问题描述】:

我正在尝试将用Python 编写的AWS Lambda 函数迁移到 CF

    即时解压缩并逐行读取 对每一行执行一些光变换 将未压缩的输出(一次一行或多个块)写入 GCS

输出大于 2GB - 但略小于 3GB,因此它适合 Lambda正好

嗯,这似乎是不可能的,或者更多地涉及GCP

未压缩的文件无法放入内存或/tmp - 在撰写本文时限制为 2048MB - 因此无法使用 Python 客户端库 upload_from_file(或 _filename) 有this 官方文件,但令我惊讶的是,它指的是boto,一个最初为AWS S3 设计的库,并且由于boto3 已经存在一段时间了,所以它已经过时了。没有真正的GCP 方法来流式写入或读取 Node.js 有一个简单的 createWriteStream() - 不错的文章 here 顺便说一句 - 但在 Python 中没有等效的单行代码 Resumable media upload 听起来很像,但是在 Node 中处理的很多代码更容易 AppEngine 有 cloudstorage,但在它之外不可用 - 并且已过时 在工作包装器上几乎没有示例,用于逐行写入文本/纯数据,就好像GCS 是本地文件系统一样。这不仅限于Cloud Functions 和 Python 客户端库的缺失功能,但由于资源限制,它在 CF 中更为严重。顺便说一句,我是 discussion 的一部分,添加了一个可写的 IOBase 函数,但它没有任何吸引力。 显然使用虚拟机或DataFlow 对手头的任务来说是不可能的。

在我看来,从基于云的存储中读取/写入的流(或类似流)甚至应该包含在 Python 标准库中。

按照当时的建议,您仍然可以使用GCSFS,它会在您将内容写入 FileObj 时在后台为您分块提交上传。 同一个团队写了s3fs。我不知道 Azure。

AFAIC,我会坚持使用AWS Lambda,因为输出可以容纳在内存中 - 目前 - 但分段上传是支持任何输出大小且内存最少的方法。

想法或替代方案?

【问题讨论】:

upload_from_file 使用类似文件的对象,所以也许您可以使用生成器来完成您想要的工作? 不幸的是,它要求文件处理程序以只读模式打开,而不是混合(读/写)。换句话说,该文件必须已经完整存在。目标是读取(写入 GCS/S3)作为写入内存中的处理程序。 【参考方案1】:

我对 multipartresumable 上传感到困惑。后者是您“流式传输”所需要的——它实际上更像是上传缓冲流的块。

Multipart 上传是在同一个 API 调用中一次加载数据和自定义元数据。

虽然我非常喜欢 GCSFS - Martin,但他的主要贡献者非常敏感 - 我最近发现 an alternative 使用了 google-resumable-media 库。

GCSFS 建立在核心 http API 之上,而 Seth 的解决方案使用由 Google 维护的低级库,与 API 更改更加同步,其中包括指数备份。后者对于大/长流来说确实是必须的,因为连接可能会中断,即使在 GCP 内 - 我们遇到了 GCF 的问题。

最后,我仍然相信Google Cloud Library 是添加类似流的功能的正确位置,基本的writeread。它有core code already。

如果您也对核心库中的该功能感兴趣,请点赞here - 假设优先级基于此。

【讨论】:

【参考方案2】:

smart_open 现在支持 GCS,还支持动态解压。

import lzma
from smart_open import open, register_compressor

def _handle_xz(file_obj, mode):
    return lzma.LZMAFile(filename=file_obj, mode=mode, format=lzma.FORMAT_XZ)

register_compressor('.xz', _handle_xz)

# stream from GCS
with open('gs://my_bucket/my_file.txt.xz') as fin:
    for line in fin:
        print(line)

# stream content *into* GCS (write mode):
with open('gs://my_bucket/my_file.txt.xz', 'wb') as fout:
    fout.write(b'hello world')

【讨论】:

以上是关于使用 Python 向 Google Cloud Storage 写入流式传输的主要内容,如果未能解决你的问题,请参考以下文章

使用 Google Cloud PubSub 不断收到“向 Cloud PubSub 发送测试消息时出错...”

在Google Cloud中部署Python App时如何处理打开文件的路径?

Google Cloud Tasks 无法向 Cloud Run 进行身份验证

如何在Python和Java中访问Google Cloud Endpoints请求标头

在哪里可以找到 Google Cloud Python 官方库的旧版本文档?

通过 Python Google Cloud Function 发送电子邮件的最佳方式是啥?