按时间戳写入分区数据

Posted

技术标签:

【中文标题】按时间戳写入分区数据【英文标题】:spark partition data writing by timestamp 【发布时间】:2018-09-27 00:49:57 【问题描述】:

我有一些数据的时间戳列字段很长并且它的纪元标准,我需要使用 spark scala 以 yyyy/mm/dd/hh 等拆分格式保存该数据

data.write.partitionBy("timestamp").format("orc").save("mypath") 

这只是按时间戳拆分数据,如下所示

timestamp=1458444061098
timestamp=1458444061198

但我希望它是这样的

└── YYYY
    └── MM
        └── DD
            └── HH

【问题讨论】:

【参考方案1】:

您可以为此利用各种 spark sql 日期/时间函数。首先,添加一个从 unix 时间戳列创建的新日期类型列。

val withDateCol = data
.withColumn("date_col", from_unixtime(col("timestamp"), "YYYYMMddHH"))

之后,您可以将年、月、日和小时列添加到 DF,然后按这些新列进行分区以进行写入。

withDateCol
.withColumn("year", year(col("date_col")))
.withColumn("month", month(col("date_col")))
.withColumn("day", dayofmonth(col("date_col")))
.withColumn("hour", hour(col("date_col")))
.drop("date_col")
.partitionBy("year", "month", "day", "hour")
.format("orc")
.save("mypath") 

partitionBy 子句中包含的列不会成为文件架构的一部分。

【讨论】:

【参考方案2】:

首先,我会警告您过度分区。也就是说,确保您有足够的数据使其值得按小时进行分区,否则您最终可能会得到许多带有小文件的分区文件夹。我要提醒的第二个注意事项是使用分区层次结构(年/月/日/小时),因为它需要递归分区发现。

话虽如此,如果您确实想按小时分段进行分区,我建议您将时间戳截断为小时,并按此进行分区。然后,Spark 将足够智能,在您读回该格式时将其识别为时间戳,并且您实际上可以根据需要执行完全过滤。

input
  .withColumn("ts_trunc", date_trunc("HOUR", 'timestamp)) // date_trunc added in Spark 2.3.0
  .write
  .partitionBy("ts_trunc")
  .save("/mnt/warehouse/part-test")

spark.read.load("/mnt/warehouse/part-test").where("hour(ts_trunc) = 10")

另一种选择是按日期和小时进行分区:

input
  .withColumn("date", to_date('timestamp))
  .withColumn("hour", hour('timestamp))
  .write
  .partitionBy("date", "hour")
  .save("/mnt/warehouse/part-test")

【讨论】:

不幸的是它是这样创建的:year=2020/month=5/day=10 而不是2020/05/10 是的,这就是分区文件夹的工作方式,也是您想要的 @Ikrom 你有没有找到像 2020/05/10 和不像 year=2020/month=5/day=10 这样创建文件夹的东西

以上是关于按时间戳写入分区数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark Sql:从时间戳按小时间隔分区

MySQL 分区,因为用户和时间戳日期范围

附加到 parquet 文件,由具有重叠时间戳的数据分区

对时间戳查询的数据进行分区

创建按一/多列分区的 BigQuery 外部表

时间戳范围之间的 AWS Appsync 订阅过滤器