Pyspark 在查找前一行时按组迭代数据帧

Posted

技术标签:

【中文标题】Pyspark 在查找前一行时按组迭代数据帧【英文标题】:Pyspark iterate over dataframe by group on lookup of previous row 【发布时间】:2017-09-20 04:54:46 【问题描述】:

请帮助我,我是新手。下面是我的数据框

type col1 col2 col3
1    0    41   0
1    27   0    0
1    1    0    0 
1    183  0    2
2    null 0    0
2    null 10   0
3    0    126  0
3    2    0    1
3    4    0    0
3    5    0    0

下面应该是我的输出

type col1 col2 col3 result
1    0    41   0    0
1    27   0    0    14
1    1    0    0    13
1    183  0    2    -168
2    null 0    0
2    null 10   0
3    0    126  0    0
3    2    0    1    125
3    4    0    0    121
3    5    0    0    116

挑战在于必须对每组类型列执行此操作,公式类似于 prev(col2)-col1+col3

我尝试在 col2 上使用 window 和 lag 函数来填充结果列,但它不起作用。

下面是我的代码

part = Window().partitionBy().orderBy('type')
DF = DF.withColumn('result',lag("col2").over(w)-DF.col1+DF.col3)

现在我正在努力尝试使用地图功能,请帮助

【问题讨论】:

您的逻辑 prev(col2)-col1+col3 与预期输出不匹配。 是的 Ramesh 同意它必须是 prev(result)-col1+col3 。但是我们如何取 col2 41 的值并用 col1 27 减去呢? .所以在考虑这些方面并认为 lag(col2) 会动态修复,但我悲惨地失败了。试图多想,但我没有得到任何线索 @user3292373 prev(result)-col1+col3 也不匹配。第二行使用这个变成 -27,而不是 14。 请告诉我们如何实现这一点我应该得到输出为 41-27+0=14 的下一行它必须采用 14-1+0=13 和 13-183+2= -168 这对于每组类型 1,2 和 3 都必须重复。 13-183+2 = 172 而不是 168。是真的吗? 【参考方案1】:

逻辑有点棘手和复杂。

您可以在pyspark中执行以下操作

pyspark

from pyspark.sql import functions as F
from pyspark.sql import Window
import sys
windowSpec = Window.partitionBy("type").orderBy("type")
df = df.withColumn('result', F.lag(df.col2, 1).over(windowSpec) - df.col1 + df.col3)
df = df.withColumn('result', F.when(df.result.isNull(), F.lit(0)).otherwise(df.result))
df = df.withColumn('result', F.sum(df.result).over(windowSpec.rowsBetween(-sys.maxsize, -1)) + df.result)
df = df.withColumn('result', F.when(df.result.isNull(), F.lit(0)).otherwise(df.result))

斯卡拉

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("type").orderBy("type")
df.withColumn("result", lag("col2", 1).over(windowSpec) - $"col1"+$"col3")
  .withColumn("result", when($"result".isNull, lit(0)).otherwise($"result"))
  .withColumn("result", sum("result").over(windowSpec.rowsBetween(Long.MinValue, -1)) +$"result")
  .withColumn("result", when($"result".isNull, lit(0)).otherwise($"result"))

你应该有以下结果。

+----+----+----+----+------+
|type|col1|col2|col3|result|
+----+----+----+----+------+
|1   |0   |41  |0   |0.0   |
|1   |27  |0   |0   |14.0  |
|1   |1   |0   |0   |13.0  |
|1   |183 |0   |2   |-168.0|
|3   |0   |126 |0   |0.0   |
|3   |2   |0   |1   |125.0 |
|3   |4   |0   |0   |121.0 |
|3   |5   |0   |0   |116.0 |
|2   |null|0   |0   |0.0   |
|2   |null|10  |0   |0.0   |
+----+----+----+----+------+

已编辑

第一个withColumn 应用公式prev(col2) - col1 + col3。第二个withColumnresult 列的null 更改为0。第三个withColumn 用于累积和,即将所有值相加,直到结果列的当前行。所以三个withColumn 等价于prev(col2) + prev(results) 1 col1 + col3。最后一个withColumn 正在将result 列中的null 值更改为0

【讨论】:

非常感谢您的努力 Ramesh。我试图了解 Long.MinValue 做了什么,以便我可以在 pysaprk 中复制 我也编辑了 pyspark 的工作代码 :) 请看一下 :) 非常感谢兄弟,我尝试了 max.size 的各种组合,你 ping 的那个对我有用。再次感谢。你能解释一下 sys.maxsize 和最后三行代码吗?我很想知道 我已经解释过了。 :) 感谢您的支持和接受 :) 亲爱的 Ramesh,我不明白每次运行逻辑时会发生什么,我现在的库存值不同。没有对数据 Ramesh 进行任何更改。真的不知道该怎么办。我看到 rowsBetween(-sys.maxsize, -1)) 不理解这个 Ramesh 的问题。请帮忙

以上是关于Pyspark 在查找前一行时按组迭代数据帧的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 数据帧上的复杂逻辑,包括前一行现有值以及动态生成的前一行值

PySpark:我们应该迭代更新数据帧吗?

按组查找金额

Pyspark 数据帧,在标志之间迭代,基于组

如何在巨大数据帧的每一行中查找前 n 个值的列索引

如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?