如何在 Spark 流数据框中获取列的滞后?

Posted

技术标签:

【中文标题】如何在 Spark 流数据框中获取列的滞后?【英文标题】:How to get the lag of a column in a Spark streaming dataframe? 【发布时间】:2017-08-08 21:17:36 【问题描述】:

我的 Spark Scala 应用程序中有这种格式的数据流

id    mark1 mark2 mark3 time
uuid1 100   200   300   Tue Aug  8 14:06:02 PDT 2017
uuid1 100   200   300   Tue Aug  8 14:06:22 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:32 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:52 PDT 2017
uuid2 150   250   350   Tue Aug  8 14:06:58 PDT 2017

我已将它读入列 id、mark1、mark2、mark3 和时间。时间也被转换为日期时间格式。 我想按 id 分组并获得 mark1 的滞后,它给出了前一行的 mark1 值。 像这样的:

id    mark1 mark2 mark3 prev_mark time
uuid1 100   200   300   null      Tue Aug  8 14:06:02 PDT 2017
uuid1 100   200   300   100       Tue Aug  8 14:06:22 PDT 2017
uuid2 150   250   350   null      Tue Aug  8 14:06:32 PDT 2017
uuid2 150   250   350   150       Tue Aug  8 14:06:52 PDT 2017
uuid2 150   250   350   150       Tue Aug  8 14:06:58 PDT 2017

将数据框视为 markDF。我试过了:

val window = Window.partitionBy("uuid").orderBy("timestamp")
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))`

这表示非时间窗口不能应用于流式传输/附加数据集/帧。

我也试过了:

val window = Window.partitionBy("uuid").orderBy("timestamp").rowsBetween(-10, 10)
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

为几行获得一个窗口也不起作用。流窗口类似于: window("timestamp", "10 minutes") 不能用于发送延迟。我对如何做到这一点感到非常困惑。任何帮助都会很棒!

【问题讨论】:

每批次或整个流数据都做“滞后”吗? 【参考方案1】:

我建议您将time 列更改为String

+-----+-----+-----+-----+----------------------------+
|id   |mark1|mark2|mark3|time                        |
+-----+-----+-----+-----+----------------------------+
|uuid1|100  |200  |300  |Tue Aug  8 14:06:02 PDT 2017|
|uuid1|100  |200  |300  |Tue Aug  8 14:06:22 PDT 2017|
|uuid2|150  |250  |350  |Tue Aug  8 14:06:32 PDT 2017|
|uuid2|150  |250  |350  |Tue Aug  8 14:06:52 PDT 2017|
|uuid2|150  |250  |350  |Tue Aug  8 14:06:58 PDT 2017|
+-----+-----+-----+-----+----------------------------+

root
 |-- id: string (nullable = true)
 |-- mark1: integer (nullable = false)
 |-- mark2: integer (nullable = false)
 |-- mark3: integer (nullable = false)
 |-- time: string (nullable = true)

之后执行以下操作应该可以工作

df.withColumn("prev_mark", lag("mark1", 1).over(Window.partitionBy("id").orderBy("time")))

这会给你输出

+-----+-----+-----+-----+----------------------------+---------+
|id   |mark1|mark2|mark3|time                        |prev_mark|
+-----+-----+-----+-----+----------------------------+---------+
|uuid1|100  |200  |300  |Tue Aug  8 14:06:02 PDT 2017|null     |
|uuid1|100  |200  |300  |Tue Aug  8 14:06:22 PDT 2017|100      |
|uuid2|150  |250  |350  |Tue Aug  8 14:06:32 PDT 2017|null     |
|uuid2|150  |250  |350  |Tue Aug  8 14:06:52 PDT 2017|150      |
|uuid2|150  |250  |350  |Tue Aug  8 14:06:58 PDT 2017|150      |
+-----+-----+-----+-----+----------------------------+---------+

【讨论】:

这不起作用,因为我的数据帧是流数据帧 你为什么这么认为? over 函数不是基于时间的窗口 这就是为什么我说要变成字符串。 :) 我认为您没有正确阅读我的答案 @RameshMaharjan 我无法用您的解决方案解决问题。你确定是在流媒体 df 上这样做的吗?

以上是关于如何在 Spark 流数据框中获取列的滞后?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]

如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?

如何在 Databricks 中读取批量 excel 文件数据并加载到 spark 数据框中

PYSPARK:如何在 pyspark 数据框中找到两列的余弦相似度?

如何获得每列的最大值?

如何在整个 Pandas 数据框中搜索字符串并获取包含它的列的名称?