在 GCP 中按行拆分大文件

Posted

技术标签:

【中文标题】在 GCP 中按行拆分大文件【英文标题】:Splitting large files by rows in GCP 【发布时间】:2021-09-01 12:26:55 【问题描述】:

我在 GCP 中有一些非常大的文件,我想先拆分它们,然后再复制到 AWS 以供 lambda 处理。

文件可以大到 50GB,包含数百万行。我正在尝试将它们拆分为 100k 行以供 lambda 处理。

据我所知,gsutils 中没有任何东西可以做到这一点。

我尝试将文件拆分器编写为云函数并部署在 App Engine 中,但我在测试中遇到了内存问题。我去了一个 F4 实例,但这仍然是内存不足。这是我只处理 500mb 文件时遇到的错误:

Exceeded hard memory limit of 1024 MB with 1787 MB after servicing 0 requests total. Consider setting a larger instance class in app.yaml

这是部署到 App Engine 以进行文件拆分的代码:

@app.route('/')
def run():
    LOGGER.info(f"Request received with the following arguments: request.args")

    # Request args
    bucket_name = request.args.get('bucket_name')
    file_location = request.args.get('file_location')
    chunk_size = int(request.args.get('chunk_size', 100000))

    LOGGER.info(f"Getting files in bucket: [bucket_name] with prefix: [file_location]")
    storage_client = storage.Client()

    for blob in storage_client.list_blobs(bucket_name, prefix=file_location):
        blob_name = str(blob.name)
        if "json" in blob_name:
            LOGGER.info(f"Found blob: [blob_name]")
            blob_split = blob_name.split("/")
            file_name = blob_split[-1]

            bucket = storage_client.get_bucket(bucket_name)
            LOGGER.info(f"Downloading file: [file_name]")
            download_blob = bucket.get_blob(blob_name)
            downloaded_blob_string = download_blob.download_as_string()
            downloaded_json_data = downloaded_blob_string.decode("utf-8").splitlines()
            LOGGER.info(f"Got blob: [file_name]")
            file_count = len(downloaded_json_data)
            LOGGER.info(f"Blob [file_name] has file_count rows")

            for file_number in range(0, file_count - 1, chunk_size):
                range_min = file_number
                range_max = file_number + chunk_size - 1
                if range_max > file_count:
                    range_max = file_count - 1
                LOGGER.info(f"Generating file for rows: range_min+1 - range_max+1")
                split_file = "\n".join(downloaded_json_data[range_min:range_max+1]).encode("utf-8")
                LOGGER.info(f"Attempting upload of file for rows: range_min+1 - range_max+1")
                upload_blob = bucket.blob(f"file_locationsplit/file_name_split_range_min+1-range_max+1")
                upload_blob.upload_from_string(split_file)
                LOGGER.info(f"Upload complete for rows: range_min+1 - range_max+1")
            LOGGER.info(f"Successfully split file: file_name")
    LOGGER.info(f"Completed all file splits for file_location")
    return "success"

有没有更有效的方法来做到这一点?我还有什么其他选择?

我想自动化这个文件拆分过程,我们必须在一个月内执行几次。我最好的选择是每次都启动一个 GCE 实例,这样我就可以运行以下命令:

 split -l 100000 file.json

然后在拆分完成后关闭它?

【问题讨论】:

最便宜的是使用计算引擎;最具可扩展性的解决方案是使用 Dataflow(但需要在 Beam 编程语言模型中进行升级) @guillaumeblaquiere 您将如何在 Dataflow 中执行此操作?我认为无法控制 Dataflow 中输出的行数/文件数 另一个选项是通过 Cloud Storage 对象流式传输或范围,以便您的例如云函数能够处理约束大小的块。 Cloud Storage API 支持两者。在 Python 中有几种选择,包括 download_as_bytes。我怀疑你也可以这样并行化。从效率的角度来看,使用 Python 以外的语言可能会更好。 您特别要求使用 Lambda,但是,Google Cloud Functions(可能是 Cloud Run 等)提供了类似的功能,并且如果在本地(预)处理大文件,您将能够潜在地减少大文件的网络传输量在 GCP 上可以进一步减少运输。 @DazWilkin 不幸的是我没有选择 lambda,那一点我无法控制 【参考方案1】:

对于 exceeded hard memory limit 错误,您可以使用 App Engine Flex 并设置您的 custom machine resources。 为此,您将使用您的实例在 app.yaml 的资源部分中拥有的特定硬件部署 App Engine Flex 应用程序。

但最好的方法是在与 GCS 存储桶相同的区域中启动 GCE 实例,将对象下载到该实例(这应该非常快,因为它只有几十 GB),解压缩对象(这会更慢),将那个大文件分成一堆小文件(linux split 命令对此很有用),然后将对象上传回 GCS。

【讨论】:

以上是关于在 GCP 中按行拆分大文件的主要内容,如果未能解决你的问题,请参考以下文章

如何将大文本文件拆分为行数相等的小文件?

按行数拆分大型excel文件

PHP快速按行读取CSV大文件的封装类分享(也适用于其它超大文本文件)

拆分大txt文件成多个txt文件

用shell写一个程序,把一个大文件拆分成为多个小文件,要求:拆分文件以“;”作为分隔判断来拆分文件;

unix shell 如何拆分文件