如何将一组 RelationalGroupedDataset 传递给函数?

Posted

技术标签:

【中文标题】如何将一组 RelationalGroupedDataset 传递给函数?【英文标题】:How to pass a group of RelationalGroupedDataset to a function? 【发布时间】:2017-07-20 00:43:14 【问题描述】:

我正在通过以下方式将 csv 作为数据框读取:

val df  = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("D:/ModelData.csv")

然后我按如下三列分组,返回一个 RelationalGroupedDataset

df.groupBy("col1", "col2","col3")

并且我希望通过以下函数发送每个分组的数据帧

 def ModelFunction(daf: DataFrame) = 

    //do some calculation

          

例如,如果我有 col1 具有 2 个唯一值 (0,1) 和 col2 具有 2 个唯一值 (1,2) 和 col3 具有 3 个唯一值 (1,2,3) 那么我想传递每个组合分组到模型函数就像 col1=0 ,col2=1,col3=1 我将有一个数据框,我想将它传递给 ModelFunction 等等,用于三列的每个组合。

我试过了

df.groupBy("col1", "col2","col3").ModelFunction();

但它会引发错误。

.

感谢任何帮助。

【问题讨论】:

【参考方案1】:

简短的回答是你不能这样做。您只能在 RelationalGroupedDataset 上执行聚合函数(您编写为 UDAF 或内置在 org.apache.spark.sql.functions 中的函数)

在我看来,您有多种选择:

选项 1:每个唯一组合的数据量足够小,并且与其他组合相比没有太大的偏差。

在这种情况下,你可以这样做:

val grouped = df.groupBy("col1", "col2","col3").agg(collect_list(struct(all other columns)))
grouped.as[some case class to represent the data including the combination].map[your own logistic regression function).

选项 2:如果组合的总数足够小,您可以这样做:

val values: df.select("col1", "col2", "col3").distinct().collect()

然后循环遍历它们,通过过滤器从每个组合创建一个新的数据框。

选项 3:编写自己的 UDAF

这可能不够好,因为数据以流的形式出现而无法进行迭代,但是,如果您有一个匹配的逻辑回归实现,您可以尝试编写一个 UDAF 来执行此操作。例如:How to define and use a User-Defined Aggregate Function in Spark SQL?

【讨论】:

以上是关于如何将一组 RelationalGroupedDataset 传递给函数?的主要内容,如果未能解决你的问题,请参考以下文章

如何将一组叶标记保存到 geojson?

如何将一组 UIButtons 添加到 UIStackview 上?

如何将一组地图从 Firestore 映射到 recyclerView?

如何将一组矩形分组为连接区域的“岛”?

如何将一组 powershell 命令转换为可以在需要时运行的脚本?

如何从swift将一组浮点数组传递给C++函数