如何根据spark scala中的条件进行累积和

Posted

技术标签:

【中文标题】如何根据spark scala中的条件进行累积和【英文标题】:How to do cumulative sum based on conditions in spark scala 【发布时间】:2022-01-23 02:45:39 【问题描述】:

我有以下数据,final_column 是我想要得到的确切输出。我正在尝试做 flag 的累积总和,如果 flag 为 0 则想休息,然后将值设置为 0 如下数据

cola date       flag final_column
a   2021-10-01  0   0
a   2021-10-02  1   1
a   2021-10-03  1   2
a   2021-10-04  0   0
a   2021-10-05  0   0
a   2021-10-06  0   0
a   2021-10-07  1   1
a   2021-10-08  1   2
a   2021-10-09  1   3
a   2021-10-10  0   0
b   2021-10-01  0   0
b   2021-10-02  1   1
b   2021-10-03  1   2
b   2021-10-04  0   0
b   2021-10-05  0   0
b   2021-10-06  1   1
b   2021-10-07  1   2
b   2021-10-08  1   3
b   2021-10-09  1   4
b   2021-10-10  0   0

我试过了

import org.apache.spark.sql.functions._

df.withColumn("final_column",expr("sum(flag) over(partition by cola order date asc)"))

我尝试在 sum 函数中添加 case when flag = 0 then 0 else 1 end 之类的条件,但不起作用。

【问题讨论】:

【参考方案1】:

您可以在flag 上使用条件求和来定义列group,然后使用row_number 和由colagroup 分区的窗口给出您想要的结果:

import org.apache.spark.sql.expressions.Window

val result = df.withColumn(
    "group",
    sum(when(col("flag") === 0, 1).otherwise(0)).over(Window.partitionBy("cola").orderBy("date"))
).withColumn(
    "final_column",
    row_number().over(Window.partitionBy("cola", "group").orderBy("date")) - 1
).drop("group")

result.show

//+----+-----+----+------------+
//|cola| date|flag|final_column|
//+----+-----+----+------------+
//|   b|44201|   0|           0|
//|   b|44202|   1|           1|
//|   b|44203|   1|           2|
//|   b|44204|   0|           0|
//|   b|44205|   0|           0|
//|   b|44206|   1|           1|
//|   b|44207|   1|           2|
//|   b|44208|   1|           3|
//|   b|44209|   1|           4|
//|   b|44210|   0|           0|
//|   a|44201|   0|           0|
//|   a|44202|   1|           1|
//|   a|44203|   1|           2|
//|   a|44204|   0|           0|
//|   a|44205|   0|           0|
//|   a|44206|   0|           0|
//|   a|44207|   1|           1|
//|   a|44208|   1|           2|
//|   a|44209|   1|           3|
//|   a|44210|   0|           0|
//+----+-----+----+------------+

row_number() - 1 在这种情况下只相当于sum(col("flag")),因为标志值总是0或1。所以上面的final_column也可以写成:

.withColumn(
    "final_column",
    sum(col("flag")).over(Window.partitionBy("cola", "group").orderBy("date"))
)

【讨论】:

以上是关于如何根据spark scala中的条件进行累积和的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark Scala 中的列上运行累积/迭代 Costum 方法

如何在 if-else 条件下的列中使用 Spark 值 - Scala

如何使用 spark(scala)读取和写入(更新)同一个文件

如何根据 Spark Scala 中其他数据帧中的多列匹配过滤数据帧

NotNull 条件不适用于 spark 数据框 scala 中的 withColumn 条件

根据来自其他数据帧的位置条件在数据帧上编写选择查询,scala