如何在窗口 scala/spark 中使用 partitionBy 函数

Posted

技术标签:

【中文标题】如何在窗口 scala/spark 中使用 partitionBy 函数【英文标题】:How to use partitionBy function in window scala/spark 【发布时间】:2017-07-24 06:05:45 【问题描述】:

我有一个DataFrame 有两列,indexvalues。我想根据values 列获取delayValues

这是我的代码:

 val arr = Array(1,4,3,2,5,7,3,5,4,18)
    val input=new ArrayBuffer[(Int,Int)]()
    for(i<-0 until 10)
      input.append((i,arr(i)))
    
    val window=Window.rowsBetween(-2,0)
    val df = sc.parallelize(input, 4).toDF("index","values")
    df.withColumn("valueDealy",first(col("values")).over(window)).show()

这是结果:

这是我的预期结果,但我发现所有数据都收集到一个分区中,然后我使用partitionBy函数,这是我更改的代码:

val arr = Array(1,4,3,2,5,7,3,5,4,18)
    val input=new ArrayBuffer[(Int,Int)]()
    for(i<-0 until 10)
      input.append((i,arr(i)))
    
    val window=Window.orderBy(col("index")).partitionBy(col("index")).rowsBetween(-2,0)
    val df = sc.parallelize(input, 4).toDF("index","values")
    df.withColumn("valueDealy",first(col("values")).over(window)).show()

结果是:

+-----+------+----------+
|index|values|valueDealy|
+-----+------+----------+
|    0|     1|         1|
|    3|     2|         2|
|    7|     5|         5|
|    9|    18|        18|
|    4|     5|         5|
|    6|     3|         3|
|    5|     7|         7|
|    2|     3|         3|
|    1|     4|         4|
|    8|     4|         4|
+-----+------+----------+

我使用partitionBy时得到错误的结果,我该怎么办?谢谢!

我的预期输出是:

        +-----+------+----------+
        |index|values|valueDealy|
        +-----+------+----------+
        |    0|     1|         1|
        |    1|     4|         1|
        |    2|     3|         1|
        |    3|     2|         4|
        |    4|     5|         3|
        |    5|     7|         2|
        |    6|     3|         5|
        |    7|     5|         7|
        |    8|     4|         3|
        |    9|    18|         5|
        +-----+------+----------+

并且数据在多个分区中!

【问题讨论】:

您的预期输出是什么? 我已经改变了我的问题@RameshMaharjan 【参考方案1】:

一般来说,没有有效的解决方案可以单独用 Spark SQL 直接表达。就个人而言,在使用 Scala 时,我会使用 mllib 函数:

import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val n = 2
spark.createDataFrame(
  df.rdd.sliding(n + 1).map  xs  => Row(xs(0), xs(n)) ,
  StructType(Seq(
    StructField("delay", df.schema), StructField("current", df.schema))))

但是如果你的数据集包含连续的id,你也可以join:

df.alias("current").join(
  df.withColumn("index", $"index" - n).alias("previous"), Seq("index"))

请注意,这两种解决方案都可能需要在系列的开头/结尾进行一些更正。使用join,您可以使用outer 连接,使用sliding,您可以使用union 在数据集的开头/结尾添加所需数量的记录。

【讨论】:

以上是关于如何在窗口 scala/spark 中使用 partitionBy 函数的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Scala Spark 项目中使用 PySpark UDF?

如何在IntelliJ IDEA中运行Java/Scala/Spark程序

scala - Spark:如何在 groupedData 中获取带有条件的结果集

如何在 Scala/Spark 的数据框中扩展数组 [重复]

如何使用 Scala/Spark 添加不基于数据框中现有列的新列? [复制]

如何在 Scala Spark 中使用具有许多条件的“.withColumn”为数据集创建新列