Spark - 30 分钟通用窗口化

Posted

技术标签:

【中文标题】Spark - 30 分钟通用窗口化【英文标题】:Spark - 30 Minute Generic Windowing 【发布时间】:2020-03-05 14:50:48 【问题描述】:

我目前正在编写一个 spark 脚本,以查看每 30 分钟的周期并确定该列在该 30 分钟滚动周期内的平均值。

我的时间戳格式为:MM/dd/yyyy HH:mm:ss AM/PM。本质上,我想做的是每 30 分钟查看一次,不包括日期。 (即下午 1:02 至下午 1:32 之间全天的平均乘客人数)。

我当前的脚本将获取我的时间戳,将其转换为 unix 时间戳并将其存储为新列。然后,查看当前时间戳,它会减去 900 秒,并加上 900 秒,以获取前 15 分钟的记录和当前时间戳之后 15 分钟的记录。这给了我正在寻找的 30 分钟窗口。当我在创建新列“时间戳”时包含MM/dd/yyyy 时,此方法有效:

val taxiSub = spark.read.format("csv").option("header", true).option("inferSchema", true).load("/user/zeppelin/taxi/taxi_subset.csv")
taxiSub.createOrReplaceTempView("taxiSub")
val stamp = taxiSub.withColumn("timestamp", unix_timestamp($"tpep_pickup_datetime", "MM/dd/yyyy HH:mm"))
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("VendorID").orderBy("timestamp").rangeBetween(-900,900)
val answer = stamp.withColumn("AvgPassenger", avg(stamp("passenger_count")).over(windowSpec))
answer.select("VendorID", "tpep_pickup_datetime", "timestamp", "passenger_count", "AvgPassenger")
answer.createOrReplaceTempView("answerTable")
spark.sqlContext.sql("SELECT timestamp, AvgPassenger FROM answerTable ORDER BY AvgPassenger DESC limit 10").show()

但是,这给了我范围内的具体日期,而不是上面提到的通用时间段。当我尝试从时间戳生成中删除MM/dd/yyyy 时,我的所有时间戳值都变为空。此外,如何计算时间戳的 AM/PM 部分?

任何想法将不胜感激。

【问题讨论】:

【参考方案1】:

我们可以使用 unix_timestamp("HH:mm","HH:mm") 来获取通用纪元时间值,然后在我们的 orderBy 子句中使用该值。

Example:

//import org.apache.spark.sql.expressions._

//sample data
//+--------+---------+---------------+--------------------+
//|VendorID|timestamp|passenger_count|tpep_pickup_datetime|
//+--------+---------+---------------+--------------------+
//|       1|    66180|              3|    12/12/2019 12:23|
//|       1|    66780|              2|    12/13/2018 12:33|
//|       2|    66180|             12|    12/13/2019 12:23|
//|       2|    69780|             13|    12/13/2018 13:23|
//+--------+---------+---------------+--------------------+

val stamp = taxiSub.withColumn("tmp",to_timestamp(col("tpep_pickup_datetime"),"MM/dd/yyyy HH:mm")).//add new timestamp type field
withColumn("timestamp", unix_timestamp(concat_ws(":",hour(col("tmp")),minute(col("tmp"))),"HH:mm")). //extract hour,minute and convert to epoch timestamp value
drop("tmp")

//partition based on vendorid
val windowSpec = Window.partitionBy("VendorID").orderBy("timestamp").rangeBetween(-900,900)

stamp.withColumn("AvgPassenger", avg(stamp("passenger_count")).over(windowSpec)).show()

//+--------+---------+---------------+--------------------+------------+
//|VendorID|timestamp|passenger_count|tpep_pickup_datetime|AvgPassenger|
//+--------+---------+---------------+--------------------+------------+
//|       1|    66180|              3|    12/12/2019 12:23|         2.5|
//|       1|    66780|              2|    12/13/2018 12:33|         2.5|
//|       2|    66180|             12|    12/13/2019 12:23|        12.0|
//|       2|    69780|             13|    12/13/2018 13:23|        13.0|
//+--------+---------+---------------+--------------------+------------+

【讨论】:

以上是关于Spark - 30 分钟通用窗口化的主要内容,如果未能解决你的问题,请参考以下文章

30分钟轻松入门flutter,通用流行框架大全

在窗口上聚合(总和)以获得列列表

OpenStack 通用设计思路 - 每天5分钟玩转 OpenStack(25)

OpenStack 通用设计思路 - 每天5分钟玩转 OpenStack(25)

JAVA通用GET和POST方法

Unity 3D 游戏通用系统设置页面,自定义按键设置,背景虚化,图像设置,亮度对比度饱和度音量调节,分辨率窗口化,帧率垂直同步,抗锯齿,阴影质量,纹理质量设置