在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是啥?
Posted
技术标签:
【中文标题】在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是啥?【英文标题】:What is the efficient way to create Spark DataFrame in Scala with array type columns from another DataFrame that does not have an array column?在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是什么? 【发布时间】:2019-11-17 20:10:40 【问题描述】:假设,我有以下数据框:
id | col1 | col2
-----------------
x | p1 | a1
-----------------
x | p2 | b1
-----------------
y | p2 | b2
-----------------
y | p2 | b3
-----------------
y | p3 | c1
col1 中的不同值(p1,p2,p3)单独与 id 将用作最终数据帧的列。这里,id y 有两个 col2 值(b2 和 b3)对应相同的 col1 值 p2,所以, p2 将被视为数组类型的列。 因此,最终的数据帧将是
id | p1 | p2 | p3
--------------------------------
x | a1 | [b1] | null
--------------------------------
y | null |[b2, b3]| c1
如何从第一个数据帧高效地实现第二个数据帧?
【问题讨论】:
【参考方案1】:你基本上是在找表pivoting;对于您的情况,groupBy
id
、pivot
col1
作为标题,并使用 collect_list
函数将 col2
聚合为列表:
df.groupBy("id").pivot("col1").agg(collect_list("col2")).show
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x|[a1]| [b1]| []|
| y| []|[b2, b3]|[c1]|
+---+----+--------+----+
如果保证p1
和p3
中每个id
最多有一个值,则可以通过getting
数组的第一项将这些列转换为字符串类型:
df.groupBy("id").pivot("col1").agg(collect_list("col2"))
.withColumn("p1", $"p1"(0)).withColumn("p3", $"p3"(0))
.show
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x| a1| [b1]|null|
| y|null|[b2, b3]| c1|
+---+----+--------+----+
如果您需要动态转换列类型,即仅在必须时使用数组类型的列类型:
// get array Type columns
val arrayColumns = df.groupBy("id", "col1").agg(count("*").as("N"))
.where($"N" > 1).select("col1").distinct.collect.map(row => row.getString(0))
// arrayColumns: Array[String] = Array(p2)
// aggregate / pivot data frame
val aggDf = df.groupBy("id").pivot("col1").agg(collect_list("col2"))
// aggDf: org.apache.spark.sql.DataFrame = [id: string, p1: array<string> ... 2 more fields]
// get string columns
val stringColumns = aggDf.columns.filter(x => x != "id" && !arrayColumns.contains(x))
// use foldLeft on string columns to convert the columns to string type
stringColumns.foldLeft(aggDf)((df, x) => df.withColumn(x, col(x)(0))).show
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x| a1| [b1]|null|
| y|null|[b2, b3]| c1|
+---+----+--------+----+
【讨论】:
我希望 p1 和 p3 是字符串类型,而不是数组类型。 我不确定这是否有意义,除非您确定,p1
和 p3
中最多有一个值,每个 id
。在这种情况下,您可以从p1
和p3
中提取第一个元素。 df.groupBy("id").pivot("col1").agg(collect_list("col2")).withColumn("p1", $"p1"(0)).withColumn("p3", $"p3"(0)).show
应该提供你所需要的。
问题是来自 col1 的不同值不是固定的,它们可以是任意数量的元素。
在这种情况下,我会将所有透视列保留为数组类型而不是字符串类型。归根结底,如果您甚至不确定您有多少列,您怎么知道它们应该是什么类型?
我可以使用 SELECT DISTINCT col1 FROM (SELECT id, col1, COUNT (*) AS rowCount FROM df GROUP BY id , col1 HAVING 从第一个数据帧中找到数组类型列的名称rowCount > 1。主要问题是使用 Array Type 列的名称动态创建最终数据帧。以上是关于在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章
将一个数据帧的数组列与scala中另一个数据帧的数组列的子集进行比较
如何在scala中的另一个数组中使用一个数组,以便使用一个数组中的每个元素附加到另一个数组中的相应元素?