在 Spark 中独立分解多个列

Posted

技术标签:

【中文标题】在 Spark 中独立分解多个列【英文标题】:Independently explode multiple columns in Spark 【发布时间】:2019-05-15 19:14:29 【问题描述】:

我有一个架构,其中每一行包含多列数组,我想独立地分解每个数组列。

假设我们有列:

**userId    someString      varA     varB       someBool
   1        "example1"    [0,2,5]   [1,2,9]        true
   2        "example2"    [1,20,5]  [9,null,6]     false

我想要一个输出:

userId    someString      varA     varB   someBool
   1      "example1"       0        null    true
   1      "example1"       2        null    true
   1      "example1"       5        null    true
   1      "example1"       1        null    true
   1      "example1"       20       null    true
   1      "example1"       5        null    true
   2      "example2"       null      1      false
   2      "example2"       null      2      false
   2      "example2"       null      9      false
   2      "example2"       null      9      false
   2      "example2"       null     null    false
   2      "example2"       null      6      false

想法?

(哦,我正在尝试这样做,所以我不必在架构更改时更新代码,而且还因为 actual 架构有点大......)

PS - this 的道具非常相似但不同的问题,我无耻地窃取了示例数据。

编辑:@oliik 获胜,但是,用df.flatMap 来解决这个问题也很棒(主要是因为我仍然不了解flatMap

【问题讨论】:

我想知道这是否可以动态完成? 你如何得到例如这一行2 "example2" 1 null true?在源表someBool==false <=> userId==2 @ollik1 错字:P 【参考方案1】:

您始终可以通过编程方式生成选择

val df = Seq(
  (1, "example1", Seq(0,2,5), Seq(Some(1),Some(2),Some(9)), true),
  (2, "example2", Seq(1,20,5), Seq(Some(9),Option.empty[Int],Some(6)), false)
).toDF("userId", "someString", "varA", "varB", "someBool")

val arrayColumns = df.schema.fields.collect 
  case StructField(name, ArrayType(_, _), _, _) => name


val dfs = arrayColumns.map  expname =>
  val columns = df.schema.fields.map 
    case StructField(name, ArrayType(_, _), _, _) if expname == name => explode(df.col(name)) as name
    case StructField(name, ArrayType(_, _), _, _) => lit(null) as name
    case StructField(name, _, _, _) => df.col(name)
  
  df.select(columns:_*)


dfs.reduce(_ union _).show()
+------+----------+----+----+--------+
|userId|someString|varA|varB|someBool|
+------+----------+----+----+--------+
|     1|  example1|   0|null|    true|
|     1|  example1|   2|null|    true|
|     1|  example1|   5|null|    true|
|     2|  example2|   1|null|   false|
|     2|  example2|  20|null|   false|
|     2|  example2|   5|null|   false|
|     1|  example1|null|   1|    true|
|     1|  example1|null|   2|    true|
|     1|  example1|null|   9|    true|
|     2|  example2|null|   9|   false|
|     2|  example2|null|null|   false|
|     2|  example2|null|   6|   false|
+------+----------+----+----+--------+

【讨论】:

好的,超级酷。我可以让你解释一下 select 中发生了什么,这使得这项工作有效吗? 你能解释一下这个案例吗? 这个想法是通过数组列“循环”并将以下内容应用于所有列:1)如果是被循环的数组列,则爆炸列 2)如果是 null 值替换列该列是其他数组列 3) 如果该列不是数组列,请保持原样。这给出了select("userId", "someString", explode("varA"), lit(null), "someBool")select("userId", "someString", lit(null), explode("varB"), "someBool")。最后我们合并这些数据框 我明白了,但不是这个:ArrayType(, _), _, _) _, 本身。我要尝试,我非常喜欢。添加了我缺少的一个方面。干得好。 它用于捕获所有数组类型,而不考虑参数。 ArrayType 有两个参数,元素类型和包含 null。在这种情况下,我们总是希望匹配并因此忽略值​​pan>

以上是关于在 Spark 中独立分解多个列的主要内容,如果未能解决你的问题,请参考以下文章

Spark 独立模式多个 shell 会话(应用程序)

Spark 独立集群如何在工作节点上管理多个执行程序?

将 Spark 中的多个 ArrayType 列合并为一个 ArrayType 列

在 Spark SQL 中查找多个双数据类型列的中位数

Spark 从另一个表更新 Delta 中的多个列

使用 PySpark 将多个数字列拟合到 spark-ml 模型中