Spark 和 SparkSQL:如何模仿窗口功能?

Posted

技术标签:

【中文标题】Spark 和 SparkSQL:如何模仿窗口功能?【英文标题】:Spark and SparkSQL: How to imitate window function? 【发布时间】:2015-09-04 22:29:47 【问题描述】:

说明

给定一个数据框df

id |       date
---------------
 1 | 2015-09-01
 2 | 2015-09-01
 1 | 2015-09-03
 1 | 2015-09-04
 2 | 2015-09-04

我想创建一个正在运行的计数器或索引,

按相同的 id 和分组 按该组中的日期排序,

因此

id |       date |  counter
--------------------------
 1 | 2015-09-01 |        1
 1 | 2015-09-03 |        2
 1 | 2015-09-04 |        3
 2 | 2015-09-01 |        1
 2 | 2015-09-04 |        2

这是我可以通过窗口功能实现的,例如

val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select( df("id"), rowNumber().over(w) )

很遗憾,Spark 1.4.1 不支持常规数据帧的窗口函数:

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;

问题

如何不使用窗口函数在当前的 Spark 1.4.1 上实现上述计算? Spark 何时支持常规数据帧的窗口函数?

谢谢!

【问题讨论】:

您需要使用数据框和 SQL,还是可以使用 RDD?使用 groupBy 方法非常简单。 @KirkBroadhurst:RDD 也可以。你能用一些代码摘录勾勒出你的想法吗?从 SparkSQL 开始,我目前看不到如何做到这一点:你有想法吗? 【参考方案1】:

您也可以将HiveContext 用于本地DataFrames,除非您有充分的理由不这样做,否则这可能是个好主意。它是spark-shellpyspark shell 中的默认SQLContext(目前sparkR 似乎使用普通SQLContext),Spark SQL and DataFrame Guide 推荐它的解析器。

import org.apache.spark.SparkContext, SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

object HiveContextTest 
  def main(args: Array[String]) 
    val conf = new SparkConf().setAppName("Hive Context")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(
        ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")

    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", rowNumber.over(w).alias("rn")).show
  

【讨论】:

【参考方案2】:

您可以使用 RDD 来做到这一点。就我个人而言,我发现 RDD 的 API 更有意义 - 我并不总是希望我的数据像数据框一样“平坦”。

val df = sqlContext.sql("select 1, '2015-09-01'"
    ).unionAll(sqlContext.sql("select 2, '2015-09-01'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-03'")
    ).unionAll(sqlContext.sql("select 1, '2015-09-04'")
    ).unionAll(sqlContext.sql("select 2, '2015-09-04'"))

// dataframe as an RDD (of Row objects)
df.rdd 
  // grouping by the first column of the row
  .groupBy(r => r(0)) 
  // map each group - an Iterable[Row] - to a list and sort by the second column
  .map(g => g._2.toList.sortBy(row => row(1).toString))     
  .collect()

上面给出的结果如下:

Array[List[org.apache.spark.sql.Row]] = 
Array(
  List([1,2015-09-01], [1,2015-09-03], [1,2015-09-04]), 
  List([2,2015-09-01], [2,2015-09-04]))

如果你也想要在“组”中的位置,你可以使用zipWithIndex

df.rdd.groupBy(r => r(0)).map(g => 
    g._2.toList.sortBy(row => row(1).toString).zipWithIndex).collect()

Array[List[(org.apache.spark.sql.Row, Int)]] = Array(
  List(([1,2015-09-01],0), ([1,2015-09-03],1), ([1,2015-09-04],2)),
  List(([2,2015-09-01],0), ([2,2015-09-04],1)))

可以使用 FlatMap 将其展平为 Row 对象的简单列表/数组,但如果您需要在“组”上执行任何操作,这不是一个好主意。

像这样使用 RDD 的缺点是从 DataFrame 转换为 RDD 再转换回来很繁琐。

【讨论】:

非常感谢!!!这就是我正在寻找的解决方案。嗯,我只是不够“勇敢”,无法执行常规的 Scala list 操作,一旦 groupBy 完成...... 当我的“g._2.toList.sortBy”列表有数百万个元素时会发生什么,我无法收集它们【参考方案3】:

如果您拥有 Spark 版本 (>=)1.5,我完全同意 DataFrame 的 Window 函数是可行的方法。但是如果你真的被旧版本卡住了(例如 1.4.1),这里有一个解决这个问题的 hacky 方法

val df = sc.parallelize((1, "2015-09-01") :: (2, "2015-09-01") :: (1, "2015-09-03") :: (1, "2015-09-04") :: (1, "2015-09-04") :: Nil)
           .toDF("id", "date")

val dfDuplicate = df.selecExpr("id as idDup", "date as dateDup")
val dfWithCounter = df.join(dfDuplicate,$"id"===$"idDup")
                      .where($"date"<=$"dateDup")
                      .groupBy($"id", $"date")
                      .agg($"id", $"date", count($"idDup").as("counter"))
                      .select($"id",$"date",$"counter")

现在如果你这样做dfWithCounter.show

你会得到:

+---+----------+-------+                                                        
| id|      date|counter|
+---+----------+-------+
|  1|2015-09-01|      1|
|  1|2015-09-04|      3|
|  1|2015-09-03|      2|
|  2|2015-09-01|      1|
|  2|2015-09-04|      2|
+---+----------+-------+

注意date 没有排序,但counter 是正确的。您还可以通过在where 语句中将&lt;= 更改为&gt;= 来更改counter 的顺序。

【讨论】:

以上是关于Spark 和 SparkSQL:如何模仿窗口功能?的主要内容,如果未能解决你的问题,请参考以下文章

sparksql系列 sparksql列操作窗口函数join

Spark SQL 中分组依据和窗口函数如何交互?

如何加快 Spark SQL 单元测试?

Spark SQL DSL 中的窗口(固定、滑动等)和水印支持

Spark SQL:与时间窗口聚合

Spark SQL:窗口函数滞后直到满足条件