使用scala在Spark中转置DataFrame而不进行聚合
Posted
技术标签:
【中文标题】使用scala在Spark中转置DataFrame而不进行聚合【英文标题】:Transpose DataFrame Without Aggregation in Spark with scala 【发布时间】:2018-03-20 19:30:22 【问题描述】:我在网上查看了许多不同的解决方案,但数数没有找到我想要解决的问题。 请帮我解决这个问题。
我正在使用带有 Scala 的 Apache Spark 2.1.0。以下是我的数据框:
+-----------+-------+
|COLUMN_NAME| VALUE |
+-----------+-------+
|col1 | val1 |
|col2 | val2 |
|col3 | val3 |
|col4 | val4 |
|col5 | val5 |
+-----------+-------+
我希望将其转置为,如下所示:
+-----+-------+-----+------+-----+
|col1 | col2 |col3 | col4 |col5 |
+-----+-------+-----+------+-----+
|val1 | val2 |val3 | val4 |val5 |
+-----+-------+-----+------+-----+
【问题讨论】:
如果两条记录具有相同的COLUMN_NAME
但不同的VALUE
怎么办?那应该是什么价值呢?如果你知道没有这样的重复,你的数据框要么非常小(在这种情况下,你可以收集它并使用普通的 Scala 进行转换)或者结果会有太多的列。
两条记录永远不会有相同的列名。事实上,我在多行中获得了表插入/更新详细信息,一列带有 columnn-names,另一列带有值,我的计划是将它们转换为数据框并将它们直接更新到 Kudu 数据库中。第一列值作为模式出现,并发送作为值的列值。所以我需要用它来构建数据框。如果您有任何其他建议/想法,请告诉我。
【参考方案1】:
您可以使用pivot
执行此操作,但您仍然需要聚合,但是如果您有多个value
用于COLUMN_NAME
怎么办?
val df = Seq(
("col1", "val1"),
("col2", "val2"),
("col3", "val3"),
("col4", "val4"),
("col5", "val5")
).toDF("COLUMN_NAME", "VALUE")
df
.groupBy()
.pivot("COLUMN_NAME").agg(first("VALUE"))
.show()
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
编辑:
如果您的数据框真的像您的示例中那样小,您可以将其收集为Map
:
val map = df.as[(String,String)].collect().toMap
然后申请this answer
【讨论】:
非常感谢您的快速回复!非常感激 !有效 :) 。唯一的问题是因为枢轴,它很慢。 嘿 Raphael,我知道我们也可以使用 Map 来做到这一点,但我无法获得结果。如果您对 Map 有逻辑,请分享。 @MarutiK 只需在您的地图上拨打toSeq
,然后应用我的答案
我可以做 toSeq ,之后它会因 groupBy() 为空值而失败。我收到错误为“ 如果您的数据框像问题中一样小,那么您可以收集 COLUMN_NAME 以形成架构并收集 VALUE 以形成行然后创建一个新的数据框作为
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
//creating schema from existing dataframe
val schema = StructType(df.select(collect_list("COLUMN_NAME")).first().getAs[Seq[String]](0).map(x => StructField(x, StringType)))
//creating RDD[Row]
val values = sc.parallelize(Seq(Row.fromSeq(df.select(collect_list("VALUE")).first().getAs[Seq[String]](0))))
//new dataframe creation
sqlContext.createDataFrame(values, schema).show(false)
这应该给你
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
【讨论】:
你太棒了拉梅什!这正是我所需要的。非常感谢你的帮助。在性能方面,这比 Pivot 更好。 很高兴听到@MarutiK,当你有资格时不要忘记投票;)【参考方案3】:使用交叉表的另一种解决方案虽然冗长。
val dfp = spark.sql(""" with t1 (
select 'col1' c1, 'val1' c2 union all
select 'col2' c1, 'val2' c2 union all
select 'col3' c1, 'val3' c2 union all
select 'col4' c1, 'val4' c2 union all
select 'col5' c1, 'val5' c2
) select c1 COLUMN_NAME, c2 VALUE from t1
""")
dfp.show(50,false)
+-----------+-----+
|COLUMN_NAME|VALUE|
+-----------+-----+
|col1 |val1 |
|col2 |val2 |
|col3 |val3 |
|col4 |val4 |
|col5 |val5 |
+-----------+-----+
val dfp2=dfp.groupBy("column_name").agg( first($"value") as "value" ).stat.crosstab("value", "column_name")
dfp2.show(false)
+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1 |1 |0 |0 |0 |0 |
|val3 |0 |0 |1 |0 |0 |
|val2 |0 |1 |0 |0 |0 |
|val5 |0 |0 |0 |0 |1 |
|val4 |0 |0 |0 |1 |0 |
+-----------------+----+----+----+----+----+
val needed_cols = dfp2.columns.drop(1)
needed_cols: Array[String] = Array(col1, col2, col3, col4, col5)
val dfp3 = needed_cols.foldLeft(dfp2) (acc,x) => acc.withColumn(x,expr(s"case when $x=1 then value_column_name else 0 end"))
dfp3.show(false)
+-----------------+----+----+----+----+----+
|value_column_name|col1|col2|col3|col4|col5|
+-----------------+----+----+----+----+----+
|val1 |val1|0 |0 |0 |0 |
|val3 |0 |0 |val3|0 |0 |
|val2 |0 |val2|0 |0 |0 |
|val5 |0 |0 |0 |0 |val5|
|val4 |0 |0 |0 |val4|0 |
+-----------------+----+----+----+----+----+
dfp3.select( needed_cols.map( c => max(col(c)).as(c)) :_* ).show
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
【讨论】:
【参考方案4】:要增强 Ramesh Maharjan 的答案,请收集并将其转换为地图。
val mp = df.as[(String,String)].collect.toMap
使用虚拟数据框,我们可以使用 foldLeft 进一步构建
val f = Seq("1").toDF("dummy")
mp.keys.toList.sorted.foldLeft(f) (acc,x) => acc.withColumn(mp(x),lit(x) ) .drop("dummy").show(false)
+----+----+----+----+----+
|val1|val2|val3|val4|val5|
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
【讨论】:
以上是关于使用scala在Spark中转置DataFrame而不进行聚合的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 python 在 Spark 中转置 DataFrame 而不进行聚合
在 Pandas DataFrame 中转置选定的 MultiIndex 级别
在 Spark 上使用 Scala 在 Dataframe 中拆分字符串
在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是啥?