Spark SQL Windows:基于数组列创建数据框

Posted

技术标签:

【中文标题】Spark SQL Windows:基于数组列创建数据框【英文标题】:SparkSQL Windows: Creating Frame Based On Array Column 【发布时间】:2020-02-09 22:11:07 【问题描述】:

我希望使用 SparkSQL 的窗口函数,但在框架规范上有一个自定义条件。

被操作的dataframe如下:

+--------------------+--------------------+--------------------+-----+
|              userid|           elementid|       prerequisites|score|
+--------------------+--------------------+--------------------+-----+
|a                   |1                   |[]                  |  1  |
|a                   |2                   |[]                  |  1  |
|a                   |3                   |[]                  |  1  |
|b                   |1                   |[]                  |  1  |
|a                   |4                   |[1, 2]              |  1  |
+--------------------+--------------------+--------------------+-----+

prerequisites 列中的每个元素都是另一行的 elementid 列中的一个值。

我想创建一个按userid 分区的窗口,然后抓取所有前面的行,其中elementid 包含在当前行的prerequisites 列中。

一旦我到达这个窗口,我想在score 列上执行sum

上述示例的期望输出:

+--------------------+--------------------+--------------------+-----+
|              userid|           elementid|       prerequisites|sum  |
+--------------------+--------------------+--------------------+-----+
|a                   |1                   |[]                  |  0  |
|a                   |2                   |[]                  |  0  |
|a                   |3                   |[]                  |  0  |
|b                   |1                   |[]                  |  0  |
|a                   |4                   |[1, 2]              |  2  |
+--------------------+--------------------+--------------------+-----+

请注意,因为用户 a 是唯一具有其前面元素的先决条件的用户,所以它是唯一具有 > 0 sum 的用户。

我看到的最接近的问题是this question,它利用了collect_list。

但是,这并不能构建一个窗口,而是收集一个潜在的 ID 列表。有人对如何构建上述窗口有任何想法吗?

【问题讨论】:

你能给出你的样本数据完整和预期的输出吗? 已添加!谢谢尼克 【参考方案1】:
scala> import org.apache.spark.sql.expressions.Window,UserDefinedFunction

scala> df.show()
+------+---------+-------------+-----+
|userid|elementid|prerequisites|score|
+------+---------+-------------+-----+
|     a|        1|           []|    1|
|     a|        2|           []|    1|
|     a|        3|           []|    1|
|     b|        1|           []|    1|
|     a|        4|       [1, 2]|    1|
+------+---------+-------------+-----+

scala> df.printSchema
root
 |-- userid: string (nullable = true)
 |-- elementid: string (nullable = true)
 |-- prerequisites: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- score: string (nullable = true)

scala> val W = Window.partitionBy("userid")

scala> val df1 = df.withColumn("elementidList", collect_set(col("elementid")).over(W))
                   .withColumn("elementidScoreMap", map_from_arrays(col("elementidList"), collect_list(col("score").cast("long")).over(W)))
                   .withColumn("common", array_intersect(col("prerequisites"), col("elementidList")))
                   .drop("elementidList", "score") 

scala> def getSumUDF:UserDefinedFunction = udf((Score:Map[String,Long], Id:String) => 
     | var out:Long =  0
     | Id.split(",").foreach x => out = Score(x.toString) + out
     | out)

scala> df1.withColumn("sum", when(size(col("common")) =!= 0  ,getSumUDF(col("elementidScoreMap"), concat_ws(",",col("prerequisites")))).otherwise(lit(0)))
          .drop("elementidScoreMap", "common")
          .show()
+------+---------+-------------+---+
|userid|elementid|prerequisites|sum|
+------+---------+-------------+---+
|     b|        1|           []|  0|
|     a|        1|           []|  0|
|     a|        2|           []|  0|
|     a|        3|           []|  0|
|     a|        4|       [1, 2]|  2|
+------+---------+-------------+---+

【讨论】:

以上是关于Spark SQL Windows:基于数组列创建数据框的主要内容,如果未能解决你的问题,请参考以下文章

Learning Spark [6] - Spark SQL高级函数

如何使用 Spark Dataset API (Java) 创建数组列

在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是啥?

Spark Sql 从 Hive orc 分区表中读取,给出数组越界异常

如果 where 子句已经修复,如何加快 spark sql 过滤器查询?

pyspark 从 spark 数据框列创建一个不同的列表并在 spark sql where 语句中使用