如何对数组列的元素进行切片和求和?
Posted
技术标签:
【中文标题】如何对数组列的元素进行切片和求和?【英文标题】:How to slice and sum elements of array column? 【发布时间】:2016-10-20 09:52:41 【问题描述】:我想使用 SparkSQL 在数组列上 sum
(或执行其他聚合函数)。
我有一张桌子
+-------+-------+---------------------------------+
|dept_id|dept_nm| emp_details|
+-------+-------+---------------------------------+
| 10|Finance| [100, 200, 300, 400, 500]|
| 20| IT| [10, 20, 50, 100]|
+-------+-------+---------------------------------+
我想总结这个emp_details
列的值。
预期查询:
sqlContext.sql("select sum(emp_details) from mytable").show
预期结果
1500
180
我也应该能够对范围元素求和:
sqlContext.sql("select sum(slice(emp_details,0,3)) from mytable").show
结果
600
80
当按预期对 Array 类型进行求和时,它表明 sum 期望参数是数字类型而不是数组类型。
我认为我们需要为此创建 UDF。但如何?
UDF 是否会影响性能? 除了UDF之外还有其他解决方案吗?
【问题讨论】:
【参考方案1】:火花 2.4.0
截至Spark 2.4,Spark SQL 支持高阶函数,用于操作复杂的数据结构,包括数组。
“现代”解决方案如下:
scala> input.show(false)
+-------+-------+-------------------------+
|dept_id|dept_nm|emp_details |
+-------+-------+-------------------------+
|10 |Finance|[100, 200, 300, 400, 500]|
|20 |IT |[10, 20, 50, 100] |
+-------+-------+-------------------------+
input.createOrReplaceTempView("mytable")
val sqlText = "select dept_id, dept_nm, aggregate(emp_details, 0, (acc, value) -> acc + value) as sum from mytable"
scala> sql(sqlText).show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
| 10|Finance|1500|
| 20| IT| 180|
+-------+-------+----+
您可以在以下文章和视频中找到有关高阶函数的好读物:
-
Introducing New Built-in and Higher-Order Functions for Complex Data Types in Apache Spark 2.4
Working with Nested Data Using Higher Order Functions in SQL on Databricks
An Introduction to Higher Order Functions in Spark SQL with Herman van Hovell (Databricks)
Spark 2.3.2 及更早版本
免责声明我不会推荐这种方法(即使它得到了最多的支持),因为 Spark SQL 为执行 Dataset.map
而进行了反序列化。该查询强制 Spark 反序列化数据并将其加载到 JVM(从由 Spark 在 JVM 外部管理的内存区域)。这将不可避免地导致更频繁的 GC,从而使性能变差。
一种解决方案是使用Dataset
解决方案,Spark SQL 和 Scala 的组合可以显示其强大功能。
scala> val inventory = Seq(
| (10, "Finance", Seq(100, 200, 300, 400, 500)),
| (20, "IT", Seq(10, 20, 50, 100))).toDF("dept_id", "dept_nm", "emp_details")
inventory: org.apache.spark.sql.DataFrame = [dept_id: int, dept_nm: string ... 1 more field]
// I'm too lazy today for a case class
scala> inventory.as[(Long, String, Seq[Int])].
map case (deptId, deptName, details) => (deptId, deptName, details.sum) .
toDF("dept_id", "dept_nm", "sum").
show
+-------+-------+----+
|dept_id|dept_nm| sum|
+-------+-------+----+
| 10|Finance|1500|
| 20| IT| 180|
+-------+-------+----+
我将切片部分留作练习,因为它同样简单。
【讨论】:
这是否在后台使用 UDF? @JonDeaton “这个”指的是什么?如果我知道答案,我可以提供帮助。 行map case (deptId, deptName, details) => (deptId, deptName, details.sum)
不。这是 Spark SQL 字面上使用的 Scala 函数,即没有任何优化(如果有的话)。【参考方案2】:
从 Spark 2.4 开始,您可以使用 slice
函数进行切片:
import org.apache.spark.sql.functions.slice
val df = Seq(
(10, "Finance", Seq(100, 200, 300, 400, 500)),
(20, "IT", Seq(10, 20, 50, 100))
).toDF("dept_id", "dept_nm", "emp_details")
val dfSliced = df.withColumn(
"emp_details_sliced",
slice($"emp_details", 1, 3)
)
dfSliced.show(false)
+-------+-------+-------------------------+------------------+
|dept_id|dept_nm|emp_details |emp_details_sliced|
+-------+-------+-------------------------+------------------+
|10 |Finance|[100, 200, 300, 400, 500]|[100, 200, 300] |
|20 |IT |[10, 20, 50, 100] |[10, 20, 50] |
+-------+-------+-------------------------+------------------+
并使用aggregate
对数组求和:
dfSliced.selectExpr(
"*",
"aggregate(emp_details, 0, (x, y) -> x + y) as details_sum",
"aggregate(emp_details_sliced, 0, (x, y) -> x + y) as details_sliced_sum"
).show
+-------+-------+--------------------+------------------+-----------+------------------+
|dept_id|dept_nm| emp_details|emp_details_sliced|details_sum|details_sliced_sum|
+-------+-------+--------------------+------------------+-----------+------------------+
| 10|Finance|[100, 200, 300, 4...| [100, 200, 300]| 1500| 600|
| 20| IT| [10, 20, 50, 100]| [10, 20, 50]| 180| 80|
+-------+-------+--------------------+------------------+-----------+------------------+
【讨论】:
噢!直到现在我才注意到你的回答。我保持我的编辑不变,然后投票赞成你的答案。我们是平的,不是吗? @JacekLaskowski 不用担心,尽管为了公平起见,我希望得到对我们最近的pivot
讨论的回应 :)【参考方案3】:
一种可能的方法是在您的Array
列上使用explode()
,从而通过唯一键聚合输出。例如:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
(mytable
.withColumn("emp_sum",
explode($"emp_details"))
.groupBy("dept_nm")
.agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance| 1500|
| IT| 180|
+-------+------------+
要仅选择数组中的特定值,我们可以使用链接问题的答案并稍作修改即可应用它:
val slice = udf((array : Seq[Int], from : Int, to : Int) => array.slice(from,to))
(mytable
.withColumn("slice",
slice($"emp_details",
lit(0),
lit(3)))
.withColumn("emp_sum",
explode($"slice"))
.groupBy("dept_nm")
.agg(sum("emp_sum")).show)
+-------+------------+
|dept_nm|sum(emp_sum)|
+-------+------------+
|Finance| 600|
| IT| 80|
+-------+------------+
数据:
val data = Seq((10, "Finance", Array(100,200,300,400,500)),
(20, "IT", Array(10,20,50,100)))
val mytable = sc.parallelize(data).toDF("dept_id", "dept_nm","emp_details")
【讨论】:
这是一个很好的方法,我忘记了它在过去使用它。【参考方案4】:这是mtoto's answer 的替代方案,不使用groupBy
(我真的不知道哪个最快:UDF、mtoto 解决方案或我的,欢迎 cmets)
一般来说,使用UDF
会对性能产生影响。有一个 answer 您可能想阅读,而这个 resource 是 UDF 上的好读物。
现在,对于您的问题,您可以避免使用 UDF。我会使用 Scala 逻辑生成的 Column
表达式。
数据:
val df = Seq((10, "Finance", Array(100,200,300,400,500)),
(20, "IT", Array(10, 20, 50,100)))
.toDF("dept_id", "dept_nm","emp_details")
您需要一些技巧才能遍历ArrayType
,您可以使用解决方案来发现各种问题(请参阅底部的slice
部分的编辑)。这是我的建议,但您可能会发现更好。首先你取最大长度
val maxLength = df.select(size('emp_details).as("l")).groupBy().max("l").first.getInt(0)
然后你使用它,当你有一个较短的数组时进行测试
val sumArray = (1 until maxLength)
.map(i => when(size('emp_details) > i,'emp_details(i)).otherwise(lit(0)))
.reduce(_ + _)
.as("sumArray")
val res = df
.select('dept_id,'dept_nm,'emp_details,sumArray)
结果:
+-------+-------+--------------------+--------+
|dept_id|dept_nm| emp_details|sumArray|
+-------+-------+--------------------+--------+
| 10|Finance|[100, 200, 300, 4...| 1500|
| 20| IT| [10, 20, 50, 100]| 180|
+-------+-------+--------------------+--------+
我建议您查看sumArray
以了解它在做什么。
编辑:当然,我只再次阅读了一半的问题...但是如果您想更改要求和的项目,您可以看到这个解决方案变得很明显(即您不需要切片函数),只需将 (0 until maxLength)
更改为您需要的索引范围即可:
def sumArray(from: Int, max: Int) = (from until max)
.map(i => when(size('emp_details) > i,'emp_details(i)).otherwise(lit(0)))
.reduce(_ + _)
.as("sumArray")
【讨论】:
【参考方案5】:rdd方式没了,我补充一下吧。
val df = Seq((10, "Finance", Array(100,200,300,400,500)),(20, "IT", Array(10,20,50,100))).toDF("dept_id", "dept_nm","emp_details")
import scala.collection.mutable._
val rdd1 = df.rdd.map( x=> val p = x.getAs[mutable.WrappedArray[Int]]("emp_details").toArray; Row.merge(x,Row(p.sum,p.slice(0,2).sum)) )
spark.createDataFrame(rdd1,df.schema.add(StructField("sumArray",IntegerType)).add(StructField("sliceArray",IntegerType))).show(false)
输出:
+-------+-------+-------------------------+--------+----------+
|dept_id|dept_nm|emp_details |sumArray|sliceArray|
+-------+-------+-------------------------+--------+----------+
|10 |Finance|[100, 200, 300, 400, 500]|1500 |300 |
|20 |IT |[10, 20, 50, 100] |180 |30 |
+-------+-------+-------------------------+--------+----------+
【讨论】:
为什么要使用 RDD API,因为它使用数据集? 对于这种情况,rdd 方式似乎很简单。一旦将 sum() 和 slice() 函数转换为数组,就可以访问它。 rdd 也是第一个抽象,它将始终兼容。 我认为@JacekLaskowski 在这里提出的观点是为什么要使用昂贵的 RDD 转换,如果 Dataset 和 RDD 功能完全重叠,并且您可以使用几乎相同的代码(模式与编码器是肤浅的)。 @stack0114106 至少转换为 RDD 会创建一个分析障碍(不是实际的优化器障碍,但结果是等效的),这本身就是不好的。此外,它需要大量分配来初始化Row
对象。最后初始化需要低效的Encoder[Row]
。所有这些都很昂贵。有时贵得令人望而却步。使用 functional / strongly typed(由于缺乏更好的描述)Dataset API 会遇到from some,但并非所有这些问题。和 SQL / DataFrame,从无到有。
有遗留 RDD 或 DStream API 的用例(@JacekLaskowski 在这里可能不同意我的观点:)),但简单的 map
根本不是其中之一。附带说明 - 应该用于ArrayType
的接口是Seq
。【参考方案6】:
以 zero323 的出色答案为基础;如果您有一个长整数数组,即 BIGINT,您需要将初始值从 0 更改为 BIGINT(0),如第一段中所述here 所以你有
dfSliced.selectExpr(
"*",
"aggregate(emp_details, BIGINT(0), (x, y) -> x + y) as details_sum",
"aggregate(emp_details_sliced, BIGINT(0), (x, y) -> x + y) as details_sliced_sum"
).show
【讨论】:
以上是关于如何对数组列的元素进行切片和求和?的主要内容,如果未能解决你的问题,请参考以下文章