如何在Spark SQL中查询StringType的1个字段具有json值的数据框

Posted

技术标签:

【中文标题】如何在Spark SQL中查询StringType的1个字段具有json值的数据框【英文标题】:How to query on data frame where 1 field of StringType has json value in Spark SQL 【发布时间】:2016-07-17 05:07:01 【问题描述】:

我正在尝试在 spark 数据帧上使用 SQL。但是数据框有 1 个值有字符串(类似于 JSON 的结构):

我将数据框保存到临时表:TestTable

当我做 desc 时:

col_name                       data_type
requestId                       string  
name                            string  
features                        string  

但特征值是一个 json :

"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":"Movie":2,"Park Visit":11,"benefits":"freeTime":13

我只想查询 TotalSpent > 10 的 TestTable。有人能告诉我该怎么做吗?

我的 JSON 文件如下所示:

   
        "requestId": 232323,
        "name": "ravi",
        "features": ""places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":"Movie":2,"Park Visit":11,"benefits":"freeTime":13"
    

features 是一个字符串。我只需要 totalSpent 。我试过了:

val features = StructType( 
Array(StructField("totalSpent",LongType,true), 
StructField("movies",LongType,true) 
))

val schema = StructType(Array( 
StructField("requestId",StringType,true), 
StructField("name",StringType,true), 
StructField("features",features,true), 
) 
)

val records = sqlContext.read.schema(schema).json(filePath)

因为每个请求都有一个 JSON 特征字符串。但这给了我错误。

当我尝试使用

val records = sqlContext.jsonFile(filePath)

records.printSchema

给我看:

root
 |-- requestId: string (nullable = true)
 |-- features: string (nullable = true)
 |-- name: string (nullable = true)

我可以在创建模式时在 StructField 中使用并行化吗?我试过了:

I first tried with : 

val customer = StructField("features",StringType,true)
val events = sc.parallelize(customer :: Nil)


 val schema = StructType(Array( 
    StructField("requestId",StringType,true), 
    StructField("name", StructType(events, true),true), 
    StructField("features",features,true), 
    ) 
    )

这也给了我错误。也试过了:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => 
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
)

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

This gives me : 
<console>:78: error: object liftweb is not a member of package net
       import net.liftweb.json.parse

试过了:

我试过了:

 val parseJson = udf((s: String) => 
  sqlContext.read.json(s)
)

val parsed = records.withColumn("parsedJSON", parseJson($"features"))
parsed.show

但又出错了。

试过了:

import org.json4s._ 
import org.json4s.jackson.JsonMethods._ 

val parseJson = udf((s: String) =>  
parse(s) 
) 

val parsed = records.withColumn("parsedJSON", parseJson($"features")) 
parsed.show

但它给了我:

java.lang.UnsupportedOperationException: Schema for type org.json4s.JValue is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)

这给了我正确的架构(基于 zero323 给出的答案:

val extractFeatures = udf((features: String) => Try 
implicit val formats = DefaultFormats
  parse(features).extract[Features]
.toOption)

val parsed = records.withColumn("features", extractFeatures($"features"))

parsed.printSchema

但是当我查询时:

val value = parsed.filter($"requestId" === "232323" ).select($"features.totalSpent")

value.show gives null.

【问题讨论】:

@zero323 :嗨,我回答了你回答的另一个问题,但我不太明白。你能解释一下吗?我对火花很陌生。 不清楚哪一部分?这是一个很长的答案,根据要求和版本显示不同的方法。您是否考虑过任何特定的解决方案? @zero323 我尝试在我的架构上使用 parallise,因为我使用架构来获取其他值,所以我更喜欢在架构中使用它,但它会给出错误。我倾向于用UDF来解析JSON,你提到case class KV(k: String, v: Int) 是不是因为他的json的格式化方式? 因为您必须在项目中包含所需的工件。它可以是您喜欢的任何 JSON 库。 Spark 附带 json4s (+ jackson),它使用 indentiacal DSL (github.com/json4s/json4s) 作为提升,但您对此感到不舒服,选择更新和使用 get_json_object 并进行所需的类型转换。 import org.json4s._; import org.json4s.jackson.JsonMethods._ - 其余的应该或多或少相似。 【参考方案1】:

当您从 UDF 返回数据时,它必须可以表示为 SQL 类型,而 JSON AST 则不能。一种方法是创建一个与此类似的案例类:

case class Features(
  places: Integer, 
  movies: Integer,
  totalPlacesVisited: Integer, 
  totalSpent: Integer,
  SpentMap: Map[String, Integer],
  benefits: Map[String, Integer]
) 

并将其用于extract 对象:

val df = Seq((
  232323, "ravi",
  """"places":11,"movies":2,"totalPlacesVisited":0,"totalSpent":13,"SpentMap":"Movie":2,"Park Visit":11,"benefits":"freeTime":13"""
)).toDF("requestId", "name", "features")

val extractFeatures = udf((features: String) => 
  parse(features).extract[Features])

val parsed = df.withColumn("features", extractFeatures($"features"))
parsed.show(false)

// +---------+----+-----------------------------------------------------------------+
// |requestId|name|features                                                         |
// +---------+----+-----------------------------------------------------------------+
// |232323   |ravi|[11,2,0,13,Map(Movie -> 2, Park Visit -> 11),Map(freeTime -> 13)]|
// +---------+----+-----------------------------------------------------------------+

parsed.printSchema

// root
//  |-- requestId: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- features: struct (nullable = true)
//  |    |-- places: integer (nullable = true)
//  |    |-- movies: integer (nullable = true)
//  |    |-- totalPlacesVisited: integer (nullable = true)
//  |    |-- totalSpent: integer (nullable = true)
//  |    |-- SpentMap: map (nullable = true)
//  |    |    |-- key: string
//  |    |    |-- value: integer (valueContainsNull = true)
//  |    |-- benefits: map (nullable = true)
//  |    |    |-- key: string
//  |    |    |-- value: integer (valueContainsNull = true)

根据其他记录和预期使用情况,您应该调整表示并添加相关的错误处理逻辑。

您还可以使用 DSL 以字符串形式访问单个字段:

val getMovieSpent = udf((s: String) => 
  compact(render(parse(s) \\ "SpentMap" \\ "Movie")))

df.withColumn("movie_spent", getMovieSpent($"features").cast("bigint")).show
// +---------+----+--------------------+-----------+
// |requestId|name|            features|movie_spent|
// +---------+----+--------------------+-----------+
// |   232323|ravi|"places":11,"mov...|          2|
// +---------+----+--------------------+-----------+

有关替代方法,请参阅How to query JSON data column using Spark DataFrames?

【讨论】:

是的。我的坏:)谢谢你的详细解释。 它工作正常。但我尝试 parsed.show 它给了我:java.lang.NullPointerExceptio 就像我之前所说的,您需要适当的异常处理,并且可能需要根据数据的规律性进行一些调整。对于初学者,您可以使用scala.util.Try(...).toOption,但这是一种非常粗糙的方法。 哦..好的..感谢您的建议..我收到 org.json4s.package$MappingException: Parsed JSON values do not match with class constructor 嗯,在 1.6+ 中要容易得多。你可以试试 DSL,它可能对用户更友好一些。我用一个小例子更新了答案。

以上是关于如何在Spark SQL中查询StringType的1个字段具有json值的数据框的主要内容,如果未能解决你的问题,请参考以下文章

如何创建类型化的空 MapType?

hivesql 迁移spark3.0 sparksql报错如Cannot safely cast '字段':StringType to IntegerType的问题

spark sql - 如何在 spark sql 中编写动态查询

Spark - 将包含 JSON 字符串的列从 StringType 转换为 Array Type(StringType())

如何在 spark-sql 查询中引用地图列?

如何在 Spark SQL 中查询 Avro 表