Split the timestamp interval based on hours in Spark
Posted: 2020-08-12 05:48:34

Question: Split the timestamps based on hours in Spark. Input data:
1,2019-04-01 04:00:21,12
1,2019-04-01 06:01:22,34
1,2019-04-01 09:21:23,10
1,2019-04-01 11:23:09,15
1,2019-04-01 12:02:10,15
1,2019-04-01 15:00:21,10
1,2019-04-01 18:00:22,10
1,2019-04-01 19:30:22,30
1,2019-04-01 20:22:30,30
1,2019-04-01 22:20:30,30
1,2019-04-01 23:59:00,10
Split the hourly timestamps into four parts of the day, every 6 hours, and sum the values. Here I split as 0-6 AM, 6 AM-12 PM, and so on. Expected output (see the breakdown after the rows below):
1,2019-04-01,12
1,2019-04-01,59
1,2019-04-01,25
1,2019-04-01,110
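For reference, the expected sums map to the hour buckets [0,6), [6,12), [12,18) and [18,24): 12; 34+10+15 = 59; 15+10 = 25; 10+30+30+30+10 = 110. A minimal sketch of that bucketing (an assumption, not from the original post; it presumes the rows are already in a DataFrame df with columns c1, c2 as timestamp and c3, and that org.apache.spark.sql.functions._ and spark.implicits._ are imported):

// Sketch only: bucket each row by hour-of-day divided by 6, then sum per day and bucket.
df.withColumn("bucket", floor(hour($"c2") / 6))
  .groupBy($"c1", to_date($"c2").as("day"), $"bucket")
  .agg(sum($"c3").as("c3"))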
Comments:

Does this answer your question? How to group by time interval in Spark SQL
@Lamanus: Thanks for the reply. I tried that solution but could not get the expected output.

Answer 1: Try this -
Load the test data:
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes an active SparkSession named spark (e.g. in spark-shell)

spark.conf.set("spark.sql.session.timeZone", "UTC")
val data =
"""
|c1,c2,c3
|1,2019-04-01 04:00:21,12
|1,2019-04-01 06:01:22,34
|1,2019-04-01 09:21:23,10
|1,2019-04-01 11:23:09,15
|1,2019-04-01 12:02:10,15
|1,2019-04-01 15:00:21,10
|1,2019-04-01 18:00:22,10
|1,2019-04-01 19:30:22,30
|1,2019-04-01 20:22:30,30
|1,2019-04-01 22:20:30,30
|1,2019-04-01 23:59:00,10
""".stripMargin
val stringDS2 = data.split(System.lineSeparator())
  .map(_.split("\\,").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df2 = spark.read
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.option("nullValue", "null")
.csv(stringDS2)
df2.show(false)
df2.printSchema()
/**
* +---+-------------------+---+
* |c1 |c2 |c3 |
* +---+-------------------+---+
* |1 |2019-03-31 22:30:21|12 |
* |1 |2019-04-01 00:31:22|34 |
* |1 |2019-04-01 03:51:23|10 |
* |1 |2019-04-01 05:53:09|15 |
* |1 |2019-04-01 06:32:10|15 |
* |1 |2019-04-01 09:30:21|10 |
* |1 |2019-04-01 12:30:22|10 |
* |1 |2019-04-01 14:00:22|30 |
* |1 |2019-04-01 14:52:30|30 |
* |1 |2019-04-01 16:50:30|30 |
* |1 |2019-04-01 18:29:00|10 |
* +---+-------------------+---+
*
* root
* |-- c1: integer (nullable = true)
* |-- c2: timestamp (nullable = true)
* |-- c3: integer (nullable = true)
*/
Truncate the date to the 6-hour range it falls in, then groupBy().sum:
val seconds = 21600 // 6 hrs
df2.withColumn("c2_long", expr(s"floor(cast(c2 as long) / $seconds) * $seconds"))
.groupBy("c1", "c2_long")
.agg(sum($"c3").as("c3"))
.withColumn("c2", to_date(to_timestamp($"c2_long")))
.withColumn("c2_time", to_timestamp($"c2_long"))
.orderBy("c2")
.show(false)
/**
* +---+----------+---+----------+-------------------+
* |c1 |c2_long |c3 |c2 |c2_time |
* +---+----------+---+----------+-------------------+
* |1 |1554055200|12 |2019-03-31|2019-03-31 18:00:00|
* |1 |1554120000|100|2019-04-01|2019-04-01 12:00:00|
* |1 |1554076800|59 |2019-04-01|2019-04-01 00:00:00|
* |1 |1554141600|10 |2019-04-01|2019-04-01 18:00:00|
* |1 |1554098400|25 |2019-04-01|2019-04-01 06:00:00|
* +---+----------+---+----------+-------------------+
*/
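To make the truncation concrete, here is the arithmetic for one row of the output above (a worked example using the UTC timestamps shown in this answer, which are shifted from the question's input strings):

/**
 * epoch(2019-04-01 09:30:21) = 1554111021
 * floor(1554111021 / 21600) * 21600 = 71949 * 21600 = 1554098400
 * 1554098400 = 2019-04-01 06:00:00, i.e. the 06:00-12:00 bucket, whose sum is 15 + 10 = 25
 */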
Answer 2: SCALA: The answer in the post from my comment works well.
df.groupBy($"id", window($"time", "6 hours").as("time"))
.agg(sum("count").as("count"))
.orderBy("time.start")
.select($"id", to_date($"time.start").as("time"), $"count")
.show(false)
+---+----------+-----+
|id |time |count|
+---+----------+-----+
|1 |2019-04-01|12 |
|1 |2019-04-01|59 |
|1 |2019-04-01|25 |
|1 |2019-04-01|110 |
+---+----------+-----+
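For completeness, a minimal sketch (not part of the original answer) of how the df used here could be built from the df2 of the first answer, assuming its c1/c2/c3 columns correspond to id/time/count:

// Assumption: reuse df2 from the first answer and rename its columns to match this snippet.
val df = df2
  .withColumnRenamed("c1", "id")
  .withColumnRenamed("c2", "time")
  .withColumnRenamed("c3", "count")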