Unable to create columns and values from nested JSON key-value pairs using Spark/Scala

Posted 2019-09-25 18:12:57

I am converting JSON that contains nested key-value pairs, and I want the keys to become columns automatically, with the values filling the rows. I don't want to define a schema, because the number of columns (keys) varies from file to file. I am using Spark 2.3 and Scala 2.11.8. I am not a Scala expert and have only just started with it, so any input on solving this is appreciated.

Here is the sample JSON format:

"RequestID":"9883a6d0-e002-4487-88a6-c92f6a504d72","OverallStatus":"OK","ele":["Name":"UUID","Value":"53f93df3-6528-4d42-a7f5-2876535d4982","Name":"id","Name":"opt_newsletter_email","Value":"boutmathieu@me.com","Name":"parm1","Value":"secure.snnow.ca/orders/summary","Name":"parm2","Value":"fromET","Name":"parm3","Value":"implied","Name":"parm4","Name":"subscribed","Value":"True","Name":"timestamp","Value":"8/6/2019 4:59:00 PM","Name":"list_id","Value":"6","Name":"name","Value":"Event Alerts","Name":"email","Value":"boutmathieu@me.com","Name":"newsletterID","Value":"sports:snnow:event","Name":"subscribeFormIdOrURL","Name":"unsubscribeTimestamp","Value":"8/14/2021 4:58:56 AM"]

"RequestID":"9883a6d0-e002-4487-88a6-c92f6a504d72","OverallStatus":"OK","ele":["Name":"UUID","Value":"53f93df3-6528-4d42-a7f5-2876535d4982","Name":"id","Name":"opt_newsletter_email","Value":"boutmathieu@me.com","Name":"parm1","Value":"secure.snnow.ca/orders/summary","Name":"parm2","Value":"fromET","Name":"parm3","Value":"implied","Name":"parm4","Name":"subscribed","Value":"True","Name":"timestamp","Value":"8/6/2019 4:59:00 PM","Name":"list_id","Value":"7","Name":"name","Value":"Partner & Sponsored Offers","Name":"email","Value":"boutmathieu@me.com","Name":"newsletterID","Value":"sports:snnow:affiliate","Name":"subscribeFormIdOrURL","Name":"unsubscribeTimestamp","Value":"8/14/2021 4:58:56 AM"]

Expected output (screenshot in the original post): one column per key name, with the corresponding values filled in for each RequestID.

Here is my code:

val newDF = spark.read.json("408d392-8c50-425a-a799-355f1783e0be-c000.json")
scala> newDF.printSchema
    root
     |-- OverallStatus: string (nullable = true)
     |-- RequestID: string (nullable = true)
     |-- ele: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- Name: string (nullable = true)
     |    |    |-- Value: string (nullable = true)

val jsonDF = newDF.withColumn("colNames", explode($"ele")).select($"RequestID", $"colNames")

scala> jsonDF.printSchema
    root
     |-- RequestID: string (nullable = true)
     |-- colNames: struct (nullable = true)
     |    |-- Name: string (nullable = true)
     |    |-- Value: string (nullable = true)

val finalDF = jsonDF.groupBy($"RequestID").pivot("ColNames.name").agg("ColNames.value")
---------------------------------------------------------------------------------------

I am getting this error while creating finalDF:

<console>:39: error: overloaded method value agg with alternatives:
  (expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame <and>
  (exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame <and>
  (exprs: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame <and>
  (aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame
 cannot be applied to (String)
          val finalDF=jsonDF.groupBy($"RequestID").pivot("ColNames.name").agg("ColNames.value")

Any help would be greatly appreciated.


Solution 1:

You're almost there. The agg function can be called in any of the following ways:

agg(aggExpr: (String, String)) -> agg("age" -> "max")
agg(exprs: Map[String, String]) -> agg(Map("age" -> "max"))
agg(expr: Column) -> agg(max($"age"))
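
For instance, the three forms below all compute the same maximum (a minimal sketch; the DataFrame df and its age column are assumptions for illustration only):

import org.apache.spark.sql.functions.max
import spark.implicits._  // enables the $"..." column syntax

df.agg("age" -> "max")        // (String, String) pair form
df.agg(Map("age" -> "max"))   // Map[String, String] form
df.agg(max($"age"))           // Column form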

Instead of

agg("ColNames.value")

you should use one of the forms above.

For example:

import org.apache.spark.sql.functions._

jsonDF.groupBy($"RequestID").pivot("colNames.Name")
   .agg(collect_list($"colNames.Value"))
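
For completeness, a minimal end-to-end sketch (the file path is a placeholder, and using first instead of collect_list is a choice, not part of the original answer: first returns a single plain value per cell, while collect_list keeps every value in an array when a key repeats within the same RequestID, as it does in the sample above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, first}

val spark = SparkSession.builder.appName("pivot-json").getOrCreate()
import spark.implicits._

// Each {Name, Value} struct in the "ele" array becomes its own row
val newDF  = spark.read.json("input.json")  // placeholder path
val jsonDF = newDF
  .withColumn("colNames", explode($"ele"))
  .select($"RequestID", $"colNames")

// Distinct Names become columns; each cell holds the first matching Value
val finalDF = jsonDF
  .groupBy($"RequestID")
  .pivot("colNames.Name")
  .agg(first($"colNames.Value"))

finalDF.show(false)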

