在 Spark 中基于窗口和条件创建新列

Posted

技术标签:

【中文标题】在 Spark 中基于窗口和条件创建新列【英文标题】:Creating a new column based on a window and a condition in Spark 【发布时间】:2020-06-07 21:44:59 【问题描述】:

初始数据帧:

+------------------------------+----------+-------+
|          Timestamp           | Property | Value |
+------------------------------+----------+-------+
| 2019-09-01T01:36:57.000+0000 | X        |     N |
| 2019-09-01T01:37:39.000+0000 | A        |     3 |
| 2019-09-01T01:42:55.000+0000 | X        |     Y |
| 2019-09-01T01:53:44.000+0000 | A        |    17 |
| 2019-09-01T01:55:34.000+0000 | A        |     9 |
| 2019-09-01T01:57:32.000+0000 | X        |     N |
| 2019-09-01T02:59:40.000+0000 | A        |     2 |
| 2019-09-01T02:00:03.000+0000 | A        |    16 |
| 2019-09-01T02:01:40.000+0000 | X        |     Y |
| 2019-09-01T02:04:03.000+0000 | A        |    21 |
+------------------------------+----------+-------+

最终数据帧:

+------------------------------+----------+-------+---+
|          Timestamp           | Property | Value | X |
+------------------------------+----------+-------+---+
| 2019-09-01T01:37:39.000+0000 | A        |     3 | N |
| 2019-09-01T01:53:44.000+0000 | A        |    17 | Y |
| 2019-09-01T01:55:34.000+0000 | A        |     9 | Y |
| 2019-09-01T02:00:03.000+0000 | A        |    16 | N |
| 2019-09-01T02:04:03.000+0000 | A        |    21 | Y |
| 2019-09-01T02:59:40.000+0000 | A        |     2 | Y |
+------------------------------+----------+-------+---+

基本上,我有一个时间戳、一个属性和一个值字段。该属性可以是AX,并且它有一个值。我想根据X 属性的值创建一个新的DataFrame,其中第四列名为X

    我开始浏览从最早到最旧的行。 我遇到了具有 X 属性的行,我存储了它的值并将其插入到 X 列中。 如果我遇到 A 属性行:我将上一步中存储的值插入 X 列。 ELSE(意味着我遇到了 X 属性行):我更新了存储的值(因为它是更新的)并将新的存储值插入到 X 列中。 我一直这样做,直到完成整个数据框。 我删除了带有 X 属性的行以使最终数据框显示在上面。

我确信有某种方法可以通过 Window 函数有效地做到这一点。

【问题讨论】:

给定 TS 2:59:40,2 的 X 值在预期输出中应该是 Y。对吗? 是的,感谢您的编辑。 【参考方案1】:

用值 X 的值创建一个临时列,如果 A 则为空。然后使用窗口获取最后一个非空临时值。最后过滤属性“A”。

scala> val df = Seq(
     |   ("2019-09-01T01:36:57.000+0000", "X", "N"),
     |   ("2019-09-01T01:37:39.000+0000", "A", "3"),
     |   ("2019-09-01T01:42:55.000+0000", "X", "Y"),
     |   ("2019-09-01T01:53:44.000+0000", "A", "17"),
     |   ("2019-09-01T01:55:34.000+0000", "A", "9"),
     |   ("2019-09-01T01:57:32.000+0000", "X", "N"),
     |   ("2019-09-01T02:59:40.000+0000", "A", "2"),
     |   ("2019-09-01T02:00:03.000+0000", "A", "16"),
     |   ("2019-09-01T02:01:40.000+0000", "X", "Y"),
     |   ("2019-09-01T02:04:03.000+0000", "A", "21")
     | ).toDF("Timestamp", "Property", "Value").withColumn("Temp", when($"Property" === "X", $"Value").otherwise(null))
df: org.apache.spark.sql.DataFrame = [Timestamp: string, Property: string ... 2 more fields]

scala> df.show(false)
+----------------------------+--------+-----+----+
|Timestamp                   |Property|Value|Temp|
+----------------------------+--------+-----+----+
|2019-09-01T01:36:57.000+0000|X       |N    |N   |
|2019-09-01T01:37:39.000+0000|A       |3    |null|
|2019-09-01T01:42:55.000+0000|X       |Y    |Y   |
|2019-09-01T01:53:44.000+0000|A       |17   |null|
|2019-09-01T01:55:34.000+0000|A       |9    |null|
|2019-09-01T01:57:32.000+0000|X       |N    |N   |
|2019-09-01T02:59:40.000+0000|A       |2    |null|
|2019-09-01T02:00:03.000+0000|A       |16   |null|
|2019-09-01T02:01:40.000+0000|X       |Y    |Y   |
|2019-09-01T02:04:03.000+0000|A       |21   |null|
+----------------------------+--------+-----+----+


scala> val overColumns = Window.orderBy("TimeStamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)

overColumns: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@1b759662

scala> df.withColumn("X", last($"Temp",true).over(overColumns)).show(false)
+----------------------------+--------+-----+----+---+
|Timestamp                   |Property|Value|Temp|X  |
+----------------------------+--------+-----+----+---+
|2019-09-01T01:36:57.000+0000|X       |N    |N   |N  |
|2019-09-01T01:37:39.000+0000|A       |3    |null|N  |
|2019-09-01T01:42:55.000+0000|X       |Y    |Y   |Y  |
|2019-09-01T01:53:44.000+0000|A       |17   |null|Y  |
|2019-09-01T01:55:34.000+0000|A       |9    |null|Y  |
|2019-09-01T01:57:32.000+0000|X       |N    |N   |N  |
|2019-09-01T02:00:03.000+0000|A       |16   |null|N  |
|2019-09-01T02:01:40.000+0000|X       |Y    |Y   |Y  |
|2019-09-01T02:04:03.000+0000|A       |21   |null|Y  |
|2019-09-01T02:59:40.000+0000|A       |2    |null|Y  |
+----------------------------+--------+-----+----+---+

scala> df.withColumn("X", last($"Temp",true).over(overColumns)).filter($"Property" === "A").show(false)

+----------------------------+--------+-----+----+---+
|Timestamp                   |Property|Value|Temp|X  |
+----------------------------+--------+-----+----+---+
|2019-09-01T01:37:39.000+0000|A       |3    |null|N  |
|2019-09-01T01:53:44.000+0000|A       |17   |null|Y  |
|2019-09-01T01:55:34.000+0000|A       |9    |null|Y  |
|2019-09-01T02:00:03.000+0000|A       |16   |null|N  |
|2019-09-01T02:04:03.000+0000|A       |21   |null|Y  |
|2019-09-01T02:59:40.000+0000|A       |2    |null|Y  |
+----------------------------+--------+-----+----+---+


【讨论】:

谢谢,我就是这么问的。如果在初始数据帧中我还有一个名为“device_ID”的分类字段,我想要一个最终数据帧,就像您设法生成的数据帧一样,但是关于每个设备。换句话说,对device_ID 的每个值分别执行相同的操作。这意味着时间戳顺序仅对具有相同device_ID 的行很重要。当然,特定的device_id 同时具有 A 和 X 属性。 Window.partitionBy("device_id").orderBy("TimeStamp").rowsBet.... 你像这样按“device_id”分区 哇,这么简单/直接!我不知道这个partitionBy 函数。您是否认为应该使用其他子问题编辑问题并要求也编辑您的答案?我认为它可能对其他用户有用..

以上是关于在 Spark 中基于窗口和条件创建新列的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 基于 if 和 else 条件创建新列

基于固定日期间隔大小的移动窗口在R中改变新列

如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]

如何在 Scala Spark 中使用具有许多条件的“.withColumn”为数据集创建新列

Python - 基于其他列条件的新列[重复]

Spark SQL Windows:基于数组列创建数据框