如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet

Posted

技术标签:

【中文标题】如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet【英文标题】:How to Convert Many CSV files to Parquet using AWS Glue 【发布时间】:2018-10-03 19:16:46 【问题描述】:

我正在使用 AWS S3、Glue 和 Athena,设置如下:

S3 --> 胶水 --> 雅典娜

我的原始数据以 CSV 文件的形式存储在 S3 上。我正在使用 Glue 进行 ETL,并且我正在使用 Athena 来查询数据。

由于我使用的是 Athena,因此我想将 CSV 文件转换为 Parquet。我现在正在使用 AWS Glue 来执行此操作。这是我正在使用的当前流程:

    运行 Crawler 以读取 CSV 文件并填充数据目录。 运行 ETL 作业以从数据目录创建 Parquet 文件。 运行爬虫以使用 Parquet 文件填充数据目录。

Glue 作业一次只允许我转换一个表。如果我有很多 CSV 文件,这个过程很快就会变得难以管理。有没有更好的方法,也许是“正确”的方法,使用 AWS Glue 或其他一些 AWS 服务将 许多 CSV 文件转换为 Parquet?

【问题讨论】:

【参考方案1】:

我遇到了完全相同的情况,我想有效地循环遍历由爬虫编目的目录表,这些目录表指向 csv 文件,然后将它们转换为镶木地板。不幸的是,网络上还没有太多可用的信息。这就是为什么我在LinkedIn 中写了一篇博客来解释我是如何做到的。请阅读;特别是第 5 点。希望有帮助。请让我知道您的反馈。

注意:根据 Antti 的反馈,我从下面的博客中粘贴了摘录的解决方案:

    遍历目录/数据库/表

作业向导带有在数据源上运行预定义脚本的选项。问题是您可以选择的数据源是目录中的单个表。它没有让您选择在整个数据库或一组表上运行作业。无论如何,您可以稍后修改脚本,但是在胶水目录中迭代数据库表的方法也很难找到。有目录 API,但缺少合适的示例。 github 示例 repo 可以丰富更多场景以帮助开发人员。

经过一番折腾,我想出了下面的脚本来完成这项工作。我使用 boto3 客户端循环遍历表。如果涉及到某人的帮助,我将其粘贴在这里。如果您有更好的建议,我也想听听您的意见

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)


client = boto3.client('glue', region_name='ap-southeast-2')

databaseName = 'tpc-ds-csv'
print '\ndatabaseName: ' + databaseName

Tables = client.get_tables(DatabaseName=databaseName)

tableList = Tables['TableList']

for table in tableList:
    tableName = table['Name']
    print '\n-- tableName: ' + tableName

    datasource0 = glueContext.create_dynamic_frame.from_catalog(
        database="tpc-ds-csv", 
        table_name=tableName, 
        transformation_ctx="datasource0"
    )

    datasink4 = glueContext.write_dynamic_frame.from_options(
        frame=datasource0,
        connection_type="s3", 
        connection_options=
            "path": "s3://aws-glue-tpcds-parquet/"+ tableName + "/"
            ,
        format="parquet",
        transformation_ctx="datasink4"
    )
job.commit()

【讨论】:

我很久以前就解决了这个问题。您链接的博客文章中提到的解决方案与我最终所做的几乎相同。我希望 AWS 能够及时更新他们的 Glue 文档。目前严重缺乏。 这不是一个答案,除非您实际上至少提供了答案本身的细节的一瞥。 你说得对,安蒂。那时我是新的贡献者,还在学习。我已经用实际的解决方案编辑了答案【参考方案2】:

请参阅编辑以获取更新信息。

S3 --> 雅典娜

为什么不直接在 Athena 中使用 CSV 格式?

https://docs.aws.amazon.com/athena/latest/ug/supported-format.html

CSV 是受支持的格式之一。此外,为了提高效率,您可以压缩多个 CSV 文件以加快加载速度。

支持的压缩,

https://docs.aws.amazon.com/athena/latest/ug/compression-formats.html

希望对你有帮助。

编辑:

为什么 Parquet 格式比 CSV 更有用?

https://dzone.com/articles/how-to-be-a-hero-with-powerful-parquet-google-and

S3 --> 胶水 --> 雅典娜

有关 CSV 到 Parquet 转换的更多详细信息,

https://aws.amazon.com/blogs/big-data/build-a-data-lake-foundation-with-aws-glue-and-amazon-s3/

【讨论】:

在使用 Athena 时,我正在使用 Parquet 来提高查询性能并降低查询成本。 感谢您的洞察力。有时问题比答案更能提供信息。 您提供的最后一个链接描述了我当前的过程——将单个表的数据转换为 Parquet。我正在为许多表寻找“最佳实践”或“易于管理”的方法。【参考方案3】:

我不是 Glue 的忠实粉丝,也不是从数据创建模式

这是在 Athena 中的操作方法,它比 Glue 快得多。

这是用于 CSV 文件的:

create table foo (
  id int,
  name string,
  some date
)
row format delimited
  fields terminated by ','
location 's3://mybucket/path/to/csvs/'

这是用于镶木地板文件的:

create table bar 
with (
  external_location = 's3://mybucket/path/to/parquet/',
  format = 'PARQUET'
)
as select * from foo 

您不需要为镶木地板创建该路径,即使您使用分区

【讨论】:

【参考方案4】:

您可以直接将 JSON 或 CSV 文件转换为 parquet,而无需先将其导入目录。

这是用于 JSON 文件的 - 下面的代码将转换托管在 rawFiles 目录中的任何内容

import sys
from awsglue.job import Job 
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions

## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
spark = glueContext.spark_session
job = Job(glueContext) job.init(args['JOB_NAME'], args)

s3_json_path = 's3://rawFiles/'  
s3_parquet_path = 's3://convertedFiles/'

output = spark.read.load(s3_json_path, format='json') 
output.write.parquet(s3_parquet_path)

job.commit()

【讨论】:

【参考方案5】:

听起来在您的第 1 步中,您正在抓取单个 csv 文件(例如 some-bucket/container-path/file.csv),但如果您将抓取工具设置为查看路径级别而不是文件级别(例如 some-bucket/container-path/) 并且您的所有 csv 文件都是统一的,那么爬虫应该只创建一个外部表而不是每个文件的外部表,您将能够从所有文件中提取数据一次。

【讨论】:

第 1 步中的爬虫设置为爬取文件夹路径。这将创建一个具有多个表的数据库。每个表的数据都存储为 CSV 文件。我正在尝试使用单个脚本或作业将所有这些 CSV 文件转换为 Parquet。换句话说,我想将给定数据库的所有 CSV 文件转换为 Parquet。 AWS Glue 只允许我为每个作业选择一个表。我正在寻找一种方法来有效地为多个表执行此操作。 @mark - ***.com/users/5504459/mark-s,我正在努力实现同样的目标。你有一个可行的解决方案吗? @nitinr708 我的解决方案可能已经过时(例如,pandas 现在应该可用于 Python shell Glue 作业)。基本方法是遍历所有 csv 文件,将每个文件读入数据帧,然后写入 parquet。 Pandas DF、Glue DynamicFrames 和 PySpark DF 是您的选择。每个都有不同的 API 用于读取/写入 DF。这些链接应该会有所帮助——胶水:docs.aws.amazon.com/glue/latest/dg/…。 PySpark:***.com/a/45873742/5504459。熊猫:***.com/a/37703861/5504459

以上是关于如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet的主要内容,如果未能解决你的问题,请参考以下文章

AWS Glue 数据目录的头文件

AWS Glue 和重复数据删除增量 CSV 文件

AWS Glue - 从 sql server 表中读取并作为自定义 CSV 文件写入 S3

如何在 AWS Glue 中从 CSV 创建结构化 JSON

AWS Glue to Redshift:是否可以替换,更新或删除数据?

使用 Python 在 AWS Glue 中打开和读取文件