分区上的火花scala数据框功能

Posted

技术标签:

【中文标题】分区上的火花scala数据框功能【英文标题】:spark scala dataframe function over partition 【发布时间】:2017-06-24 09:08:20 【问题描述】:

我有 2 亿行,其中 1K 组看起来像这样

Group     X             Y             Z          Q           W
group1  0.054464866 0.002248819 0.299069804 0.763352879 0.395905106
group2  0.9986218   0.023649037 0.50762069  0.212225807 0.619571705
group1  0.839928517 0.290339179 0.050407454 0.75837838  0.495466007
group1  0.021003132 0.663366686 0.687928832 0.239132224 0.020848608
group1  0.393843426 0.006299292 0.141103438 0.858481036 0.715860852
group2  0.045960198 0.014858905 0.672267793 0.59750871  0.893646818

我想为每个组运行相同的功能(比如X 上的linear regression[X, Z, Q, W])。我本可以完成Window.partition 等,但我有自己的功能。目前,我执行以下操作:

df.select("Group").distinct.collect.toList.foreachgroup => 
val dfGroup = df.filter(col("Group")===group
dfGroup.withColumn("res", myUdf(col("X"), col("Y"), col("Z"), col("Q"), col("W"))

想知道是否有更好的方法吗?

【问题讨论】:

一个 UDF 作用于单行,所以你为什么不在原始数据帧上使用你的 UDF,只选择一个组是没有意义的。 如果你想做线性回归(涉及一个组的所有记录),我会做类似df.repartition($"Group").mapPartitionsrows => rows.toSeq.groupBy(row => row.getAs[String]("Group")).mapValues(...) 【参考方案1】:

根据您的喜好,您至少有两个选项:DataFrame 或 Dataset。

带有 UDAF 的数据帧

df
  .groupBy("group")
  .agg(myUdaf(col("col1"), col("col2")))

myUdaf 是 UDAF

这里您可以找到如何实现 UDAF 的示例:https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html

数据集

您可以使用来自 Dataset API 的 groupByKeymapGroups 转换:

ds
  .groupByKey(_.group)
  .mapGroupscase (group, values) =>
    (group, aggregator(values))
  

aggregator 是负责聚合对象集合的 Scala 函数。

如果您不需要聚合,您可以使用map 转换映射values,例如:

values.map(v => fun(...))

【讨论】:

以上是关于分区上的火花scala数据框功能的主要内容,如果未能解决你的问题,请参考以下文章

使用 scala 使用布尔运算折叠火花数据框中的列

基于非空值加入火花数据框(scala)

根据scala中的条件对列进行火花数据框聚合

根据列值对火花数据框进行分区?

如何根据列在火花中重新分区?

火花数据框计算列