Spark Scala - Aggregation and Pivoting Based on Time Period

Posted: 2017-09-27 21:05:47

Question:

I am trying to implement something similar to SQL Server's PIVOT in Spark.

So far I have been using sqlContext and applying all the transformations in SQL. I would like to know whether I can extract the data directly from SQL Server and implement the pivot functionality using Spark.
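
For the "extract directly from SQL Server" part, a DataFrame can be loaded over JDBC and then pivoted entirely in Spark. Below is a minimal sketch of that idea; the connection URL, credentials, table name, and driver class are placeholders I made up, not details from this post, and `spark` is assumed to be the SparkSession.

// Read the SQL Server table over JDBC (all connection values below are hypothetical placeholders)
val temp = spark.read
  .format("jdbc")
  .option("url", "jdbc:sqlserver://myhost:1433;databaseName=mydb")
  .option("dbtable", "dbo.temp")
  .option("user", "myuser")
  .option("password", "mypassword")
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .load()

// Spark's built-in pivot, roughly equivalent to the T-SQL PIVOT shown below
val pivotedTemp = temp.groupBy("ID").pivot("MonthPrior").sum("Amount")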

Below is an example of what I am trying to achieve. The SQL Server query:

create table #temp(ID Int, MonthPrior int, Amount float);

insert into #temp values (100,1,10),(100,2,20),(100,3,30),(100,4,10),(100,5,20),(100,6,60),(200,1,10),(200,2,20),(200,3,30),(300,4,10),(300,5,20),(300,6,60);

select * from #temp;

| ID  | MonthPrior | Amount |
|-----|------------|--------|
| 100 | 1          | 10     |
| 100 | 2          | 20     |
| 100 | 3          | 30     |
| 100 | 4          | 10     |
| 100 | 5          | 20     |
| 100 | 6          | 60     |
| 200 | 1          | 10     |
| 200 | 2          | 20     |
| 200 | 3          | 30     |
| 300 | 4          | 10     |
| 300 | 5          | 20     |
| 300 | 6          | 60     |

Select ID,
       coalesce([1],0) as Amount1Mth,
       coalesce([1],0)+coalesce([2],0)+coalesce([3],0) as Amount1to3Mth,
       coalesce([1],0)+coalesce([2],0)+coalesce([3],0)+coalesce([4],0)+coalesce([5],0)+coalesce([6],0) as Amount_AllMonths
from (select * from #temp) A
pivot ( sum(Amount) for MonthPrior in ([1],[2],[3],[4],[5],[6]) ) as Pvt

| ID  | Amount1Mth | Amount1to3Mth | Amount_AllMonths |
|-----|------------|---------------|------------------|
| 100 | 10         | 60            | 150              |
| 200 | 10         | 60            | 60               |
| 300 | 0          | 0             | 90               |

Question comments:

Answer 1:

If your Amount column is of Decimal type, it is best to use java.math.BigDecimal as the corresponding UDF parameter type. Note that the methods `+` and `sum` no longer apply, so they are replaced with `add` and `reduce`, respectively.
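
As a quick illustration of that point (a minimal sketch of mine, not part of the original answer): java.math.BigDecimal is a plain Java class, so Scala's `+` operator is not defined for it, and the standard library provides no implicit Numeric[java.math.BigDecimal], so `sum` will not compile either. Element-wise addition therefore goes through BigDecimal's `add` method via `reduce`:

import java.math.BigDecimal

val xs = Seq(new BigDecimal(10), new BigDecimal(20), new BigDecimal(30))

// xs.sum would not compile (no Numeric instance) and xs.reduce(_ + _) would not either (no '+' operator),
// so fold the values with BigDecimal's own 'add' method instead.
val total = xs.reduce(_ add _)   // 60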

import org.apache.spark.sql.functions._
import java.math.BigDecimal
import spark.implicits._  // needed for .toDF on a local Seq (spark is the SparkSession)

val df = Seq(
  (100, 1, new BigDecimal(10)),
  (100, 2, new BigDecimal(20)),
  (100, 3, new BigDecimal(30)),
  (100, 4, new BigDecimal(10)),
  (100, 5, new BigDecimal(20)),
  (100, 6, new BigDecimal(60)),
  (200, 1, new BigDecimal(10)),
  (200, 2, new BigDecimal(20)),
  (200, 3, new BigDecimal(30)),
  (300, 4, new BigDecimal(10)),
  (300, 5, new BigDecimal(20)),
  (300, 6, new BigDecimal(60))
).toDF("ID", "MonthPrior", "Amount")

// UDF to combine 2 array-type columns to map
def arrayToMap = udf(
  (a: Seq[Int], b: Seq[BigDecimal]) => (a zip b).toMap
)

// Create array columns which get zipped into a map
val df2 = df.groupBy("ID").agg(
  collect_list(col("MonthPrior")).as("MonthList"),
  collect_list(col("Amount")).as("AmountList")
).select(
  col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap")
)

// UDF to sum map values for keys from 1 thru n (0 for all)
def sumMapValues = udf(
  (m: Map[Int, BigDecimal], n: Int) =>
    if (n > 0)
      m.collect { case (k, v) => if (k <= n) v else new BigDecimal(0) }.reduce(_ add _)
    else
      m.collect { case (k, v) => v }.reduce(_ add _)
)

val df3 = df2.withColumn( "Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1)) ).
  withColumn( "Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3)) ).
  withColumn( "Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0)) ).
  select( col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths") )

df3.show(truncate=false)
+---+--------------------+--------------------+--------------------+
| ID|          Amount1Mth|       Amount1to3Mth|    Amount_AllMonths|
+---+--------------------+--------------------+--------------------+
|300|               0E-18|               0E-18|90.00000000000000...|
|100|10.00000000000000...|60.00000000000000...|150.0000000000000...|
|200|10.00000000000000...|60.00000000000000...|60.00000000000000...|
+---+--------------------+--------------------+--------------------+
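
Side note (an observation of mine, not part of the original answer): the 0E-18 values and the long fractional tails are simply how Spark renders the default decimal(38,18) type inferred for a UDF that returns java.math.BigDecimal. If a tidier display is wanted, the result columns can be cast down, for example:

// Reduce the displayed scale of the decimal result columns (illustrative only)
val df3Pretty = df3
  .withColumn("Amount1Mth", col("Amount1Mth").cast("decimal(38,2)"))
  .withColumn("Amount1to3Mth", col("Amount1to3Mth").cast("decimal(38,2)"))
  .withColumn("Amount_AllMonths", col("Amount_AllMonths").cast("decimal(38,2)"))

df3Pretty.show(truncate = false)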

Comments:

Answer 2:

One approach is to create a map-type column from arrays of MonthPrior and Amount, and then apply a UDF that sums the map values up to a given month (passed as an integer argument):

import spark.implicits._  // needed for .toDF on a local Seq (spark is the SparkSession)

val df = Seq(
  (100, 1, 10),
  (100, 2, 20),
  (100, 3, 30),
  (100, 4, 10),
  (100, 5, 20),
  (100, 6, 60),
  (200, 1, 10),
  (200, 2, 20),
  (200, 3, 30),
  (300, 4, 10),
  (300, 5, 20),
  (300, 6, 60)
).toDF("ID", "MonthPrior", "Amount")

import org.apache.spark.sql.functions._

// UDF to combine 2 array-type columns to map
def arrayToMap = udf(
  (a: Seq[Int], b: Seq[Int]) => (a zip b).toMap
)

// Aggregate columns into arrays and apply arrayToMap UDF to create map column
val df2 = df.groupBy("ID").agg(
  collect_list(col("MonthPrior")).as("MonthList"),
  collect_list(col("Amount")).as("AmountList")
).select(
  col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap")
)

// UDF to sum map values for keys from 1 thru n (0 for all)
def sumMapValues = udf(
  (m: Map[Int, Int], n: Int) =>
    if (n > 0)
      m.collect { case (k, v) => if (k <= n) v else 0 }.sum
    else
      m.collect { case (k, v) => v }.sum
)

// Apply sumMapValues UDF to the map column
val df3 = df2.withColumn( "Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1)) ).
  withColumn( "Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3)) ).
  withColumn( "Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0)) ).
  select( col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths") )

df3.show
+---+----------+-------------+----------------+
| ID|Amount1Mth|Amount1to3Mth|Amount_AllMonths|
+---+----------+-------------+----------------+
|300|         0|            0|              90|
|100|        10|           60|             150|
|200|        10|           60|              60|
+---+----------+-------------+----------------+
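
A note of mine, not from the original answers: if UDFs and the collect_list/map step are to be avoided altogether, the same cumulative columns can be produced with plain conditional aggregation, which Spark can optimize natively. A minimal sketch, assuming the same df as above (the name df3NoUdf is illustrative):

import org.apache.spark.sql.functions.{sum, when, col, lit}

// Sum Amount only for rows whose MonthPrior falls inside the requested window;
// rows outside the window contribute 0, mirroring the coalesce(...) in the SQL query.
val df3NoUdf = df.groupBy("ID").agg(
  sum(when(col("MonthPrior") <= 1, col("Amount")).otherwise(lit(0))).as("Amount1Mth"),
  sum(when(col("MonthPrior") <= 3, col("Amount")).otherwise(lit(0))).as("Amount1to3Mth"),
  sum(col("Amount")).as("Amount_AllMonths")
)

df3NoUdf.show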

Comments:

Thanks @LeoC, I will analyze this approach. It seems to work.

Running into a problem with (a: Seq[Int], b: Seq[Int]) => (a zip b).toMap. I am getting: cannot resolve 'UDF(col_1,col_2)' due to data type mismatch: argument 2 requires array<int> type, however, 'col_2' is of array<decimal(38,18)> type.

Tried using numeric/decimal in the udf. Still not working.

Sounds like your Amount column is of Decimal type. It should be trivial to modify the parameter types of the existing UDFs to accommodate that type, though some of the arithmetic methods may need adjusting.

@ashok viswanathan, to avoid a lengthy update, I have posted the solution for the Decimal type as a separate answer.

Answer 3:

Thanks @LeoC. The solution above works. I also tried the following:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column

// Distinct MonthPrior values, sorted ascending (these become the pivoted column names)
lazy val months = df.select($"MonthPrior").distinct.sort($"MonthPrior".asc)
  .rdd.map(_.getAs[Int](0)).collect.toList

// (startIndex, endIndex, alias) triples describing which pivoted month columns to sum together
lazy val sliceSpec = List((0, 2, "1-2"), (0, 3, "1-3"), (0, 4, "1-4"), (0, 5, "1-5"), (0, 6, "1-6"))

// Builds a summed Column over a slice of the month-column names
lazy val createGroup: List[Any] => ((Int, Int, String) => Column) = sliceMe => (start, finish, aliasName) =>
  sliceMe.slice(start, finish).map(value => col(value.toString)).reduce(_ + _).as(aliasName)

lazy val grouper = createGroup(months).tupled

lazy val groupedCols = sliceSpec.map(group => grouper(group))

// Pivot on MonthPrior, then append the grouped (cumulative) sum columns
val pivoted = df.groupBy($"ID").pivot("MonthPrior").agg(sum($"Amount"))

val writeMe = pivoted.select((pivoted.columns.map(col)) ++ groupedCols: _*)

// z.show is Zeppelin's display helper; use writeMe.show outside Zeppelin
z.show(writeMe.sort($"ID".asc))
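
A related note of mine (not from the original answer): the pivoted month columns contain null for IDs that have no rows in a given month (e.g. ID 300 for months 1-3), so summing them with `_ + _` yields null rather than 0 for those groups. Wrapping each pivoted column in coalesce(..., lit(0)) reproduces the coalesce behaviour of the original SQL Server query. A minimal sketch built on the pivoted DataFrame and imports above (the helper name cumulative is illustrative):

// Sum a range of pivoted month columns, treating missing months as 0
def cumulative(upTo: Int, alias: String): Column =
  (1 to upTo).map(m => coalesce(col(m.toString), lit(0))).reduce(_ + _).as(alias)

val withCumulative = pivoted.select(
  col("ID"),
  cumulative(1, "Amount1Mth"),
  cumulative(3, "Amount1to3Mth"),
  cumulative(6, "Amount_AllMonths")
)

withCumulative.sort($"ID".asc).show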

Comments:
