复制当前行或滞后行
Posted
技术标签:
【中文标题】复制当前行或滞后行【英文标题】:dupicating current or lagging row 【发布时间】:2020-10-13 14:59:20 【问题描述】:我有桌子 1
number type time
1 on 1
1 on 5
1 off 10
1 off 15
我需要把它转换成table2
number type time
1 on 1
1 off 2
1 on 5
1 off 10
1 on 14
1 off 15
条件是如果$"type" === "ON" && $"lag_type" =!= "OFF"
我复制了 laggig 行,但将 type 替换为 OFF 并将 time 增加一秒。如果$"type" =!= "ON" && $"lag_type" === "OFF"
我复制当前行,但将类型替换为 ON 并将时间减少一秒。如果滞后类型为空,则跳过。
我找到了应该复制哪些行
val window = Window.partitionBy($"number").orderBy($"time")
df
.withColumn("lag_type", lag($"type", 1, null).over(window))
.withColumn("lag1", $"type" === "ON" && $"lag_type" =!= "OFF")
.withColumn("lag2", $"type" =!= "ON" && $"lag_type" === "OFF")
但不知道如何添加行。特别是如果它们基于落后者的价值。对于当前的,我可能会列出清单
type time
(on, off) (14, 15)
并像显示 here 一样展示它们,但是当谈到滞后行时,我再次迷失了。
有什么建议吗?我正在使用火花 2.2。案例类可以吗?
【问题讨论】:
如果您发现它有用,请也给答案投票:) @Eugene 【参考方案1】:如果我正确理解您的问题,您只想在您发现 lag1
或 lag2
是 true
的情况下添加行。
一种方法:
//Filter only the rows that needs to be changed :
val df2 = df.withColumn("lag_type", lag($"type", 1, null).over(window))
.withColumn("lag1", $"type" === "ON" && $"lag_type" =!= "OFF")
.withColumn("lag2", $"type" =!= "ON" && $"lag_type" === "OFF")
.filter( $"lag1"|| $"lag2")
//Change the rows based on the values of lag1 and lag2
//then drop extra columns
val newChangedDf = df2
.withColumn("time",when($"lag1", $"time"+1).otherwise($"time"-1))
.withColumn("type",when($"lag1", lit("OFF")).otherwise(lit("ON")))
.drop("lag_type","lag1","lag2")
//Finally add them to the original df.
val finalDf = df.union(newChangedDf)
注意:这不处理 lag1 和 lag2 都是 true
的情况。请根据您的要求操作上述代码。
【讨论】:
以上是关于复制当前行或滞后行的主要内容,如果未能解决你的问题,请参考以下文章