如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet
Posted
技术标签:
【中文标题】如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet【英文标题】:How to Convert Many CSV files to Parquet using AWS Glue 【发布时间】:2018-10-03 19:16:46 【问题描述】:我正在使用 AWS S3、Glue 和 Athena,设置如下:
S3 --> 胶水 --> 雅典娜
我的原始数据以 CSV 文件的形式存储在 S3 上。我正在使用 Glue 进行 ETL,并且我正在使用 Athena 来查询数据。
由于我使用的是 Athena,因此我想将 CSV 文件转换为 Parquet。我现在正在使用 AWS Glue 来执行此操作。这是我正在使用的当前流程:
-
运行 Crawler 以读取 CSV 文件并填充数据目录。
运行 ETL 作业以从数据目录创建 Parquet 文件。
运行爬虫以使用 Parquet 文件填充数据目录。
Glue 作业一次只允许我转换一个表。如果我有很多 CSV 文件,这个过程很快就会变得难以管理。有没有更好的方法,也许是“正确”的方法,使用 AWS Glue 或其他一些 AWS 服务将 许多 CSV 文件转换为 Parquet?
【问题讨论】:
【参考方案1】:我遇到了完全相同的情况,我想有效地循环遍历由爬虫编目的目录表,这些目录表指向 csv 文件,然后将它们转换为镶木地板。不幸的是,网络上还没有太多可用的信息。这就是为什么我在LinkedIn 中写了一篇博客来解释我是如何做到的。请阅读;特别是第 5 点。希望有帮助。请让我知道您的反馈。
注意:根据 Antti 的反馈,我从下面的博客中粘贴了摘录的解决方案:
-
遍历目录/数据库/表
作业向导带有在数据源上运行预定义脚本的选项。问题是您可以选择的数据源是目录中的单个表。它没有让您选择在整个数据库或一组表上运行作业。无论如何,您可以稍后修改脚本,但是在胶水目录中迭代数据库表的方法也很难找到。有目录 API,但缺少合适的示例。 github 示例 repo 可以丰富更多场景以帮助开发人员。
经过一番折腾,我想出了下面的脚本来完成这项工作。我使用 boto3 客户端循环遍历表。如果涉及到某人的帮助,我将其粘贴在这里。如果您有更好的建议,我也想听听您的意见
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
client = boto3.client('glue', region_name='ap-southeast-2')
databaseName = 'tpc-ds-csv'
print '\ndatabaseName: ' + databaseName
Tables = client.get_tables(DatabaseName=databaseName)
tableList = Tables['TableList']
for table in tableList:
tableName = table['Name']
print '\n-- tableName: ' + tableName
datasource0 = glueContext.create_dynamic_frame.from_catalog(
database="tpc-ds-csv",
table_name=tableName,
transformation_ctx="datasource0"
)
datasink4 = glueContext.write_dynamic_frame.from_options(
frame=datasource0,
connection_type="s3",
connection_options=
"path": "s3://aws-glue-tpcds-parquet/"+ tableName + "/"
,
format="parquet",
transformation_ctx="datasink4"
)
job.commit()
【讨论】:
我很久以前就解决了这个问题。您链接的博客文章中提到的解决方案与我最终所做的几乎相同。我希望 AWS 能够及时更新他们的 Glue 文档。目前严重缺乏。 这不是一个答案,除非您实际上至少提供了答案本身的细节的一瞥。 你说得对,安蒂。那时我是新的贡献者,还在学习。我已经用实际的解决方案编辑了答案【参考方案2】:请参阅编辑以获取更新信息。
S3 --> 雅典娜
为什么不直接在 Athena 中使用 CSV 格式?
https://docs.aws.amazon.com/athena/latest/ug/supported-format.html
CSV 是受支持的格式之一。此外,为了提高效率,您可以压缩多个 CSV 文件以加快加载速度。
支持的压缩,
https://docs.aws.amazon.com/athena/latest/ug/compression-formats.html
希望对你有帮助。
编辑:
为什么 Parquet 格式比 CSV 更有用?
https://dzone.com/articles/how-to-be-a-hero-with-powerful-parquet-google-and
S3 --> 胶水 --> 雅典娜
有关 CSV 到 Parquet 转换的更多详细信息,
https://aws.amazon.com/blogs/big-data/build-a-data-lake-foundation-with-aws-glue-and-amazon-s3/
【讨论】:
在使用 Athena 时,我正在使用 Parquet 来提高查询性能并降低查询成本。 感谢您的洞察力。有时问题比答案更能提供信息。 您提供的最后一个链接描述了我当前的过程——将单个表的数据转换为 Parquet。我正在为许多表寻找“最佳实践”或“易于管理”的方法。【参考方案3】:我不是 Glue 的忠实粉丝,也不是从数据创建模式
这是在 Athena 中的操作方法,它比 Glue 快得多。
这是用于 CSV 文件的:
create table foo (
id int,
name string,
some date
)
row format delimited
fields terminated by ','
location 's3://mybucket/path/to/csvs/'
这是用于镶木地板文件的:
create table bar
with (
external_location = 's3://mybucket/path/to/parquet/',
format = 'PARQUET'
)
as select * from foo
您不需要为镶木地板创建该路径,即使您使用分区
【讨论】:
【参考方案4】:您可以直接将 JSON 或 CSV 文件转换为 parquet,而无需先将其导入目录。
这是用于 JSON 文件的 - 下面的代码将转换托管在 rawFiles 目录中的任何内容
import sys
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
spark = glueContext.spark_session
job = Job(glueContext) job.init(args['JOB_NAME'], args)
s3_json_path = 's3://rawFiles/'
s3_parquet_path = 's3://convertedFiles/'
output = spark.read.load(s3_json_path, format='json')
output.write.parquet(s3_parquet_path)
job.commit()
【讨论】:
【参考方案5】:听起来在您的第 1 步中,您正在抓取单个 csv 文件(例如 some-bucket/container-path/file.csv),但如果您将抓取工具设置为查看路径级别而不是文件级别(例如 some-bucket/container-path/) 并且您的所有 csv 文件都是统一的,那么爬虫应该只创建一个外部表而不是每个文件的外部表,您将能够从所有文件中提取数据一次。
【讨论】:
第 1 步中的爬虫设置为爬取文件夹路径。这将创建一个具有多个表的数据库。每个表的数据都存储为 CSV 文件。我正在尝试使用单个脚本或作业将所有这些 CSV 文件转换为 Parquet。换句话说,我想将给定数据库的所有 CSV 文件转换为 Parquet。 AWS Glue 只允许我为每个作业选择一个表。我正在寻找一种方法来有效地为多个表执行此操作。 @mark - ***.com/users/5504459/mark-s,我正在努力实现同样的目标。你有一个可行的解决方案吗? @nitinr708 我的解决方案可能已经过时(例如,pandas
现在应该可用于 Python shell Glue 作业)。基本方法是遍历所有 csv 文件,将每个文件读入数据帧,然后写入 parquet。 Pandas DF、Glue DynamicFrames 和 PySpark DF 是您的选择。每个都有不同的 API 用于读取/写入 DF。这些链接应该会有所帮助——胶水:docs.aws.amazon.com/glue/latest/dg/…。 PySpark:***.com/a/45873742/5504459。熊猫:***.com/a/37703861/5504459以上是关于如何使用 AWS Glue 将许多 CSV 文件转换为 Parquet的主要内容,如果未能解决你的问题,请参考以下文章
AWS Glue - 从 sql server 表中读取并作为自定义 CSV 文件写入 S3
如何在 AWS Glue 中从 CSV 创建结构化 JSON