如何在窗口 scala/spark 中使用 partitionBy 函数
Posted
技术标签:
【中文标题】如何在窗口 scala/spark 中使用 partitionBy 函数【英文标题】:How to use partitionBy function in window scala/spark 【发布时间】:2017-07-24 06:05:45 【问题描述】:我有一个DataFrame
有两列,index
和 values
。我想根据values
列获取delayValues
。
这是我的代码:
val arr = Array(1,4,3,2,5,7,3,5,4,18)
val input=new ArrayBuffer[(Int,Int)]()
for(i<-0 until 10)
input.append((i,arr(i)))
val window=Window.rowsBetween(-2,0)
val df = sc.parallelize(input, 4).toDF("index","values")
df.withColumn("valueDealy",first(col("values")).over(window)).show()
这是结果:
这是我的预期结果,但我发现所有数据都收集到一个分区中,然后我使用partitionBy
函数,这是我更改的代码:
val arr = Array(1,4,3,2,5,7,3,5,4,18)
val input=new ArrayBuffer[(Int,Int)]()
for(i<-0 until 10)
input.append((i,arr(i)))
val window=Window.orderBy(col("index")).partitionBy(col("index")).rowsBetween(-2,0)
val df = sc.parallelize(input, 4).toDF("index","values")
df.withColumn("valueDealy",first(col("values")).over(window)).show()
结果是:
+-----+------+----------+
|index|values|valueDealy|
+-----+------+----------+
| 0| 1| 1|
| 3| 2| 2|
| 7| 5| 5|
| 9| 18| 18|
| 4| 5| 5|
| 6| 3| 3|
| 5| 7| 7|
| 2| 3| 3|
| 1| 4| 4|
| 8| 4| 4|
+-----+------+----------+
我使用partitionBy
时得到错误的结果,我该怎么办?谢谢!
我的预期输出是:
+-----+------+----------+
|index|values|valueDealy|
+-----+------+----------+
| 0| 1| 1|
| 1| 4| 1|
| 2| 3| 1|
| 3| 2| 4|
| 4| 5| 3|
| 5| 7| 2|
| 6| 3| 5|
| 7| 5| 7|
| 8| 4| 3|
| 9| 18| 5|
+-----+------+----------+
并且数据在多个分区中!
【问题讨论】:
您的预期输出是什么? 我已经改变了我的问题@RameshMaharjan 【参考方案1】:一般来说,没有有效的解决方案可以单独用 Spark SQL 直接表达。就个人而言,在使用 Scala 时,我会使用 mllib
函数:
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val n = 2
spark.createDataFrame(
df.rdd.sliding(n + 1).map xs => Row(xs(0), xs(n)) ,
StructType(Seq(
StructField("delay", df.schema), StructField("current", df.schema))))
但是如果你的数据集包含连续的id,你也可以join
:
df.alias("current").join(
df.withColumn("index", $"index" - n).alias("previous"), Seq("index"))
请注意,这两种解决方案都可能需要在系列的开头/结尾进行一些更正。使用join
,您可以使用outer
连接,使用sliding
,您可以使用union
在数据集的开头/结尾添加所需数量的记录。
【讨论】:
以上是关于如何在窗口 scala/spark 中使用 partitionBy 函数的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Scala Spark 项目中使用 PySpark UDF?
如何在IntelliJ IDEA中运行Java/Scala/Spark程序
scala - Spark:如何在 groupedData 中获取带有条件的结果集
如何在 Scala/Spark 的数据框中扩展数组 [重复]