每个窗口匹配条件的最后一个条目
Posted
技术标签:
【中文标题】每个窗口匹配条件的最后一个条目【英文标题】:Last Entry that matches a condition per Window 【发布时间】:2018-03-05 17:50:19 【问题描述】:此虚拟数据表示具有测量周期的设备。 一个测量周期从“类型”初始化到初始化。
我想知道的是 f.e.每个测量周期内的最后一个错误(条件会变得更加复杂)。
我已经找到了解决方案。我真正想知道的是是否有更简单/更有效的方法来计算。
示例数据集
val df_orig = spark.sparkContext.parallelize(Seq(
("Init", 1, 17, "I"),
("TypeA", 2, 17, "W"),
("TypeA", 3, 17, "E"),
("TypeA", 4, 17, "W"),
("TypeA", 5, 17, "E"),
("TypeA", 6, 17, "W"),
("Init", 7, 12, "I"),
("TypeB", 8, 12, "W"),
("TypeB", 9, 12, "E"),
("TypeB", 10, 12, "W"),
("TypeB", 11, 12, "W"),
("TypeB", 12, 12, "E"),
("TypeB", 13, 12, "E")
)).toDF("Type", "rn", "X_ChannelC", "Error_Type")
以下代码代表我的解决方案。
val fillWindow = Window.partitionBy().orderBy($"rn").rowsBetween(Window.unboundedPreceding, 0)
//create window
val df_with_window = df_orig.withColumn("window_flag", when($"Type".contains("Init"), 1).otherwise(null))
.withColumn("window_filled", sum($"window_flag").over(fillWindow))
val window = Window.partitionBy("window_filled").orderBy($"rn").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
//calulate last entry
val df_new = df_with_window.withColumn("is_relevant", when($"Error_Type".contains("E"), $"rn").otherwise(null))
.withColumn("last", last($"is_relevant", true).over(window))
.withColumn("pass", when($"last" === $"is_relevant", "Fail").otherwise(null))
df_new.show()
结果:
+-----+---+----------+----------+-----------+-------------+-----------+----+--------+
| Type| rn|X_ChannelC|Error_Type|window_flag|window_filled|is_relevant|last| pass|
+-----+---+----------+----------+-----------+-------------+-----------+----+--------+
| Init| 1| 17| I| 1| 1| null| 5| null|
|TypeA| 2| 17| W| null| 1| null| 5| null|
|TypeA| 3| 17| E| null| 1| 3| 5| null|
|TypeA| 4| 17| W| null| 1| null| 5| null|
|TypeA| 5| 17| E| null| 1| 5| 5|This one|
|TypeA| 6| 17| W| null| 1| null| 5| null|
| Init| 7| 12| I| 1| 2| null| 13| null|
|TypeB| 8| 12| W| null| 2| null| 13| null|
|TypeB| 9| 12| E| null| 2| 9| 13| null|
|TypeB| 10| 12| W| null| 2| null| 13| null|
|TypeB| 11| 12| W| null| 2| null| 13| null|
|TypeB| 12| 12| E| null| 2| 12| 13| null|
|TypeB| 13| 12| E| null| 2| 13| 13|This one|
+-----+---+----------+----------+-----------+-------------+-----------+----+--------+
【问题讨论】:
我的回答对你有帮助吗,如果有请采纳 它并没有真正帮助我,因为它是相同的答案,只是代码更短。物理计划的工作方式相同。两种解决方案都使用了两个窗口函数 【参考方案1】:不确定这是否更有效(仍然使用 2 个窗口函数,但要短一些):
val df_new = df_orig
.withColumn("measurement", sum(when($"Type"==="Init",1)).over(Window.orderBy($"rn")))
.withColumn("pass", $"rn"===max(when($"Error_Type"==="E",$"rn")).over(Window.partitionBy($"measurement")))
.show()
+-----+---+----------+----------+-----------+-----+
| Type| rn|X_ChannelC|Error_Type|measurement| pass|
+-----+---+----------+----------+-----------+-----+
| Init| 1| 17| I| 1|false|
|TypeA| 2| 17| W| 1|false|
|TypeA| 3| 17| E| 1|false|
|TypeA| 4| 17| W| 1|false|
|TypeA| 5| 17| E| 1| true|
|TypeA| 6| 17| W| 1|false|
| Init| 7| 12| I| 2|false|
|TypeB| 8| 12| W| 2|false|
|TypeB| 9| 12| E| 2|false|
|TypeB| 10| 12| W| 2|false|
|TypeB| 11| 12| W| 2|false|
|TypeB| 12| 12| E| 2|false|
|TypeB| 13| 12| E| 2| true|
+-----+---+----------+----------+-----------+-----+
【讨论】:
以上是关于每个窗口匹配条件的最后一个条目的主要内容,如果未能解决你的问题,请参考以下文章