如何根据spark scala中的条件进行累积和
Posted
技术标签:
【中文标题】如何根据spark scala中的条件进行累积和【英文标题】:How to do cumulative sum based on conditions in spark scala 【发布时间】:2022-01-23 02:45:39 【问题描述】:我有以下数据,final_column
是我想要得到的确切输出。我正在尝试做 flag
的累积总和,如果 flag
为 0 则想休息,然后将值设置为 0 如下数据
cola date flag final_column
a 2021-10-01 0 0
a 2021-10-02 1 1
a 2021-10-03 1 2
a 2021-10-04 0 0
a 2021-10-05 0 0
a 2021-10-06 0 0
a 2021-10-07 1 1
a 2021-10-08 1 2
a 2021-10-09 1 3
a 2021-10-10 0 0
b 2021-10-01 0 0
b 2021-10-02 1 1
b 2021-10-03 1 2
b 2021-10-04 0 0
b 2021-10-05 0 0
b 2021-10-06 1 1
b 2021-10-07 1 2
b 2021-10-08 1 3
b 2021-10-09 1 4
b 2021-10-10 0 0
我试过了
import org.apache.spark.sql.functions._
df.withColumn("final_column",expr("sum(flag) over(partition by cola order date asc)"))
我尝试在 sum 函数中添加 case when flag = 0 then 0 else 1 end
之类的条件,但不起作用。
【问题讨论】:
【参考方案1】:您可以在flag
上使用条件求和来定义列group
,然后使用row_number
和由cola
和group
分区的窗口给出您想要的结果:
import org.apache.spark.sql.expressions.Window
val result = df.withColumn(
"group",
sum(when(col("flag") === 0, 1).otherwise(0)).over(Window.partitionBy("cola").orderBy("date"))
).withColumn(
"final_column",
row_number().over(Window.partitionBy("cola", "group").orderBy("date")) - 1
).drop("group")
result.show
//+----+-----+----+------------+
//|cola| date|flag|final_column|
//+----+-----+----+------------+
//| b|44201| 0| 0|
//| b|44202| 1| 1|
//| b|44203| 1| 2|
//| b|44204| 0| 0|
//| b|44205| 0| 0|
//| b|44206| 1| 1|
//| b|44207| 1| 2|
//| b|44208| 1| 3|
//| b|44209| 1| 4|
//| b|44210| 0| 0|
//| a|44201| 0| 0|
//| a|44202| 1| 1|
//| a|44203| 1| 2|
//| a|44204| 0| 0|
//| a|44205| 0| 0|
//| a|44206| 0| 0|
//| a|44207| 1| 1|
//| a|44208| 1| 2|
//| a|44209| 1| 3|
//| a|44210| 0| 0|
//+----+-----+----+------------+
row_number() - 1
在这种情况下只相当于sum(col("flag"))
,因为标志值总是0或1。所以上面的final_column
也可以写成:
.withColumn(
"final_column",
sum(col("flag")).over(Window.partitionBy("cola", "group").orderBy("date"))
)
【讨论】:
以上是关于如何根据spark scala中的条件进行累积和的主要内容,如果未能解决你的问题,请参考以下文章
在 Spark Scala 中的列上运行累积/迭代 Costum 方法
如何在 if-else 条件下的列中使用 Spark 值 - Scala
如何使用 spark(scala)读取和写入(更新)同一个文件
如何根据 Spark Scala 中其他数据帧中的多列匹配过滤数据帧