使用Spark中的复杂条件和滞后自引用创建新列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spark中的复杂条件和滞后自引用创建新列相关的知识,希望对你有一定的参考价值。

我正在尝试在基于以下内容的Spark数据框中创建一个新列:

  1. 此列的先前值(即列中的新值基于其上的值,而该值又基于...)

  2. 非常复杂的条件语句(24个不同的条件),取决于其他列的值(以及变量本身的滞后值)

例如,类似于此循环中的逻辑:

for row, i in df:
    if row.col1 == "a":
        row.col4 = row.col1 + row.col3
        row.col5 = 11
    if row.col1 == "b":
        if row.col3 == 1:
            row.col4 = lag(row.col4) + row.col1 + row.col2
            row.col5 = 14
        if row.col3 == 0:
            row.col4 = lag(row.col4) + row.col1 + row.col3)
            row.col5 = 17
    if row.col1 == "d":
        if row.col3 == 1:
            row.col4 = 99
            row.col5 = 19
    if lag(row.col4) == 99:
        row.col4 = lag(row.col4) + row.col5
        row.col5 = etc...

(...以及cd的另外21个可能的值

示例

我想转换此:

w = Window.orderBy(col("col1").asc())

df = spark.createDataFrame([
    ("a", 2, 0),
    ("b", 3, 1),
    ("b", 4, 0),
    ("d", 5, 1),
    ("e", 6, 0),
    ("f", 7, 1)
], ["col1", "col2","col3"])

+----+----+----+
|col1|col2|col3|
+----+----+----+
|   a|   2|   0|
|   b|   3|   1|
|   b|   4|   0|
|   d|   5|   1|
|   e|   6|   0|
|   f|   7|   1|
+----+----+----+

...进入此:

+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
|col1|col2|col3|col4    >(explanation)                                        |col5 >(also uses complex logic)  |
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
|   a|   2|   0|a0      >(because (col1==a) ==> col1+col3)                    |11   >                           |
|   b|   3|   1|a0b3    >(because (col1==b & col3==1) ==> lag(col4)+col1+col2)|14   >                           |
|   b|   4|   0|a0b3b0  >(because (col1==b & col3==0) ==> lag(col4)+col1+col3)|17   >                           |
|   d|   5|   1|99      >(because (col1==d) ==> 99)                           |19   >                           |
|   e|   6|   0|9919    >(because (lag(col4)==99) ==> lag(col4)+col5          |e6   >                           |
|   f|   7|   1|etc...  >etc...                                               |etc..>etc...                     |
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+

Spark完全有可能吗?我尝试过的所有方法都无效:

  • 我还没有找到将U​​DF的输出反馈到下一个UDF计算中的方法
  • 条件+自引用使得基本上不可能将以前的值存储在临时列中。
  • 我尝试使用硕大的when子句,但在引用withColumn()语句中列本身的滞后值时遇到了麻烦。when()+ lag()方法的另一个问题是其他变量引用了滞后变量,而滞后变量引用了其他变量。 (换句话说,每行只有一个滞后值,但是该值根据该行所满足的条件与其他变量的交互作用不同。
答案

如果您对UDF满意,那就很简单(我只是在下面的代码中复制了您的条件)。对于非UDF解决方案,它取决于lag列在if条件下的显示方式,除非您可以提供更多或最复杂的if条件的示例,否则我会说UDF是最简单的方法。

以上是关于使用Spark中的复杂条件和滞后自引用创建新列的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Scala Spark 中使用具有许多条件的“.withColumn”为数据集创建新列

Python:创建新列,计算当前日期和滞后日期之间的天数

使用 pyspark 基于 if 和 else 条件创建新列

Spark 1.5.0 (PySpark) 案例当逻辑和滞后窗口函数

如何在条件下在pyspark上创建一个新列?

根据条件在 Spark SQL 或 MySQL 中生成新列