如何使用 Scala 在 Spark 中爆炸嵌套结构

Posted

技术标签:

【中文标题】如何使用 Scala 在 Spark 中爆炸嵌套结构【英文标题】:How do I explode a nested Struct in Spark using Scala 【发布时间】:2019-08-26 18:13:44 【问题描述】:

我正在使用

创建一个数据框
  val snDump = table_raw
    .applyMapping(mappings = Seq(
      ("event_id", "string", "eventid", "string"),
      ("lot-number", "string", "lotnumber", "string"),
      ("serial-number", "string", "serialnumber", "string"),
      ("event-time", "bigint", "eventtime", "bigint"),
      ("companyid", "string", "companyid", "string")),
      caseSensitive = false, transformationContext = "sn")
    .toDF()
    .groupBy(col("eventid"), col("lotnumber"), col("companyid"))
    .agg(collect_list(struct("serialnumber", "eventtime")).alias("snetlist"))
    .createOrReplaceTempView("sn")

我在 df 中有这样的数据

    eventid | lotnumber | companyid | snetlist
    123     | 4q22      | tu56ff    | [[12345,67438]]
    456     | 4q22      | tu56ff    | [[12346,67434]]
    258     | 4q22      | tu56ff    | [[12347,67455], [12333,67455]]
    999     | 4q22      | tu56ff    | [[12348,67459]]

我想将其分解为我表中的 2 列中的数据,因为我正在做的是

    val serialNumberEvents = snDump.select(col("eventid"), col("lotnumber"), explode(col("snetlist")).alias("serialN"), explode(col("snetlist")).alias("eventT"), col("companyid"))

也试过了

    val serialNumberEvents = snDump.select(col("eventid"), col("lotnumber"), col($"snetlist.serialnumber").alias("serialN"), col($"snetlist.eventtime").alias("eventT"), col("companyid"))

但事实证明,explode 只能使用一次,并且我在选择中遇到错误,所以我如何使用explode/或其他东西来实现我想要的。

    eventid | lotnumber | companyid | serialN  | eventT |
    123     | 4q22      | tu56ff    | 12345    | 67438  |
    456     | 4q22      | tu56ff    | 12346    | 67434  |
    258     | 4q22      | tu56ff    | 12347    | 67455  |
    258     | 4q22      | tu56ff    | 12333    | 67455  |
    999     | 4q22      | tu56ff    | 12348    | 67459  |

我查看了很多 *** 线程,但没有一个对我有帮助。可能已经回答了这样的问题,但是我对 scala 的理解非常少,这可能使我不理解答案。如果这是重复的,那么有人可以指导我找到正确的答案。任何帮助表示赞赏。

【问题讨论】:

你可以爆炸两次。 Only one generator allowed per select 是我爆炸两次时遇到的错误 【参考方案1】:

首先,将数组分解成一个临时结构列,然后解包:

val serialNumberEvents = snDump
  .withColumn("tmp",explode((col("snetlist"))))
  .select(
    col("eventid"),
    col("lotnumber"),
    col("companyid"),
    // unpack struct
    col("tmp.serialnumber").as("serialN"),
    col("tmp.eventtime").as("serialT")
  )

【讨论】:

我注意到打开包装的要点。你能详细说明吗?你是说getItem(0)之类的吗? @thebluephantom 是的,但我认为getItem(0) 等仅适用于数组,在结构中您可以按名称访问项目【参考方案2】:

诀窍是将要分解的列打包到一个数组(或结构)中,在数组上使用explode,然后解包它们。

val col_names = Seq("eventid", "lotnumber", "companyid", "snetlist")
val data = Seq(
    (123, "4q22", "tu56ff", Seq(Seq(12345,67438))),
    (456, "4q22", "tu56ff", Seq(Seq(12346,67434))),
    (258, "4q22", "tu56ff", Seq(Seq(12347,67455), Seq(12333,67455))),
    (999, "4q22", "tu56ff", Seq(Seq(12348,67459)))
    )

val snDump = spark.createDataFrame(data).toDF(col_names: _*)

val serialNumberEvents = snDump.select(col("eventid"), col("lotnumber"), explode(col("snetlist")).alias("snetlist"), col("companyid"))

val exploded = serialNumberEvents.select($"eventid", $"lotnumber", $"snetlist".getItem(0).alias("serialN"), $"snetlist".getItem(1).alias("eventT"), $"companyid")
exploded.show()

请注意,我的 snetlist 具有架构 Array(Array) 而不是 Array(Struct)。您也可以通过从列中创建一个数组而不是一个结构来简单地实现这一点

【讨论】:

【参考方案3】:

另一种方法,如果需要爆炸两次,如下 - 另一个例子,但为了说明这一点:

val flattened2 = df.select($"director", explode($"films.actors").as("actors_flat"))
val flattened3 = flattened2.select($"director", explode($"actors_flat").as("actors_flattened"))

请参阅Is there an efficient way to join two large Datasets with (deeper) nested array field? 了解略有不同的上下文,但适用相同的方法。

这个回答是为了回应你的断言你只能爆炸一次。

【讨论】:

以上是关于如何使用 Scala 在 Spark 中爆炸嵌套结构的主要内容,如果未能解决你的问题,请参考以下文章

Spark (Scala) - 在 DataFrame 中恢复爆炸

使用具有相同名称的嵌套子属性展平 Spark JSON 数据框

数据框 Spark scala 爆炸 json 数组

如何检查列数据 Spark scala 上的 isEmpty

带有列列表的 Spark 选择 Scala

如何在Scala中证明爆炸原理(ex falso sequitur quodlibet)?