如何在没有 SparkSQL 的情况下使用 fastxml 解析 Spark 中的 JSON?

Posted

技术标签:

【中文标题】如何在没有 SparkSQL 的情况下使用 fastxml 解析 Spark 中的 JSON?【英文标题】:How to parse JSON in Spark with fasterxml without SparkSQL? 【发布时间】:2016-05-19 12:18:40 【问题描述】:

我已经走到这一步了:

import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature

case class Person(name: String, lovesPandas: Boolean)

val mapper = new ObjectMapper()

val input = sc.textFile("files/pandainfo.json")
val result = input.flatMap(record => 
    try
        Some(mapper.readValue(record, classOf[Person]))
     catch 
        case e: Exception => None
    
)
result.collect

但得到Array() 结果(没有错误)。文件是https://github.com/databricks/learning-spark/blob/master/files/pandainfo.json我怎么从这里继续?


咨询Spark: broadcasting jackson ObjectMapper我试过了

import org.apache.spark._
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature

case class Person(name: String, lovesPandas: Boolean)

val input = """"name":"Sparky The Bear", "lovesPandas":true"""
val result = input.flatMap(record => 
    try
        val mapper = new ObjectMapper()
        mapper.registerModule(DefaultScalaModule)
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
        Some(mapper.readValue(record, classOf[Person]))
     catch 
        case e: Exception => None
    
)
result.collect

得到了

Name: Compile Error
Message: <console>:34: error: overloaded method value readValue with alternatives:
  [T](x$1: Array[Byte], x$2: com.fasterxml.jackson.databind.JavaType)T <and>
  [T](x$1: Array[Byte], x$2: com.fasterxml.jackson.core.type.TypeReference[_])T <and>
  [T](x$1: Array[Byte], x$2: Class[T])T <and>

【问题讨论】:

我只是在谷歌上搜索,但你需要mapper.registerModule(DefaultScalaModule)吗?另外,您是否尝试过从 Spark 之外的文字字符串中解析 Person 只是为了检查该位是否正常工作? @TheArchetypalPaul 这可以通过很少的谷歌帮助和一些调试来解决.. @YuvalItzchakov,嗯,是的,但我不知道你为什么要对我发表评论! @TheArchetypalPaul:1)如果我添加这个,我会得到Name: org.apache.spark.SparkException Message: Task not serializable 2)我如何添加文字字符串? val text = new String('"name":"Sparky The Bear", "lovesPandas":true')Message: &lt;console&gt;:1: error: unclosed character literal 要在带引号的字符串中包含引号,请阅读 Scala 的任何基本介绍/教程。确实,从这个问题和其他最近的问题来看,你没有花很短的时间来查找一些 Scala 基础知识,从而使你的任务变得更加困难。从简单的东西开始——让 JSON 读取工作,然后添加 Spark 部分。 【参考方案1】:

我看到您尝试了 Learning Spark 示例。 这里参考完整代码 https://github.com/holdenk/learning-spark-examples/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/BasicParseJsonWithJackson.scala E.

【讨论】:

这在 jupyter notebook 中不起作用。 Jupyter 不知道如何导入 jackson 类。有什么想法吗? 你能解释清楚吗? 您指向我的代码在 jupyter notebook 中不起作用。我开始怀疑这不是因为代码错误,而是因为笔记本。可能学习 spark 的代码和 notebook 配合不好。 也许您需要更改笔记本的元数据添加杰克逊包。【参考方案2】:

代替sc.textfile("path\to\json")你可以试试这个(我用java写因为我不知道scala但API是一样的):

SQLContext sqlContext = new SQLContext(sc);
DataFrame dfFromJson = sqlContext.read().json("path\to\json\file.json");

spark 将读取您的 json 文件并将其转换为数据帧。

如果你的 json 文件是嵌套的,你可以使用

org.apache.spark.sql.functions.explode(e: Column): Column

比如看我的回答here

希望对您有所帮助。

【讨论】:

以上是关于如何在没有 SparkSQL 的情况下使用 fastxml 解析 Spark 中的 JSON?的主要内容,如果未能解决你的问题,请参考以下文章

如何在没有 Only 的情况下生成 spark sql 截断查询

可以在没有运行任何 Map/Reduce (/Yarn) 的情况下对 Hive 表执行 Spark SQL 吗?

Spark SQL - 如何将 DataFrame 写入文本文件?

如何使用 PySpark、SparkSQL 和 Cassandra?

如何在不使用数据框的情况下将一行分解为多行?

如何使用 SparkSQL 过滤百分位数的输入值?