如何在 spark dataframes/spark sql 中使用模式读取 json
Posted
技术标签:
【中文标题】如何在 spark dataframes/spark sql 中使用模式读取 json【英文标题】:how to read json with schema in spark dataframes/spark sql 【发布时间】:2016-09-06 17:58:49 【问题描述】:sql/数据帧, 请帮助我或提供一些关于如何阅读此 json 的好建议
"billdate":"2016-08-08',
"accountid":"xxx"
"accountdetails":
"total":"1.1"
"category":[
"desc":"one",
"currentinfo":
"value":"10"
,
"subcategory":[
"categoryDesc":"sub",
"value":"10",
"currentinfo":
"value":"10"
]
]
谢谢,
【问题讨论】:
【参考方案1】:您可以尝试以下代码在 Spark 2.2 中读取基于 Schema 的 JSON 文件
import org.apache.spark.sql.types.DataType, StructType
//Read Json Schema and Create Schema_Json
val schema_json=spark.read.json("/user/Files/ActualJson.json").schema.json
//add the schema
val newSchema=DataType.fromJson(schema_json).asInstanceOf[StructType]
//read the json files based on schema
val df=spark.read.schema(newSchema).json("Json_Files/Folder Path")
【讨论】:
【参考方案2】:似乎您的 json 无效。 请检查http://www.jsoneditoronline.org/
请看an-introduction-to-json-support-in-spark-sql.html
如果您想注册为表,您可以像下面这样注册并打印架构。
DataFrame df = sqlContext.read().json("/path/to/validjsonfile").toDF();
df.registerTempTable("df");
df.printSchema();
下面是示例代码sn-p
DataFrame app = df.select("toplevel");
app.registerTempTable("toplevel");
app.printSchema();
app.show();
DataFrame appName = app.select("toplevel.sublevel");
appName.registerTempTable("sublevel");
appName.printSchema();
appName.show();
scala 示例:
"name":"Michael", "cities":["palo alto", "menlo park"], "schools":["sname":"stanford", "year":2010, "sname":"berkeley", "year":2012]
"name":"Andy", "cities":["santa cruz"], "schools":["sname":"ucsb", "year":2011]
"name":"Justin", "cities":["portland"], "schools":["sname":"berkeley", "year":2014]
val people = sqlContext.read.json("people.json")
people: org.apache.spark.sql.DataFrame
读取***字段
val names = people.select('name).collect()
names: Array[org.apache.spark.sql.Row] = Array([Michael], [Andy], [Justin])
names.map(row => row.getString(0))
res88: Array[String] = Array(Michael, Andy, Justin)
使用 select() 方法指定***字段,使用 collect() 将其收集到 Array[Row] 中,并使用 getString() 方法访问每个 Row 内的列。
展平并读取 JSON 数组
每个人都有一个“城市”数组。让我们展平这些数组并读出它们的所有元素。
val flattened = people.explode("cities", "city")c: List[String] => c
flattened: org.apache.spark.sql.DataFrame
val allCities = flattened.select('city).collect()
allCities: Array[org.apache.spark.sql.Row]
allCities.map(row => row.getString(0))
res92: Array[String] = Array(palo alto, menlo park, santa cruz, portland)
explode() 方法将城市数组爆炸或展平成一个名为“city”的新列。然后我们使用 select() 选择新列,collect() 将其收集到 Array[Row] 中,并使用 getString() 访问每个 Row 中的数据。
读取嵌套的 JSON 对象数组,未展平
读出“学校”数据,这是一个嵌套 JSON 对象的数组。数组的每个元素都包含学校名称和年份:
val schools = people.select('schools).collect()
schools: Array[org.apache.spark.sql.Row]
val schoolsArr = schools.map(row => row.getSeq[org.apache.spark.sql.Row](0))
schoolsArr: Array[Seq[org.apache.spark.sql.Row]]
schoolsArr.foreach(schools =>
schools.map(row => print(row.getString(0), row.getLong(1)))
print("\n")
)
(stanford,2010)(berkeley,2012)
(ucsb,2011)
(berkeley,2014)
使用select()
和collect()
选择“学校”数组并将其收集到Array[Row]
。现在,每个“学校”数组的类型都是List[Row]
,所以我们使用getSeq[Row]()
方法读取它。最后,我们可以通过调用getString()
获取学校名称和getLong()
获取学年来阅读每所学校的信息。
【讨论】:
嗨 RamPrasad,感谢您的及时回复,我将尝试提供的示例。顺便说一句,我提供的 json 是有效的。 "billdate":"2016-08-08", "accountid":"xxx", "accountdetails": "total":"1.1", "category":[ "desc":"one", "currentinfo ":"value":"10", "subcategory":[ "categoryDesc":"sub", "value":"10", "currentinfo": "value":"10" ] ] 您对如何使用预定义模式读取 json 有任何见解吗?如果是,请告诉我,谢谢并感谢您的帮助! 我在 json 中的字段比我在这里提到的要多,所以我想在读取 json 时设置我的模式并只提取那些归档和扁平化到表中。以上是关于如何在 spark dataframes/spark sql 中使用模式读取 json的主要内容,如果未能解决你的问题,请参考以下文章
Spark:如何在 pyspark 或 scala spark 中分解数据并添加列名?