在 Spark 中为 rowsBetween 和 rangeBetween 指定默认值

Posted

技术标签:

【中文标题】在 Spark 中为 rowsBetween 和 rangeBetween 指定默认值【英文标题】:Specify default value for rowsBetween and rangeBetween in Spark 【发布时间】:2018-02-15 10:52:10 【问题描述】:

我有一个关于 Sparks Dataframe 1.6 中的窗口操作的问题。

假设我有下表:

id|MONTH  |number
1  201703  2
1  201704  3
1  201705  7
1  201706  6

目前我正在使用 rowsBetween 函数:

val window = Window.partitionBy("id")
  .orderBy(asc("MONTH"))
  .rowsBetween(-2, 0)

randomDF.withColumn("counter", sum(col("number")).over(window))

这给了我以下结果:

id|MONTH  |number |counter
1  201703  2       2
1  201704  3       5
1  201705  7       12
1  201706  6       16

我不想实现的是在没有前置行时设置默认值(如在 lag() 和 Lead() 中)。例如:'0' 这样我得到的结果如下:

id|MONTH  |number |counter
1  201703  2       0
1  201704  3       0
1  201705  7       12
1  201706  6       16

我已经查看了文档,但 Spark 1.6 不允许这样做,我想知道是否有某种解决方法。

非常感谢!

【问题讨论】:

您对额外的withColumn 操作的解决方法感兴趣吗? 如果可能的话,我正在考虑对当前行进行窗口化,并使用collect_list() 将它们放入列表中。然后在下一个 withColumn 操作中检查该列表是否包含 3 个值,否则将值设置为 0。但窗口操作中的 collect_list() 在版本 1.6 中不起作用 【参考方案1】:

这样的事情怎么样:

添加额外的lag 步骤 用case 替换值

代码

val rowsRdd: RDD[Row] = spark.sparkContext.parallelize(
  Seq(
    Row(1, 1, 201703, 2),
    Row(2, 1, 201704, 3),
    Row(3, 1, 201705, 7),
    Row(4, 1, 201706, 6)))

val schema: StructType = new StructType()
  .add(StructField("sortColumn",     IntegerType,  false))
  .add(StructField("id",     IntegerType,  false))
  .add(StructField("month",  IntegerType, false))
  .add(StructField("number",  IntegerType, false))

val df0: DataFrame = spark.createDataFrame(rowsRdd, schema)

val prevRows = 2

val window = Window.partitionBy("id")
  .orderBy(col("month"))
  .rowsBetween(-prevRows, 0)

val window2 = Window.partitionBy("id")
  .orderBy(col("month"))

val df2 = df0.withColumn("counter", sum(col("number")).over(window))
val df3 = df2.withColumn("myLagTmp", lag(lit(1), prevRows).over(window2))
val df4 = df3.withColumn("counter", expr("case when myLagTmp is null then 0 else counter end")).drop(col("myLagTmp"))
df4.sort("sortColumn").show()

【讨论】:

【参考方案2】:

感谢@astro_asz 的回答,我想出了以下解决方案:

val numberRowsBetween = 2
val window1 = Window.partitionBy("id").orderBy("MONTH")
val window2 = Window.partitionBy("id")
      .orderBy(asc("MONTH"))
      .rowsBetween(-(numberRowsBetween - 1), 0)

randomDF.withColumn("counter", when(lag(col("number"), numberRowsBetween , 0).over(window1) === 0, 0)
                .otherwise(sum(col("number")).over(window2)))

此解决方案会将“0”作为默认值。

【讨论】:

以上是关于在 Spark 中为 rowsBetween 和 rangeBetween 指定默认值的主要内容,如果未能解决你的问题,请参考以下文章

CDH 中为spark 安装 python3

在 Spark 数据框的列中为每个组添加递增的数字

如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN

我可以在 build.sbt 中为 spark 3 和 scala 2.12 获取 neo4j 的命令(依赖项)吗?

如何在 Spark on YARN 中为 Spark UI 创建安全过滤器

在 Apache Spark 中为每行迭代添加范围变量