如何在 Spark 流数据框中获取列的滞后?
Posted
技术标签:
【中文标题】如何在 Spark 流数据框中获取列的滞后?【英文标题】:How to get the lag of a column in a Spark streaming dataframe? 【发布时间】:2017-08-08 21:17:36 【问题描述】:我的 Spark Scala 应用程序中有这种格式的数据流
id mark1 mark2 mark3 time
uuid1 100 200 300 Tue Aug 8 14:06:02 PDT 2017
uuid1 100 200 300 Tue Aug 8 14:06:22 PDT 2017
uuid2 150 250 350 Tue Aug 8 14:06:32 PDT 2017
uuid2 150 250 350 Tue Aug 8 14:06:52 PDT 2017
uuid2 150 250 350 Tue Aug 8 14:06:58 PDT 2017
我已将它读入列 id、mark1、mark2、mark3 和时间。时间也被转换为日期时间格式。 我想按 id 分组并获得 mark1 的滞后,它给出了前一行的 mark1 值。 像这样的:
id mark1 mark2 mark3 prev_mark time
uuid1 100 200 300 null Tue Aug 8 14:06:02 PDT 2017
uuid1 100 200 300 100 Tue Aug 8 14:06:22 PDT 2017
uuid2 150 250 350 null Tue Aug 8 14:06:32 PDT 2017
uuid2 150 250 350 150 Tue Aug 8 14:06:52 PDT 2017
uuid2 150 250 350 150 Tue Aug 8 14:06:58 PDT 2017
将数据框视为 markDF。我试过了:
val window = Window.partitionBy("uuid").orderBy("timestamp")
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))`
这表示非时间窗口不能应用于流式传输/附加数据集/帧。
我也试过了:
val window = Window.partitionBy("uuid").orderBy("timestamp").rowsBetween(-10, 10)
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))
为几行获得一个窗口也不起作用。流窗口类似于:
window("timestamp", "10 minutes")
不能用于发送延迟。我对如何做到这一点感到非常困惑。任何帮助都会很棒!
【问题讨论】:
每批次或整个流数据都做“滞后”吗? 【参考方案1】:我建议您将time
列更改为String
+-----+-----+-----+-----+----------------------------+
|id |mark1|mark2|mark3|time |
+-----+-----+-----+-----+----------------------------+
|uuid1|100 |200 |300 |Tue Aug 8 14:06:02 PDT 2017|
|uuid1|100 |200 |300 |Tue Aug 8 14:06:22 PDT 2017|
|uuid2|150 |250 |350 |Tue Aug 8 14:06:32 PDT 2017|
|uuid2|150 |250 |350 |Tue Aug 8 14:06:52 PDT 2017|
|uuid2|150 |250 |350 |Tue Aug 8 14:06:58 PDT 2017|
+-----+-----+-----+-----+----------------------------+
root
|-- id: string (nullable = true)
|-- mark1: integer (nullable = false)
|-- mark2: integer (nullable = false)
|-- mark3: integer (nullable = false)
|-- time: string (nullable = true)
之后执行以下操作应该可以工作
df.withColumn("prev_mark", lag("mark1", 1).over(Window.partitionBy("id").orderBy("time")))
这会给你输出
+-----+-----+-----+-----+----------------------------+---------+
|id |mark1|mark2|mark3|time |prev_mark|
+-----+-----+-----+-----+----------------------------+---------+
|uuid1|100 |200 |300 |Tue Aug 8 14:06:02 PDT 2017|null |
|uuid1|100 |200 |300 |Tue Aug 8 14:06:22 PDT 2017|100 |
|uuid2|150 |250 |350 |Tue Aug 8 14:06:32 PDT 2017|null |
|uuid2|150 |250 |350 |Tue Aug 8 14:06:52 PDT 2017|150 |
|uuid2|150 |250 |350 |Tue Aug 8 14:06:58 PDT 2017|150 |
+-----+-----+-----+-----+----------------------------+---------+
【讨论】:
这不起作用,因为我的数据帧是流数据帧 你为什么这么认为? over 函数不是基于时间的窗口 这就是为什么我说要变成字符串。 :) 我认为您没有正确阅读我的答案 @RameshMaharjan 我无法用您的解决方案解决问题。你确定是在流媒体 df 上这样做的吗?以上是关于如何在 Spark 流数据框中获取列的滞后?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]
如何使用Scala计算Spark中数据框中列的开始索引和结束索引之间的行的平均值?
如何在 Databricks 中读取批量 excel 文件数据并加载到 spark 数据框中