在 scala spark 数据框中提取时间间隔

Posted

技术标签:

【中文标题】在 scala spark 数据框中提取时间间隔【英文标题】:Extract time intervals in a scala spark dataframe 【发布时间】:2019-03-08 16:34:02 【问题描述】:

我正在尝试根据 scala 和 spark 中的时间序列提取组合数据间隔

我在数据框中有以下数据:

Id | State | StartTime           | EndTime
---+-------+---------------------+--------------------
 1 |   R   | 2019-01-01T03:00:00 | 2019-01-01T11:30:00
 1 |   R   | 2019-01-01T11:30:00 | 2019-01-01T15:00:00
 1 |   R   | 2019-01-01T15:00:00 | 2019-01-01T22:00:00
 1 |   W   | 2019-01-01T22:00:00 | 2019-01-02T04:30:00
 1 |   W   | 2019-01-02T04:30:00 | 2019-01-02T13:45:00
 1 |   R   | 2019-01-02T13:45:00 | 2019-01-02T18:30:00
 1 |   R   | 2019-01-02T18:30:00 | 2019-01-02T22:45:00

我需要根据 id 和 state 将数据提取到时间间隔中。生成的数据需要如下所示:

Id | State | StartTime           | EndTime
---+-------+---------------------+--------------------
 1 |   R   | 2019-01-01T03:00:00 | 2019-01-01T22:00:00
 1 |   W   | 2019-01-01T22:00:00 | 2019-01-02T13:45:00
 1 |   R   | 2019-01-02T13:45:00 | 2019-01-02T22:45:00

请注意,前三个记录已分组在一起,因为设备从 2019-01-01T03:00:00 到 2019-01-01T22:00:00 连续处于 R 状态,然后切换到 W 状态接下来的两条记录从 2019-01-01T22:00:00 到 2019-01-02T13:45:00,然后返回到最后两条记录的 R 状态。

【问题讨论】:

Spark SQL window function with complex condition的可能重复 我看了那个问题,这是一个非常不同的问题 在这种情况下可以编辑您的问题并详细解释所需的逻辑吗?此外,我们始终欢迎 reproducible example 加入 apache-spark。提前谢谢你。 原来的解决方案是这样的:***.com/questions/7420618/… 翻译成 spark 嗨@JeffHornby,您是否设法将其转换为 Spark 代码? 【参考方案1】:

所以事实证明,这个问题的答案是 Combine rows when the end time of one is the start time of another (Oracle) 翻译成 Spark。

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col,row_number
import spark.implicits._

val idSpec = Window.partitionBy('Id).orderBy('StartTime)
val idStateSpec = Window.partitionBy('Id,'State).orderBy('StartTime)
val df2 = df
  .select('Id,'State,'StartTime,'EndTime,
          row_number().over(idSpec).as("idRowNumber"),
          row_number().over(idStateSpec).as("idStateRowNumber"))
  .groupBy('Id,'State,'idRowNumber - 'idStateRowNumber)
  .agg(min('StartTime).as("StartTime"), max('EndTime).as("EndTime"))

【讨论】:

以上是关于在 scala spark 数据框中提取时间间隔的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Scala/Spark 的数据框中扩展数组 [重复]

Scala(Spark)连接数据框中的列[重复]

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

如何在spark scala数据框中更新嵌套列的xml值

将新行与spark scala中数据框中的前一行数据合并

如何访问存储在scala spark中的数据框中的映射值和键