按时间戳写入分区数据
Posted
技术标签:
【中文标题】按时间戳写入分区数据【英文标题】:spark partition data writing by timestamp 【发布时间】:2018-09-27 00:49:57 【问题描述】:我有一些数据的时间戳列字段很长并且它的纪元标准,我需要使用 spark scala 以 yyyy/mm/dd/hh 等拆分格式保存该数据
data.write.partitionBy("timestamp").format("orc").save("mypath")
这只是按时间戳拆分数据,如下所示
timestamp=1458444061098
timestamp=1458444061198
但我希望它是这样的
└── YYYY
└── MM
└── DD
└── HH
【问题讨论】:
【参考方案1】:您可以为此利用各种 spark sql 日期/时间函数。首先,添加一个从 unix 时间戳列创建的新日期类型列。
val withDateCol = data
.withColumn("date_col", from_unixtime(col("timestamp"), "YYYYMMddHH"))
之后,您可以将年、月、日和小时列添加到 DF,然后按这些新列进行分区以进行写入。
withDateCol
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.withColumn("hour", hour(col("date_col")))
.drop("date_col")
.partitionBy("year", "month", "day", "hour")
.format("orc")
.save("mypath")
partitionBy 子句中包含的列不会成为文件架构的一部分。
【讨论】:
【参考方案2】:首先,我会警告您过度分区。也就是说,确保您有足够的数据使其值得按小时进行分区,否则您最终可能会得到许多带有小文件的分区文件夹。我要提醒的第二个注意事项是使用分区层次结构(年/月/日/小时),因为它需要递归分区发现。
话虽如此,如果您确实想按小时分段进行分区,我建议您将时间戳截断为小时,并按此进行分区。然后,Spark 将足够智能,在您读回该格式时将其识别为时间戳,并且您实际上可以根据需要执行完全过滤。
input
.withColumn("ts_trunc", date_trunc("HOUR", 'timestamp)) // date_trunc added in Spark 2.3.0
.write
.partitionBy("ts_trunc")
.save("/mnt/warehouse/part-test")
spark.read.load("/mnt/warehouse/part-test").where("hour(ts_trunc) = 10")
另一种选择是按日期和小时进行分区:
input
.withColumn("date", to_date('timestamp))
.withColumn("hour", hour('timestamp))
.write
.partitionBy("date", "hour")
.save("/mnt/warehouse/part-test")
【讨论】:
不幸的是它是这样创建的:year=2020/month=5/day=10
而不是2020/05/10
是的,这就是分区文件夹的工作方式,也是您想要的
@Ikrom 你有没有找到像 2020/05/10 和不像 year=2020/month=5/day=10 这样创建文件夹的东西以上是关于按时间戳写入分区数据的主要内容,如果未能解决你的问题,请参考以下文章