使用Spark中的复杂条件和滞后自引用创建新列
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Spark中的复杂条件和滞后自引用创建新列相关的知识,希望对你有一定的参考价值。
我正在尝试在基于以下内容的Spark数据框中创建一个新列:
此列的先前值(即列中的新值基于其上的值,而该值又基于...)
非常复杂的条件语句(24个不同的条件),取决于其他列的值(以及变量本身的滞后值)
例如,类似于此循环中的逻辑:
for row, i in df:
if row.col1 == "a":
row.col4 = row.col1 + row.col3
row.col5 = 11
if row.col1 == "b":
if row.col3 == 1:
row.col4 = lag(row.col4) + row.col1 + row.col2
row.col5 = 14
if row.col3 == 0:
row.col4 = lag(row.col4) + row.col1 + row.col3)
row.col5 = 17
if row.col1 == "d":
if row.col3 == 1:
row.col4 = 99
row.col5 = 19
if lag(row.col4) == 99:
row.col4 = lag(row.col4) + row.col5
row.col5 = etc...
(...以及c
和d
的另外21个可能的值
示例
我想转换此:
w = Window.orderBy(col("col1").asc())
df = spark.createDataFrame([
("a", 2, 0),
("b", 3, 1),
("b", 4, 0),
("d", 5, 1),
("e", 6, 0),
("f", 7, 1)
], ["col1", "col2","col3"])
+----+----+----+
|col1|col2|col3|
+----+----+----+
| a| 2| 0|
| b| 3| 1|
| b| 4| 0|
| d| 5| 1|
| e| 6| 0|
| f| 7| 1|
+----+----+----+
...进入此:
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
|col1|col2|col3|col4 >(explanation) |col5 >(also uses complex logic) |
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
| a| 2| 0|a0 >(because (col1==a) ==> col1+col3) |11 > |
| b| 3| 1|a0b3 >(because (col1==b & col3==1) ==> lag(col4)+col1+col2)|14 > |
| b| 4| 0|a0b3b0 >(because (col1==b & col3==0) ==> lag(col4)+col1+col3)|17 > |
| d| 5| 1|99 >(because (col1==d) ==> 99) |19 > |
| e| 6| 0|9919 >(because (lag(col4)==99) ==> lag(col4)+col5 |e6 > |
| f| 7| 1|etc... >etc... |etc..>etc... |
+----+----+----+--------+-----------------------------------------------------+-----+---------------------------+
Spark完全有可能吗?我尝试过的所有方法都无效:
- 我还没有找到将UDF的输出反馈到下一个UDF计算中的方法
- 条件+自引用使得基本上不可能将以前的值存储在临时列中。
- 我尝试使用硕大的
when
子句,但在引用withColumn()
语句中列本身的滞后值时遇到了麻烦。when()
+lag()
方法的另一个问题是其他变量引用了滞后变量,而滞后变量引用了其他变量。 (换句话说,每行只有一个滞后值,但是该值根据该行所满足的条件与其他变量的交互作用不同。
答案
如果您对UDF满意,那就很简单(我只是在下面的代码中复制了您的条件)。对于非UDF解决方案,它取决于lag
列在if
条件下的显示方式,除非您可以提供更多或最复杂的if
条件的示例,否则我会说UDF是最简单的方法。
以上是关于使用Spark中的复杂条件和滞后自引用创建新列的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Scala Spark 中使用具有许多条件的“.withColumn”为数据集创建新列
使用 pyspark 基于 if 和 else 条件创建新列