每个窗口匹配条件的最后一个条目

Posted

技术标签:

【中文标题】每个窗口匹配条件的最后一个条目【英文标题】:Last Entry that matches a condition per Window 【发布时间】:2018-03-05 17:50:19 【问题描述】:

此虚拟数据表示具有测量周期的设备。 一个测量周期从“类型”初始化到初始化。

我想知道的是 f.e.每个测量周期内的最后一个错误(条件会变得更加复杂)。

我已经找到了解决方案。我真正想知道的是是否有更简单/更有效的方法来计算。

示例数据集

val df_orig = spark.sparkContext.parallelize(Seq(
      ("Init", 1, 17, "I"),
      ("TypeA", 2, 17, "W"),
      ("TypeA", 3, 17, "E"),
      ("TypeA", 4, 17, "W"),
      ("TypeA", 5, 17, "E"),
      ("TypeA", 6, 17, "W"),
      ("Init", 7, 12, "I"),
      ("TypeB", 8, 12, "W"),
      ("TypeB", 9, 12, "E"),
      ("TypeB", 10, 12, "W"),
      ("TypeB", 11, 12, "W"),
      ("TypeB", 12, 12, "E"),
      ("TypeB", 13, 12, "E")
    )).toDF("Type", "rn", "X_ChannelC", "Error_Type")

以下代码代表我的解决方案。

val fillWindow = Window.partitionBy().orderBy($"rn").rowsBetween(Window.unboundedPreceding, 0)

    //create window
    val df_with_window = df_orig.withColumn("window_flag", when($"Type".contains("Init"), 1).otherwise(null))
        .withColumn("window_filled", sum($"window_flag").over(fillWindow))

    val window = Window.partitionBy("window_filled").orderBy($"rn").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    //calulate last entry
    val df_new = df_with_window.withColumn("is_relevant", when($"Error_Type".contains("E"), $"rn").otherwise(null))
      .withColumn("last", last($"is_relevant", true).over(window))
      .withColumn("pass", when($"last" === $"is_relevant", "Fail").otherwise(null))

    df_new.show()

结果:

+-----+---+----------+----------+-----------+-------------+-----------+----+--------+
| Type| rn|X_ChannelC|Error_Type|window_flag|window_filled|is_relevant|last|    pass|
+-----+---+----------+----------+-----------+-------------+-----------+----+--------+
| Init|  1|        17|         I|          1|            1|       null|   5|    null|
|TypeA|  2|        17|         W|       null|            1|       null|   5|    null|
|TypeA|  3|        17|         E|       null|            1|          3|   5|    null|
|TypeA|  4|        17|         W|       null|            1|       null|   5|    null|
|TypeA|  5|        17|         E|       null|            1|          5|   5|This one|
|TypeA|  6|        17|         W|       null|            1|       null|   5|    null|
| Init|  7|        12|         I|          1|            2|       null|  13|    null|
|TypeB|  8|        12|         W|       null|            2|       null|  13|    null|
|TypeB|  9|        12|         E|       null|            2|          9|  13|    null|
|TypeB| 10|        12|         W|       null|            2|       null|  13|    null|
|TypeB| 11|        12|         W|       null|            2|       null|  13|    null|
|TypeB| 12|        12|         E|       null|            2|         12|  13|    null|
|TypeB| 13|        12|         E|       null|            2|         13|  13|This one|
+-----+---+----------+----------+-----------+-------------+-----------+----+--------+

【问题讨论】:

我的回答对你有帮助吗,如果有请采纳 它并没有真正帮助我,因为它是相同的答案,只是代码更短。物理计划的工作方式相同。两种解决方案都使用了两个窗口函数 【参考方案1】:

不确定这是否更有效(仍然使用 2 个窗口函数,但要短一些):

val df_new = df_orig
 .withColumn("measurement", sum(when($"Type"==="Init",1)).over(Window.orderBy($"rn")))
 .withColumn("pass", $"rn"===max(when($"Error_Type"==="E",$"rn")).over(Window.partitionBy($"measurement")))
 .show()

+-----+---+----------+----------+-----------+-----+
| Type| rn|X_ChannelC|Error_Type|measurement| pass|
+-----+---+----------+----------+-----------+-----+
| Init|  1|        17|         I|          1|false|
|TypeA|  2|        17|         W|          1|false|
|TypeA|  3|        17|         E|          1|false|
|TypeA|  4|        17|         W|          1|false|
|TypeA|  5|        17|         E|          1| true|
|TypeA|  6|        17|         W|          1|false|
| Init|  7|        12|         I|          2|false|
|TypeB|  8|        12|         W|          2|false|
|TypeB|  9|        12|         E|          2|false|
|TypeB| 10|        12|         W|          2|false|
|TypeB| 11|        12|         W|          2|false|
|TypeB| 12|        12|         E|          2|false|
|TypeB| 13|        12|         E|          2| true|
+-----+---+----------+----------+-----------+-----+

【讨论】:

以上是关于每个窗口匹配条件的最后一个条目的主要内容,如果未能解决你的问题,请参考以下文章

有条件的 PySpark 窗口

Horspool 字符串匹配算法

从列表条目访问 SQL 数据

SQL Server SUM IF 使用具有多个条件的窗口函数

在 PySpark 中的窗口上获取与某些条件匹配的第一行

如何改进 Python 中列表的模式匹配