使用 hive/sql 和 spark 读取 json 键值
Posted
技术标签:
【中文标题】使用 hive/sql 和 spark 读取 json 键值【英文标题】:read json key-values with hive/sql and spark 【发布时间】:2015-01-13 21:11:20 【问题描述】:我正在尝试将此 json 文件读入配置单元表,***键即 1,2..,此处不一致。
"1":"\"time\":1421169633384,\"reading1\":130.875969,\"reading2\":227.138275",
"2":"\"time\":1421169646476,\"reading1\":131.240628,\"reading2\":226.810211",
"position": 0
我只需要我的蜂巢表中的时间和读数 1,2,因为列会忽略位置。 我还可以组合使用 hive 查询和 spark map-reduce 代码。 谢谢你的帮助。
更新,这是我正在尝试的
val hqlContext = new HiveContext(sc)
val rdd = sc.textFile(data_loc)
val json_rdd = hqlContext.jsonRDD(rdd)
json_rdd.registerTempTable("table123")
println(json_rdd.printSchema())
hqlContext.sql("SELECT json_val from table123 lateral view explode_map( json_map(*, 'int,string')) x as json_key, json_val ").foreach(println)
它会抛出以下错误:
Exception in thread "main" org.apache.spark.sql.hive.HiveQl$ParseException: Failed to parse: SELECT json_val from temp_hum_table lateral view explode_map( json_map(*, 'int,string')) x as json_key, json_val
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:239)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
【问题讨论】:
您希望输出看起来像什么的示例将非常有帮助。 输出表示例:"time","reading1","reading2"\n 1421169633384 , 130.875969, 227.138275\n 1421169646476, 131.240628, 226.810211
【参考方案1】:
如果您将“1”和“2”(键名)重命名为“x1”和“x2”(在 json 文件或 rdd 中),这将起作用:
val resultrdd = sqlContext.sql("SELECT x1.time, x1.reading1, x1.reading1, x2.time, x2.reading1, x2.reading2 from table123 ")
resultrdd.flatMap(row => (Array( (row(0),row(1),row(2)), (row(3),row(4),row(5)) )))
这将为您提供一个包含时间、读数 1 和读数 2 的元组 RDD。如果您需要 SchemaRDD,您可以将其映射到 flatMap 转换中的案例类,如下所示:
case class Record(time: Long, reading1: Double, reading2: Double)
resultrdd.flatMap(row => (Array( Record(row.getLong(0),row.getDouble(1),row.getDouble(2)),
Record(row.getLong(3),row.getDouble(4),row.getDouble(5)) )))
val schrdd = sqlContext.createSchemaRDD(resultrdd)
更新:
在嵌套键多的情况下,可以这样解析行:
val allrdd = sqlContext.sql("SELECT * from table123")
allrdd.flatMap(row=>
var recs = Array[Record]();
for(col <- (0 to row.length-1))
row(col) match
case r:Row => recs = recs :+ Record(r.getLong(2),r.getDouble(0),r.getDouble(1));
case _ => ;
;
recs
)
【讨论】:
密钥一直到 1,2...240。所以做 x1.time 等可能行不通。 这是我所做的抛出错误allrdd.registerTempTable("vals") ; sqlContext.sql("select reading1 from vals LIMIT 10 ").collect.foreach(println)
我错过了什么吗?
allrdd 包含 Record 元素的列表。 json 文件中的每一行都将映射到多个 Record 元素,具体取决于您拥有多少***键(1,2 等)。 allrdd 不是 SchemaRDD,因此您不能将其注册为临时表。你没有在你的问题中说你打算如何使用这些数据,但这正是你所要求的......以上是关于使用 hive/sql 和 spark 读取 json 键值的主要内容,如果未能解决你的问题,请参考以下文章
Spark 实践 | Hive SQL 迁移 Spark SQL 在滴滴的实践
Spark(Hive) SQL中UDF的使用(Python)
concat_ws 使用在hive spark-sql上的区别