在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是啥?

Posted

技术标签:

【中文标题】在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是啥?【英文标题】:What is the efficient way to create Spark DataFrame in Scala with array type columns from another DataFrame that does not have an array column?在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是什么? 【发布时间】:2019-11-17 20:10:40 【问题描述】:

假设,我有以下数据框:

 id | col1 | col2 
-----------------
 x  |  p1  |  a1  
-----------------
 x  |  p2  |  b1
-----------------
 y  |  p2  |  b2
-----------------
 y  |  p2  |  b3
-----------------
 y  |  p3  |  c1

col1 中的不同值(p1,p2,p3)单独与 id 将用作最终数据帧的列。这里,id y 有两个 col2 值(b2 和 b3)对应相同的 col1p2,所以, p2 将被视为数组类型的列。 因此,最终的数据帧将是

  id  |   p1   |   p2   |   p3
--------------------------------
  x   |   a1   |  [b1]  |  null
--------------------------------
  y   |  null  |[b2, b3]|  c1

如何从第一个数据帧高效地实现第二个数据帧?

【问题讨论】:

【参考方案1】:

你基本上是在找表pivoting;对于您的情况,groupByidpivotcol1 作为标题,并使用 collect_list 函数将 col2 聚合为列表:

df.groupBy("id").pivot("col1").agg(collect_list("col2")).show
+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|[a1]|    [b1]|  []|
|  y|  []|[b2, b3]|[c1]|
+---+----+--------+----+

如果保证p1p3中每个id最多有一个值,则可以通过getting数组的第一项将这些列转换为字符串类型:

df.groupBy("id").pivot("col1").agg(collect_list("col2"))
  .withColumn("p1", $"p1"(0)).withColumn("p3", $"p3"(0))
  .show
+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|  a1|    [b1]|null|
|  y|null|[b2, b3]|  c1|
+---+----+--------+----+

如果您需要动态转换列类型,即仅在必须时使用数组类型的列类型:

// get array Type columns
val arrayColumns = df.groupBy("id", "col1").agg(count("*").as("N"))
    .where($"N" > 1).select("col1").distinct.collect.map(row => row.getString(0))
// arrayColumns: Array[String] = Array(p2)

// aggregate / pivot data frame
val aggDf = df.groupBy("id").pivot("col1").agg(collect_list("col2"))
// aggDf: org.apache.spark.sql.DataFrame = [id: string, p1: array<string> ... 2 more fields]

// get string columns
val stringColumns = aggDf.columns.filter(x => x != "id" && !arrayColumns.contains(x))

// use foldLeft on string columns to convert the columns to string type
stringColumns.foldLeft(aggDf)((df, x) => df.withColumn(x, col(x)(0))).show
+---+----+--------+----+
| id|  p1|      p2|  p3|
+---+----+--------+----+
|  x|  a1|    [b1]|null|
|  y|null|[b2, b3]|  c1|
+---+----+--------+----+

【讨论】:

我希望 p1 和 p3 是字符串类型,而不是数组类型。 我不确定这是否有意义,除非您确定,p1p3 中最多有一个值,每个 id。在这种情况下,您可以从p1p3 中提取第一个元素。 df.groupBy("id").pivot("col1").agg(collect_list("col2")).withColumn("p1", $"p1"(0)).withColumn("p3", $"p3"(0)).show 应该提供你所需要的。 问题是来自 col1 的不同值不是固定的,它们可以是任意数量的元素。 在这种情况下,我会将所有透视列保留为数组类型而不是字符串类型。归根结底,如果您甚至不确定您有多少列,您怎么知道它们应该是什么类型? 我可以使用 SELECT DISTINCT col1 FROM (SELECT id, col1, COUNT (*) AS rowCount FROM df GROUP BY id , col1 HAVING 从第一个数据帧中找到数组类型列的名称rowCount > 1。主要问题是使用 Array Type 列的名称动态创建最终数据帧。

以上是关于在 Scala 中使用来自另一个没有数组列的 DataFrame 的数组类型列创建 Spark DataFrame 的有效方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

Presto 数组包含来自另一列的值(超集 SQL 查询)

将一个数据帧的数组列与scala中另一个数据帧的数组列的子集进行比较

使用来自另一个散列的值作为散列名称检查散列中是不是存在键

如何在scala中的另一个数组中使用一个数组,以便使用一个数组中的每个元素附加到另一个数组中的相应元素?

如何使用惯用的Scala替换(填充)来自另一个列表的选项列表中的None条目?

如何使用来自另一个表列的值优化列的更新?