使用 Spark 将 csv.gz 文件转换为 Parquet
Posted
技术标签:
【中文标题】使用 Spark 将 csv.gz 文件转换为 Parquet【英文标题】:Convert csv.gz files into Parquet using Spark 【发布时间】:2015-10-21 23:15:37 【问题描述】:我需要使用 Spark(首选 Scala)将 AWS S3 和 HDFS 中的文件夹中的 csv.gz 文件转换为 Parquet 文件。数据的一列是时间戳,我只有一周的数据集。时间戳格式为:
'yyyy-MM-dd hh:mm:ss'
我想要的输出是,对于每一天,都有一个文件夹(或分区),该特定日期的 Parquet 文件所在的位置。所以会有 7 个输出文件夹或分区。
我对如何做到这一点只有一个模糊的想法,我的脑海中只有 sc.textFile。 Spark 中是否有可以转换为 Parquet 的函数?如何在 S3 和 HDFS 中实现这一点?
感谢您的帮助。
【问题讨论】:
你读过spark.apache.org/docs/latest/…吗?? 【参考方案1】:如果您查看 Spark Dataframe API 和 Spark-CSV package,这将实现您尝试执行的大部分操作 - 将 CSV 文件读入数据帧,然后将数据帧作为镶木地板写入带你去那里的大部分路。
您仍然需要执行一些步骤来解析时间戳并使用结果对数据进行分区。
【讨论】:
第一个链接已经失效。最好只包含一个关于如何做的 sn-p。 第二个链接也过期了,这个功能是 Spark 2.x 内置的【参考方案2】:老话题,但我认为如果没有正确回答,即使是老话题也很重要。
在 spark 版本中 >=2 csv 包已经包含在内,您需要将 databricks csv 包导入到您的工作中,例如“--packages com.databricks:spark-csv_2.10:1.5.0”。
示例 csv:
id,name,date
1,pete,2017-10-01 16:12
2,paul,2016-10-01 12:23
3,steve,2016-10-01 03:32
4,mary,2018-10-01 11:12
5,ann,2018-10-02 22:12
6,rudy,2018-10-03 11:11
7,mike,2018-10-04 10:10
首先,您需要创建 hivetable,以便 spark 写入的数据与 hive 架构兼容。 (在未来的版本中可能不再需要)
创建表:
create table part_parq_table (
id int,
name string
)
partitioned by (date string)
stored as parquet
完成此操作后,您可以轻松读取 csv 并将数据框保存到该表中。第二步用“yyyy-mm-dd”之类的日期格式覆盖列日期。将为每个值创建一个文件夹,其中包含特定的行。
SCALA Spark-Shell 示例:
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
前两行是 hive 配置,需要创建一个尚不存在的分区文件夹。
var df=spark.read.format("csv").option("header","true").load("/tmp/test.csv")
df=df.withColumn("date",substring(col("date"),0,10))
df.show(false)
df.write.format("parquet").mode("append").insertInto("part_parq_table")
插入完成后,您可以直接查询表,如“select * from part_parq_table”。 这些文件夹将在默认 cloudera 的 tablefolder 中创建,例如hdfs:///users/hive/warehouse/part_parq_table
希望有所帮助 BR
【讨论】:
【参考方案3】:读取csv文件/user/hduser/wikipedia/pageviews-by-second-tsv
"timestamp" "site" "requests"
"2015-03-16T00:09:55" "mobile" 1595
"2015-03-16T00:10:39" "mobile" 1544
以下代码使用spark2.0
import org.apache.spark.sql.types._
var wikiPageViewsBySecondsSchema = StructType(Array(StructField("timestamp", StringType, true),StructField("site", StringType, true),StructField("requests", LongType, true) ))
var wikiPageViewsBySecondsDF = spark.read.schema(wikiPageViewsBySecondsSchema).option("header", "true").option("delimiter", "\t").csv("/user/hduser/wikipedia/pageviews-by-second-tsv")
将字符串时间戳转换为时间戳
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.withColumn("timestampTS", $"timestamp".cast("timestamp")).drop("timestamp")
or
wikiPageViewsBySecondsDF= wikiPageViewsBySecondsDF.select($"timestamp".cast("timestamp"), $"site", $"requests")
写入 parquet 文件。
wikiPageViewsBySecondsTableDF.write.parquet("/user/hduser/wikipedia/pageviews-by-second-parquet")
【讨论】:
以上是关于使用 Spark 将 csv.gz 文件转换为 Parquet的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Copy commd 中使用反斜杠字符将 s3 csv gz 文件加载到 Redshift
将包含 Vector 作为特征的 Spark 数据帧转换为 CSV 文件
合并以 XXXXX.csv.gz_1_2.tar 和 XXXXX.csv.gz_2_2.tar 格式分块的两个文件(使用 python 或 pyspark)