将带有 JSON 对象数组的 Spark 数据框列转换为多行
Posted
技术标签:
【中文标题】将带有 JSON 对象数组的 Spark 数据框列转换为多行【英文标题】:Convert an Spark dataframe columns with an array of JSON objects to multiple rows 【发布时间】:2018-10-31 19:12:31 【问题描述】:我有一个流式JSON数据,其结构可以用下面的案例类来描述
case class Hello(A: String, B: Array[Map[String, String]])
相同的样本数据如下
| A | B |
|-------|------------------------------------------|
| ABC | [C:1, D:1, C:2, D:4] |
| XYZ | [C:3, D :6, C:9, D:11, C:5, D:12] |
我想把它改成
| A | C | D |
|-------|-----|------|
| ABC | 1 | 1 |
| ABC | 2 | 4 |
| XYZ | 3 | 6 |
| XYZ | 9 | 11 |
| XYZ | 5 | 12 |
任何帮助将不胜感激。
【问题讨论】:
【参考方案1】:随着问题的演变,我将原始答案留在那里,这解决了最后一个问题。
重要的一点,下面提到的输入现在可以满足:
val df0 = Seq (
("ABC", List(Map("C" -> "1", "D" -> "2"), Map("C" -> "3", "D" -> "4"))),
("XYZ", List(Map("C" -> "44", "D" -> "55"), Map("C" -> "188", "D" -> "199"), Map("C" -> "88", "D" -> "99")))
)
.toDF("A", "B")
也可以这样做,不过接下来需要为此修改脚本,虽然微不足道:
val df0 = Seq (
("ABC", List(Map("C" -> "1", "D" -> "2"))),
("ABC", List(Map("C" -> "44", "D" -> "55"))),
("XYZ", List(Map("C" -> "11", "D" -> "22")))
)
.toDF("A", "B")
然后按照请求的格式:
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")
val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum")
val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")
val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))
val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C"))
val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")
df6.show(false)
返回:
+---+---+---+
|C |D |A |
+---+---+---+
|3 |4 |ABC|
|1 |2 |ABC|
|88 |99 |XYZ|
|188|199|XYZ|
|44 |55 |XYZ|
+---+---+---+
您可以对列重新排序。
【讨论】:
@thebluephanton 谢谢你完美的工作。再次感谢您的耐心等待。【参考方案2】:不确定是否是最佳方法,但可以分两步完成。将您的案例类放在一边,以下内容:
import org.apache.spark.sql.functions._
//case class ComponentPlacement(A: String, B: Array[Map[String, String]])
val df = Seq (
("ABC", List(Map("C" -> "1", "D" -> "2"))),
("XYZ", List(Map("C" -> "11", "D" -> "22")))
).toDF("A", "B")
val df2 = df.select($"A", explode($"B")).toDF("A", "Bn")
val df3 = df2.select($"A", explode($"Bn")).toDF("A", "B", "C")
val df4 = df3.select($"A", $"B", $"C").groupBy("A").pivot("B").agg(first($"C"))
返回:
+---+---+---+
| A| C| D|
+---+---+---+
|XYZ| 11| 22|
|ABC| 1| 2|
+---+---+---+
【讨论】:
以上是关于将带有 JSON 对象数组的 Spark 数据框列转换为多行的主要内容,如果未能解决你的问题,请参考以下文章
读取带有模式的 JSON 数组字符串返回 null spark 2.2.0