spark Group By数据框列没有聚合[重复]
Posted
技术标签:
【中文标题】spark Group By数据框列没有聚合[重复]【英文标题】:spark Group By data-frame columns without aggregation [duplicate] 【发布时间】:2018-06-24 00:00:34 【问题描述】:我在 hdfs 中有一个 csv 文件:/hdfs/test.csv,我喜欢使用 spark 和 scala 对以下数据进行分组,我需要这样的输出。
我想根据 A1 列按 A1...AN 列分组,输出应该是这样的
所有的行都应该像下面这样分组。 输出:
JACK , ABCD, ARRAY("0,1,0,1", "2,9,2,9")
JACK , LMN, ARRAY("0,1,0,3", "0,4,3,T")
JACK, HBC, ARRAY("1,T,5,21", "E7,4W,5,8)
输入:
++++++++++++++++++++++++++++++
name A1 A1 A2 A3..AN
--------------------------------
JACK ABCD 0 1 0 1
JACK LMN 0 1 0 3
JACK ABCD 2 9 2 9
JAC HBC 1 T 5 21
JACK LMN 0 4 3 T
JACK HBC E7 4W 5 8
我需要 spark scala 中的以下输出
JACK , ABCD, ARRAY("0,1,0,1", "2,9,2,9")
JACK , LMN, ARRAY("0,1,0,3", "0,4,3,T")
JACK, HBC, ARRAY("1,T,5,21", "E7,4W,5,8)
【问题讨论】:
使用 distinct() 代替。 【参考方案1】:您可以通过将列作为数组来实现这一点。
import org.apache.spark.sql.functions.collect_set, concat_ws, array, col
val aCols = 1.to(250).map( x -> col(s"A$x"))
val concatCol = concat_ws(",", array(aCols : _*))
groupedDf = df.withColumn("aConcat", concatCol).
groupBy("name", "A").
agg(collect_set("aConcat"))
如果您对重复项没问题,您也可以使用 collect_list 而不是 collect_set。
【讨论】:
嗨 Ayplam,谢谢,我该如何使用它? val arrayofcolumns = DataFrame.columns 在你的代码中,对我来说,A 可能是 B、C、D、E、.. 或其他东西 我试过 val arrayColumns = DataFrame.columns,为我工作,谢谢你的帮助 太棒了!如果您可以标记为答案,将不胜感激。【参考方案2】:您的输入有两个不同的列,称为A1
。我将假设groupBy
类别称为A
,而要放入最终数组的元素是A1
。
如果将数据加载到 DataFrame 中,则可以这样做以实现指定的输出:
import org.apache.spark.sql.functions.collect_set, concat_ws
val grouped = someDF
.groupBy($"name", $"A")
.agg(collect_set(concat_ws(",", $"A1", $"A2", $"A3", $"A4")).alias("grouped"))
【讨论】:
但是这里 A1 TO .....An。列从 A1 到 An 这意味着如果我的列是 A1......到 A250 对我来说将很困难以上是关于spark Group By数据框列没有聚合[重复]的主要内容,如果未能解决你的问题,请参考以下文章
查询没有重复和聚合函数或 GROUP BY 子句问题。 - 重复
Spark Window 聚合与 Group By/Join 性能
Spark SQL 可以在 GROUP BY 聚合中使用 FIRST_VALUE 和 LAST_VALUE(但这不是标准的)
sqlserver中分区函数 partition by与 group by 区别 删除关键字段重复列
org.apache.spark.sql.AnalysisException:表达式 't2.`sum_click_passed`' 既不在 group by 中,也不是聚合函数