Spark使用DataFrame读取复杂JSON中的嵌套数组

Posted 玩大数据的snoopy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark使用DataFrame读取复杂JSON中的嵌套数组相关的知识,希望对你有一定的参考价值。

所周知,在早期Spark版本中就已经支持读取Json格式的数据文件,并能够直接转换为数据库表,以方便我们进行处理数据,在本篇文章中我们将介绍如何通过Spark API很简单地读取Json数据,并进一步讲解,读取复杂Json中的嵌套数组。

本次使用的各个组件版本设定如下:

Spark: 2.1.0

Scala 2.11.8

Hadoop 2.6.2


加载Json数据

我们可以简单地通过SQLContext读取Json文件

val dfSQLContext = sqlContext.read.json("/user/hadoop/flume/my_crawler_jd_report/*/*")


或者通过SparkSession读取Json文件

val dfSparkSession = spark.read.json("/user/hadoop/flume/my_crawler_jd_report/*/*")


以上两种方式读取的结果是一样的

scala> val dfSQLContext = sqlContext.read.json("/user/hadoop/flume/my_crawler_jd_report/*/*")
dfSQLContext: org.apache.spark.sql.DataFrame = [addTime: string, channel: string ... 3 more fields]
scala> val dfSparkSession = spark.read.json("/user/hadoop/flume/my_crawler_jd_report/*/*")
dfSparkSession: org.apache.spark.sql.DataFrame = [addTime: string, channel: string ... 3 more fields]


现在我们来看下整个Json的数据结构

scala> dfSparkSession.printSchema
root
|-- addTime: string
|-- channel: string
|-- consume_time: string
|-- message: struct
|    |-- addressInfo: array
|    |    |-- element: struct
|    |    |    |-- address: string
|    |    |    |-- addressLabel: string
|    |    |    |-- consignee: string
|    |    |    |-- email: string
|    |    |    |-- phoneNumber : string
|    |    |    |-- region: string
|    |-- certifyInfo: struct
|    |    ...结构比较复杂,此处省略
|-- sUserId: string (nullable = true)



嵌套数组结构

打平/分解数组

如果Json对象中包含了数组结构,我们应该如何访问到数组中的元素呢?有一种方法就是直接将这条数据中的数组打平成多条数据,也就是一条记录拆分成多条记录,在这里我们可以直接通过explode()函数实现

scala> val dfDetails = dfSQLContext.select(dfSQLContext("sUserId"),explode(dfSQLContext("message.addressInfo"))).toDF("userid","addressInfo")
scala> dfDetails.printSchema
root
|-- userid: string
|-- addressInfo: struct
|    |-- address: string
|    |-- addressLabel: string
|    |-- consignee: string
|    |-- email: string
|    |-- phoneNumber : string
|    |-- region: string
|    |-- telephoneNumber: string


现在我们已经将addressInfo打平成一条条记录,不再是数组结构的数据了。

scala> val dfDetailsAddress = dfDetails.select("addressInfo.address")
scala> dfDetailsAddress.show(10)
+--------------------+
|             address|
+--------------------+
|师大京东派|
|障岗村广东青年职业学院|
|永泰永康路致和街一巷之三号(永泰小学旁)|
|燕岭路89号燕侨大厦407室翰墨教育|
|广东财经大学华商学院-华商路1|
|蓬馨园西二栋203|
|障岗村广东青年职业职业学院|
|新兴街教育局对面梦韵床上用品专卖店|
|大园街十一巷九号一楼|
|玉亭镇百湖凤凰溪35|
+--------------------+
only showing top 10 rows



参考文献

explode()

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@explode(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column


生活

岂止于美


长按二维码关注


以上是关于Spark使用DataFrame读取复杂JSON中的嵌套数组的主要内容,如果未能解决你的问题,请参考以下文章

赵强老师在Spark SQL中读取JSON文件

从文件到Spark Dataframe读取Jsons数组

SparkSQL

pyspark建立RDD以及读取文件成dataframe

如何在 Spark Scala 中读取带有根元素的多行 json?

Spark SQL - 如何将 DataFrame 写入文本文件?