使用 Spark 将 csv.gz 文件转换为 Parquet

Posted

技术标签:

【中文标题】使用 Spark 将 csv.gz 文件转换为 Parquet【英文标题】:Convert csv.gz files into Parquet using Spark 【发布时间】:2015-10-21 23:15:37 【问题描述】:

我需要使用 Spark(首选 Scala)将 AWS S3 和 HDFS 中的文件夹中的 csv.gz 文件转换为 Parquet 文件。数据的一列是时间戳,我只有一周的数据集。时间戳格式为:

'yyyy-MM-dd hh:mm:ss'

我想要的输出是,对于每一天,都有一个文件夹(或分区),该特定日期的 Parquet 文件所在的位置。所以会有 7 个输出文件夹或分区。

我对如何做到这一点只有一个模糊的想法,我的脑海中只有 sc.textFile。 Spark 中是否有可以转换为 Parquet 的函数?如何在 S3 和 HDFS 中实现这一点?

感谢您的帮助。

【问题讨论】:

你读过spark.apache.org/docs/latest/…吗?? 【参考方案1】:

如果您查看 Spark Dataframe API 和 Spark-CSV package,这将实现您尝试执行的大部分操作 - 将 CSV 文件读入数据帧,然后将数据帧作为镶木地板写入带你去那里的大部分路。

您仍然需要执行一些步骤来解析时间戳并使用结果对数据进行分区。

【讨论】:

第一个链接已经失效。最好只包含一个关于如何做的 sn-p。 第二个链接也过期了,这个功能是 Spark 2.x 内置的【参考方案2】:

老话题,但我认为如果没有正确回答,即使是老话题也很重要。

在 spark 版本中 >=2 csv 包已经包含在内,您需要将 databricks csv 包导入到您的工作中,例如“--packages com.databricks:spark-csv_2.10:1.5.0”。

示例 csv:

id,name,date
1,pete,2017-10-01 16:12
2,paul,2016-10-01 12:23
3,steve,2016-10-01 03:32
4,mary,2018-10-01 11:12 
5,ann,2018-10-02 22:12
6,rudy,2018-10-03 11:11
7,mike,2018-10-04 10:10

首先,您需要创建 hivetable,以便 spark 写入的数据与 hive 架构兼容。 (在未来的版本中可能不再需要)

创建表:

create table part_parq_table (
    id int,
    name string
    )
partitioned by (date string)
stored as parquet

完成此操作后,您可以轻松读取 csv 并将数据框保存到该表中。第二步用“yyyy-mm-dd”之类的日期格式覆盖列日期。将为每个值创建一个文件夹,其中包含特定的行。

SCALA Spark-Shell 示例:

spark.sqlContext.setConf("hive.exec.dynamic.partition", "true") 
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

前两行是 hive 配置,需要创建一个尚不存在的分区文件夹。

var df=spark.read.format("csv").option("header","true").load("/tmp/test.csv")
df=df.withColumn("date",substring(col("date"),0,10))
df.show(false)
df.write.format("parquet").mode("append").insertInto("part_parq_table")

插入完成后,您可以直接查询表,如“select * from part_parq_table”。 这些文件夹将在默认 cloudera 的 tablefolder 中创建,例如hdfs:///users/hive/warehouse/part_parq_table

希望有所帮助 BR

【讨论】:

【参考方案3】:

读取csv文件/user/hduser/wikipedia/pageviews-by-second-tsv

"timestamp"             "site"  "requests"
"2015-03-16T00:09:55"   "mobile"        1595
"2015-03-16T00:10:39"   "mobile"        1544

以下代码使用spark2.0

import org.apache.spark.sql.types._
var wikiPageViewsBySecondsSchema = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
var wikiPageViewsBySecondsDF = spark.read.schema(wikiPageViewsBySecondsSchema).option("header", "true").option("delimiter", "\t").csv("/user/hduser/wikipedia/pageviews-by-second-tsv")

将字符串时间戳转换为时间戳

wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.withColumn("timestampTS", $"timestamp".cast("timestamp")).drop("timestamp")
or 
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.select($"timestamp".cast("timestamp"), $"site", $"requests") 

写入 parquet 文件。

wikiPageViewsBySecondsTableDF.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")

【讨论】:

以上是关于使用 Spark 将 csv.gz 文件转换为 Parquet的主要内容,如果未能解决你的问题,请参考以下文章

如何在R中使用sparklyr打开“GZ FILE”?

如何在 Copy commd 中使用反斜杠字符将 s3 csv gz 文件加载到 Redshift

将包含 Vector 作为特征的 Spark 数据帧转换为 CSV 文件

合并以 XXXXX.csv.gz_1_2.tar 和 XXXXX.csv.gz_2_2.tar 格式分块的两个文件(使用 python 或 pyspark)

将 spark 转换为 pandas 数据框有异常:使用基于文件的收集时不支持箭头

多核 gzip 解压缩,将输出文件 (csv) 拆分为 1Gb/文件