pyspark 数据帧上的复杂逻辑,包括前一行现有值以及动态生成的前一行值

Posted

技术标签:

【中文标题】pyspark 数据帧上的复杂逻辑,包括前一行现有值以及动态生成的前一行值【英文标题】:complex logic on pyspark dataframe including previous row existing value as well as previous row value generated on the fly 【发布时间】:2020-07-15 15:46:14 【问题描述】:

我必须在 spark 数据帧或 rdd(最好是数据帧)上应用一个逻辑,这需要生成两个额外的列。第一个生成的列依赖于同一行的其他列,第二个生成的列依赖于前一行的第一个生成的列。

以下是表格格式的问题陈述。 A 和 B 列在数据框中可用。将生成 C 和 D 列。

A |  B   | C            |     D
------------------------------------
1 | 100  |  default val |    C1-B1
2 | 200  |  D1-C1       |    C2-B2
3 | 300  |  D2-C2       |    C3-B3
4 | 400  |  D3-C3       |    C4-B4
5 | 500  |  D4-C4       |    C5-B5

这是样本数据

A |  B   |    C   |   D
------------------------
1 | 100  |   1000 |  900
2 | 200  |  -100  | -300
3 | 300  |  -200  | -500
4 | 400  |  -300  | -700
5 | 500  |  -400  | -900

我能想到的唯一解决方案是将输入数据帧合并为 1,将其转换为 rdd,然后将 python 函数(具有所有计算逻辑)应用于 mapPartitions API。 但是,这种方法可能会在一个执行器上产生负载。

【问题讨论】:

【参考方案1】:

数学上看,D1-C1,其中 D1= C1-B1;所以 D1-C1 将变为 C1-B1-C1 => -B1。 在 pyspark 中,窗口函数有一个称为 default 的参数。这应该可以简化您的问题。试试这个:

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([(1,100),(2,200),(3,300),(4,400),(5,500)],['a','b'])
w=Window.orderBy('a')
df_lag =df.withColumn('c',F.lag((F.col('b')*-1),default=1000).over(w))
df_final = df_lag.withColumn('d',F.col('c')-F.col('b'))

结果:

df_final.show()
+---+---+----+----+
|  a|  b|   c|   d|
+---+---+----+----+
|  1|100|1000| 900|
|  2|200|-100|-300|
|  3|300|-200|-500|
|  4|400|-300|-700|
|  5|500|-400|-900|
+---+---+----+----+

如果运算是除减法以外的复杂运算,则适用相同的逻辑 - 用默认值填充 C 列 - 计算 D ,然后使用 lag 计算 C 并重新计算 D。

【讨论】:

问题陈述中显示的计算是为了表示目的。实际计算不是简单的减法。我想要一个可以实现上表中 D 列的依赖的解决方案。 @PrateekPathak - 您也可以对任何计算应用相同的逻辑。用默认值填充 C 列 - 计算 d ,然后使用 lag 计算 C 并重新计算 D 是的,它奏效了。我赞成的您的评论是解决方案的要点。我已采用 Thales Souto 提供的代码并根据评论对其进行修改。 @PrateekPathak - 很高兴听到。我看不到对 cme​​ts 的支持。如果答案有帮助,您是否可以投票并接受答案,以便将来对其他人有用?(无义务)我还将更新答案以包含评论。【参考方案2】:

lag() 函数可以帮助您:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

w =  Window.orderBy("A")

df1 = df1.withColumn("C", F.lit(1000))

df2 = (
      df1
     .withColumn("D", F.col("C") - F.col("B"))
     .withColumn("C", 
                  F.when(F.lag("C").over(w).isNotNull(), 
                         F.lag("D").over(w) - F.lag("C").over(w))
                   .otherwise(F.col("C")))
     .withColumn("D", F.col("C") - F.col("B"))
)

【讨论】:

本方案中,在计算D时,所有行的C值都固定为1000。但是在问题陈述中,第 2 行的 C 列取决于第 1 行的 D 列。解决方案的倒数第三行也缺少一个 over(w)。 感谢您对丢失代码的提醒。解释为什么我将所有行都固定为 1000,它是在第一行中设置的 default_value,然后用后续值再次计算列 C。无论如何,如果您想使用列的先前值来计算某些东西,那么 lag() 函数可能会有所帮助。 我已经执行了这段代码。此代码生成的列 D 的数据是 900,800,700,600,500 但是列 D 的所需输出是 900,-300,-500,-700,-900,这是因为上面的代码在执行列 D 时考虑了所有行的默认值 C计算。从顶部开始,第 4 行还有一个小的代码更改。您需要再次将输出数据帧分配给 df1。 根据@Raghu从另一个答案的评论后,我们需要在计算C列后再次计算D列。这样我们就会得到想要的答案。

以上是关于pyspark 数据帧上的复杂逻辑,包括前一行现有值以及动态生成的前一行值的主要内容,如果未能解决你的问题,请参考以下文章

使用pyspark查找每个对应列的两个数据帧上的值差异

在 pandas 数据帧上应用 Pyspark 管道

如何使用 pySpark 使多个 json 处理更快?

在 pyspark 中的数据帧上应用 udf 后出错

如何在 pyspark 中的数据帧上使用 fuzz.ratio

在 pyspark 数据帧上减少和 Lambda