如何在 spark dataframes/spark sql 中使用模式读取 json

Posted

技术标签:

【中文标题】如何在 spark dataframes/spark sql 中使用模式读取 json【英文标题】:how to read json with schema in spark dataframes/spark sql 【发布时间】:2016-09-06 17:58:49 【问题描述】:

sql/数据帧, 请帮助我或提供一些关于如何阅读此 json 的好建议


    "billdate":"2016-08-08',
    "accountid":"xxx"
    "accountdetails":
        "total":"1.1"
        "category":[
        
            "desc":"one",
            "currentinfo":
            "value":"10"
        ,
            "subcategory":[
            
                "categoryDesc":"sub",
                "value":"10",
                "currentinfo":
                    "value":"10"
                
            ]
        ]
    

谢谢,

【问题讨论】:

【参考方案1】:

您可以尝试以下代码在 Spark 2.2 中读取基于 Schema 的 JSON 文件

import org.apache.spark.sql.types.DataType, StructType

//Read Json Schema and Create Schema_Json
val schema_json=spark.read.json("/user/Files/ActualJson.json").schema.json

//add the schema 
val newSchema=DataType.fromJson(schema_json).asInstanceOf[StructType]

//read the json files based on schema
val df=spark.read.schema(newSchema).json("Json_Files/Folder Path")

【讨论】:

【参考方案2】:

似乎您的 json 无效。 请检查http://www.jsoneditoronline.org/

请看an-introduction-to-json-support-in-spark-sql.html

如果您想注册为表,您可以像下面这样注册并打印架构。

DataFrame df = sqlContext.read().json("/path/to/validjsonfile").toDF();
    df.registerTempTable("df");
    df.printSchema();

下面是示例代码sn-p

DataFrame app = df.select("toplevel");
        app.registerTempTable("toplevel");
        app.printSchema();
        app.show();
DataFrame appName = app.select("toplevel.sublevel");
        appName.registerTempTable("sublevel");
        appName.printSchema();
        appName.show();

scala 示例:

"name":"Michael", "cities":["palo alto", "menlo park"], "schools":["sname":"stanford", "year":2010, "sname":"berkeley", "year":2012]
"name":"Andy", "cities":["santa cruz"], "schools":["sname":"ucsb", "year":2011]
"name":"Justin", "cities":["portland"], "schools":["sname":"berkeley", "year":2014]

 val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame

读取***字段

val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])

 names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)

使用 select() 方法指定***字段,使用 collect() 将其收集到 Array[Row] 中,并使用 getString() 方法访问每个 Row 内的列。

展平并读取 JSON 数组

每个人都有一个“城市”数组。让我们展平这些数组并读出它们的所有元素。

val flattened = people.explode("cities", "city")c: List[String] => c
flattened: org.apache.spark.sql.DataFrame

val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]

 allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)

explode() 方法将城市数组爆炸或展平成一个名为“city”的新列。然后我们使用 select() 选择新列,collect() 将其收集到 Array[Row] 中,并使用 getString() 访问每个 Row 中的数据。

读取嵌套的 JSON 对象数组,未展平

读出“学校”数据,这是一个嵌套 JSON 对象的数组。数组的每个元素都包含学校名称和年份:

 val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]


val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]

 schoolsArr.foreach(schools => 
    schools.map(row => print(row.getString(0), row.getLong(1)))
    print("\n")
 )
(stanford,2010)(berkeley,2012) 
(ucsb,2011) 
(berkeley,2014)

使用select()collect() 选择“学校”数组并将其收集到Array[Row]。现在,每个“学校”数组的类型都是List[Row],所以我们使用getSeq[Row]() 方法读取它。最后,我们可以通过调用getString() 获取学校名称和getLong() 获取学年来阅读每所学校的信息。

【讨论】:

嗨 RamPrasad,感谢您的及时回复,我将尝试提供的示例。顺便说一句,我提供的 json 是有效的。 "billdate":"2016-08-08", "accountid":"xxx", "accountdetails": "total":"1.1", "category":[ "desc":"one", "currentinfo ":"value":"10", "subcategory":[ "categoryDe​​sc":"sub", "value":"10", "currentinfo": "value":"10" ] ] 您对如何使用预定义模式读取 json 有任何见解吗?如果是,请告诉我,谢谢并感谢您的帮助! 我在 json 中的字段比我在这里提到的要多,所以我想在读取 json 时设置我的模式并只提取那些归档和扁平化到表中。

以上是关于如何在 spark dataframes/spark sql 中使用模式读取 json的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spark 中加速 leftouterjoin?

Spark:如何在 pyspark 或 scala spark 中分解数据并添加列名?

如何在Ubuntu下搭建Spark集群

如何在单个 Spark 作业中摄取不同的 Spark 数据帧

如何在万亿级别规模的数据量上使用Spark

如何在 spark 2(java) 中创建广播变量?