Spark Scala - Aggregation and Pivoting Based on Time Period
Posted: 2017-09-27 21:05:47

Question: I am trying to implement pivoting in Spark similar to SQL Server's PIVOT.
So far I have been using sqlContext and applying all of the transformations in SQL. I would like to know whether I can pull the data directly from SQL Server and implement the pivot functionality in Spark.
Below is an example of what I am trying to achieve - the SQL Server query:
create table #temp(ID Int, MonthPrior int, Amount float);
insert into #temp
values (100,1,10),(100,2,20),(100,3,30),(100,4,10),(100,5,20),(100,6,60),(200,1,10),(200,2,20),(200,3,30),(300,4,10),(300,5,20),(300,6,60);
select * from #temp;
|ID  |MonthPrior|Amount|
|----|----------|------|
|100 |1         |10    |
|100 |2         |20    |
|100 |3         |30    |
|100 |4         |10    |
|100 |5         |20    |
|100 |6         |60    |
|200 |1         |10    |
|200 |2         |20    |
|200 |3         |30    |
|300 |4         |10    |
|300 |5         |20    |
|300 |6         |60    |
Select ID,
       coalesce([1],0) as Amount1Mth,
       coalesce([1],0)+coalesce([2],0)+coalesce([3],0) as Amount1to3Mth,
       coalesce([1],0)+coalesce([2],0)+coalesce([3],0)+coalesce([4],0)+coalesce([5],0)+coalesce([6],0) as Amount_AllMonths
from (select * from #temp) A
pivot
( sum(Amount) for MonthPrior in ([1],[2],[3],[4],[5],[6]) ) as Pvt
|ID  |Amount1Mth|Amount1to3Mth|Amount_AllMonths|
|----|----------|-------------|----------------|
|100 |10        |60           |150             |
|200 |10        |60           |60              |
|300 |0         |0            |90              |
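For reference, a rough sketch of how this could be done directly in Spark: the DataFrame API has a built-in pivot, so the query above can in principle be reproduced after a JDBC read. The connection options below are placeholders, and a SparkSession named spark is assumed.

import org.apache.spark.sql.functions._

// Hypothetical JDBC read from SQL Server -- url, table and credentials are placeholders
val temp = spark.read.format("jdbc")
  .option("url", "jdbc:sqlserver://<host>;databaseName=<db>")
  .option("dbtable", "dbo.temp")
  .option("user", "<user>")
  .option("password", "<password>")
  .load()

// Pivot MonthPrior into columns "1".."6", summing Amount per ID
val pvt = temp.groupBy("ID")
  .pivot("MonthPrior", Seq(1, 2, 3, 4, 5, 6))
  .agg(sum("Amount"))

// Rebuild the cumulative buckets, treating missing months as 0
val monthCol = (m: Int) => coalesce(col(m.toString), lit(0))
val result = pvt.select(
  col("ID"),
  monthCol(1).as("Amount1Mth"),
  (1 to 3).map(monthCol).reduce(_ + _).as("Amount1to3Mth"),
  (1 to 6).map(monthCol).reduce(_ + _).as("Amount_AllMonths")
)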
Answer 1:

If your Amount column is of Decimal type, it is best to use java.math.BigDecimal as the corresponding parameter type. Note that the methods + and sum no longer apply, so they are replaced by add and reduce, respectively.
import org.apache.spark.sql.functions._
import java.math.BigDecimal
val df = Seq(
(100, 1, new BigDecimal(10)),
(100, 2, new BigDecimal(20)),
(100, 3, new BigDecimal(30)),
(100, 4, new BigDecimal(10)),
(100, 5, new BigDecimal(20)),
(100, 6, new BigDecimal(60)),
(200, 1, new BigDecimal(10)),
(200, 2, new BigDecimal(20)),
(200, 3, new BigDecimal(30)),
(300, 4, new BigDecimal(10)),
(300, 5, new BigDecimal(20)),
(300, 6, new BigDecimal(60))
).toDF("ID", "MonthPrior", "Amount")
// UDF to combine 2 array-type columns to map
def arrayToMap = udf(
(a: Seq[Int], b: Seq[BigDecimal]) => (a zip b).toMap
)
// Create array columns which get zipped into a map
val df2 = df.groupBy("ID").agg(
collect_list(col("MonthPrior")).as("MonthList"),
collect_list(col("Amount")).as("AmountList")
).select(
col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap")
)
// UDF to sum map values for keys from 1 thru n (0 for all)
def sumMapValues = udf(
  (m: Map[Int, BigDecimal], n: Int) =>
    if (n > 0)
      m.collect { case (k, v) => if (k <= n) v else new BigDecimal(0) }.reduce(_ add _)
    else
      m.collect { case (k, v) => v }.reduce(_ add _)
)
val df3 = df2.withColumn( "Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1)) ).
withColumn( "Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3)) ).
withColumn( "Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0)) ).
select( col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths") )
df3.show(truncate=false)
+---+--------------------+--------------------+--------------------+
| ID| Amount1Mth| Amount1to3Mth| Amount_AllMonths|
+---+--------------------+--------------------+--------------------+
|300| 0E-18| 0E-18|90.00000000000000...|
|100|10.00000000000000...|60.00000000000000...|150.0000000000000...|
|200|10.00000000000000...|60.00000000000000...|60.00000000000000...|
+---+--------------------+--------------------+--------------------+
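The 0E-18 and long trailing-digit values above are simply how Decimal(38,18) results are rendered. As a small sketch (reusing df3 from above), the columns can be cast to double purely for a more readable display:

// Cast the Decimal columns to double just for display purposes
df3.select(
  col("ID"),
  col("Amount1Mth").cast("double").as("Amount1Mth"),
  col("Amount1to3Mth").cast("double").as("Amount1to3Mth"),
  col("Amount_AllMonths").cast("double").as("Amount_AllMonths")
).show()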
Answer 2:

One approach is to create a map-type column from arrays of MonthPrior and Amount, and apply a UDF that sums the map values based on an integer argument:
val df = Seq(
(100, 1, 10),
(100, 2, 20),
(100, 3, 30),
(100, 4, 10),
(100, 5, 20),
(100, 6, 60),
(200, 1, 10),
(200, 2, 20),
(200, 3, 30),
(300, 4, 10),
(300, 5, 20),
(300, 6, 60)
).toDF("ID", "MonthPrior", "Amount")
import org.apache.spark.sql.functions._
// UDF to combine 2 array-type columns to map
def arrayToMap = udf(
(a: Seq[Int], b: Seq[Int]) => (a zip b).toMap
)
// Aggregate columns into arrays and apply arrayToMap UDF to create map column
val df2 = df.groupBy("ID").agg(
collect_list(col("MonthPrior")).as("MonthList"),
collect_list(col("Amount")).as("AmountList")
).select(
col("ID"), arrayToMap(col("MonthList"), col("AmountList")).as("MthAmtMap")
)
// UDF to sum map values for keys from 1 thru n (0 for all)
def sumMapValues = udf(
  (m: Map[Int, Int], n: Int) =>
    if (n > 0)
      m.collect { case (k, v) => if (k <= n) v else 0 }.sum
    else
      m.collect { case (k, v) => v }.sum
)
// Apply sumMapValues UDF to the map column
val df3 = df2.withColumn( "Amount1Mth", sumMapValues(col("MthAmtMap"), lit(1)) ).
withColumn( "Amount1to3Mth", sumMapValues(col("MthAmtMap"), lit(3)) ).
withColumn( "Amount_AllMonths", sumMapValues(col("MthAmtMap"), lit(0)) ).
select( col("ID"), col("Amount1Mth"), col("Amount1to3Mth"), col("Amount_AllMonths") )
df3.show
+---+----------+-------------+----------------+
| ID|Amount1Mth|Amount1to3Mth|Amount_AllMonths|
+---+----------+-------------+----------------+
|300| 0| 0| 90|
|100| 10| 60| 150|
|200| 10| 60| 60|
+---+----------+-------------+----------------+
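As an alternative sketch (not from the original answers), the same cumulative buckets can be computed without the intermediate map column or a UDF, using conditional aggregation; this also works unchanged when Amount is a Decimal column:

import org.apache.spark.sql.functions._

// Each bucket is a conditional sum over MonthPrior; months outside the bucket contribute 0
val df3Alt = df.groupBy("ID").agg(
  sum(when(col("MonthPrior") <= 1, col("Amount")).otherwise(0)).as("Amount1Mth"),
  sum(when(col("MonthPrior") <= 3, col("Amount")).otherwise(0)).as("Amount1to3Mth"),
  sum(col("Amount")).as("Amount_AllMonths")
)
df3Alt.show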
Comments:
Thanks @LeoC, I will analyze this approach. It seems to work, but (a: Seq[Int], b: Seq[Int]) => (a zip b).toMap is running into a problem. I am getting this: cannot resolve 'UDF(col_1,col_2)' due to data type mismatch: argument 2 requires array<int> type, however, 'col_2' is of array<decimal(38,18)> type.
Tried using numeric/decimal in the udf. Still doesn't work.
It sounds like your Amount column is of Decimal type. It should be trivial to modify the parameter types of the existing UDFs to accommodate that type, although some of the arithmetic methods may need adjusting.
@ashok viswanathan, to avoid a lengthy update, I have posted the solution for the Decimal type as a separate answer.

Answer 3:
Thanks @LeoC. The above solutions work. I also tried the following:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
lazy val months = (((df select ($"MonthPrior") distinct) sort
  ($"MonthPrior".asc)).rdd map (_.getAs[Int](0)) collect).toList
lazy val sliceSpec = List((0, 2, "1-2"), (0, 3, "1-3"), (0, 4, "1-4"), (0, 5, "1-5"), (0, 6, "1-6"))
lazy val createGroup: List[Any] => ((Int, Int, String) => Column) = sliceMe => (start, finish, aliasName) =>
  sliceMe slice (start, finish) map (value => col(value.toString)) reduce (_ + _) as aliasName
lazy val grouper = createGroup(months).tupled
lazy val groupedCols = sliceSpec map (group => grouper(group))
val pivoted = df groupBy ($"ID") pivot ("MonthPrior") agg (sum($"Amount"))
val writeMe = pivoted select ((pivoted.columns map col) ++ (groupedCols): _*)
z.show(writeMe sort ($"ID".asc))
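Note that z.show is Zeppelin's display helper; outside a Zeppelin notebook, the same result can be printed with the DataFrame's own API (a minor substitution, assuming the same writeMe value as above):

// Equivalent output without Zeppelin
writeMe.sort($"ID".asc).show(false)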