Unable to create columns and values from nested JSON key-value pairs using Spark/Scala
Posted: 2019-09-25 18:12:57

I am converting JSON with nested key-value pairs so that columns are created automatically for the keys and the rows are populated with the values. I don't want to define a schema, because the number of columns (keys) will vary from file to file. I am using Spark 2.3 and Scala 2.11.8. I am not a Scala expert and have only just started with it, so any input on solving this problem is appreciated.
Here is the sample JSON format:
"RequestID":"9883a6d0-e002-4487-88a6-c92f6a504d72","OverallStatus":"OK","ele":["Name":"UUID","Value":"53f93df3-6528-4d42-a7f5-2876535d4982","Name":"id","Name":"opt_newsletter_email","Value":"boutmathieu@me.com","Name":"parm1","Value":"secure.snnow.ca/orders/summary","Name":"parm2","Value":"fromET","Name":"parm3","Value":"implied","Name":"parm4","Name":"subscribed","Value":"True","Name":"timestamp","Value":"8/6/2019 4:59:00 PM","Name":"list_id","Value":"6","Name":"name","Value":"Event Alerts","Name":"email","Value":"boutmathieu@me.com","Name":"newsletterID","Value":"sports:snnow:event","Name":"subscribeFormIdOrURL","Name":"unsubscribeTimestamp","Value":"8/14/2021 4:58:56 AM"]
"RequestID":"9883a6d0-e002-4487-88a6-c92f6a504d72","OverallStatus":"OK","ele":["Name":"UUID","Value":"53f93df3-6528-4d42-a7f5-2876535d4982","Name":"id","Name":"opt_newsletter_email","Value":"boutmathieu@me.com","Name":"parm1","Value":"secure.snnow.ca/orders/summary","Name":"parm2","Value":"fromET","Name":"parm3","Value":"implied","Name":"parm4","Name":"subscribed","Value":"True","Name":"timestamp","Value":"8/6/2019 4:59:00 PM","Name":"list_id","Value":"7","Name":"name","Value":"Partner & Sponsored Offers","Name":"email","Value":"boutmathieu@me.com","Name":"newsletterID","Value":"sports:snnow:affiliate","Name":"subscribeFormIdOrURL","Name":"unsubscribeTimestamp","Value":"8/14/2021 4:58:56 AM"]
Expected output: (screenshot in the original post: one column per Name, with the matching Value populating each row)
Here is my code:
val newDF = spark.read.json("408d392-8c50-425a-a799-355f1783e0be-c000.json")
scala> newDF.printSchema
root
|-- OverallStatus: string (nullable = true)
|-- RequestID: string (nullable = true)
|-- ele: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Name: string (nullable = true)
| | |-- Value: string (nullable = true)
val jsonDF = newDF.withColumn("colNames", explode($"ele")).select($"RequestID", $"colNames")
scala> jsonDF.printSchema
root
|-- RequestID: string (nullable = true)
|-- colNames: struct (nullable = true)
| |-- Name: string (nullable = true)
| |-- Value: string (nullable = true)
val finalDF=jsonDF.groupBy($"RequestID").pivot("ColNames.name").agg("ColNames.value")
---------------------------------------------------------------------------------------
I am getting this error while creating finalDF:
<console>:39: error: overloaded method value agg with alternatives:
(expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame <and>
(exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame <and>
(exprs: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame <and>
(aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame
cannot be applied to (String)
val finalDF=jsonDF.groupBy($"RequestID").pivot("ColNames.name").agg("ColNames.value")
Any help would be greatly appreciated.
Answer 1: You're almost there. The agg function can be invoked in any of the following ways (illustrated in the sketch after the list):
agg(aggExpr: (String, String)) -> agg("age" -> "max")
agg(exprs: Map[String, String]) -> agg(Map("age" -> "max"))
agg(expr: Column) -> agg(max($"age"))
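As a quick illustration, all three forms compile against a toy DataFrame. This is a minimal sketch: the df, key, and age names are hypothetical, and spark.implicits._ is assumed to be in scope, as it is in spark-shell.

import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical toy data: two groups, a numeric column to aggregate
val df = Seq(("a", 10), ("a", 20), ("b", 5)).toDF("key", "age")

df.groupBy($"key").agg("age" -> "max")      // (String, String) pair form
df.groupBy($"key").agg(Map("age" -> "max")) // Map[String, String] form
df.groupBy($"key").agg(max($"age"))         // Column expression form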
So, instead of

agg("ColNames.value")

you should use one of the forms above.
For example:
import org.apache.spark.sql.functions._
jsonDF.groupBy($"RequestID").pivot("colNames.Name")
.agg(collect_list($"colNames.Value"))
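Putting it together, here is a minimal end-to-end sketch of the whole flow. Assumptions: sample.json is a hypothetical file holding the two records above, one JSON object per line, and first is used instead of collect_list so that each cell holds a plain string rather than an array.

import org.apache.spark.sql.functions._

val newDF  = spark.read.json("sample.json")                // one JSON object per line
val jsonDF = newDF.withColumn("colNames", explode($"ele")) // one row per Name/Value pair
                  .select($"RequestID", $"colNames")

// Pivot each Name into a column, taking the first Value seen for that key
val finalDF = jsonDF.groupBy($"RequestID")
                    .pivot("colNames.Name")
                    .agg(first($"colNames.Value"))

finalDF.show(false)

Note that both sample records share the same RequestID, so grouping on RequestID alone merges them into a single row: with first you get one of the two values per column, while collect_list (as in the answer above) keeps both in an array.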