如何使用 Spark DataFrames 查询 JSON 数据列?

Posted

技术标签:

【中文标题】如何使用 Spark DataFrames 查询 JSON 数据列?【英文标题】:How to query JSON data column using Spark DataFrames? 【发布时间】:2016-03-08 06:15:46 【问题描述】:

我有一个 Cassandra 表,为了简单起见,它看起来像:

key: text
jsonData: text
blobData: blob

我可以为此使用 spark 和 spark-cassandra-connector 创建一个基本数据框:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

我正在努力将 JSON 数据扩展到其底层结构。我最终希望能够根据 json 字符串中的属性进行过滤并返回 blob 数据。类似 jsonData.foo = "bar" 并返回 blobData。目前可以吗?

【问题讨论】:

key 是唯一标识符吗? 是的,key是表的主键 【参考方案1】:

火花 >= 2.4

如果需要,可以使用schema_of_json 函数确定架构(请注意,这假定任意行是架构的有效代表)。

import org.apache.spark.sql.functions.lit, schema_of_json, from_json
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

火花 >= 2.1

你可以使用from_json函数:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

火花 >= 1.6

您可以使用get_json_object,它采用列和路径:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

并将字段提取为可以进一步转换为预期类型的​​单个字符串。

path 参数使用点语法表示,前导 $. 表示文档根(因为上面的代码使用字符串插值,$ 必须被转义,因此 $$.)。

火花:

目前可以吗?

据我所知,这不是直接可能的。您可以尝试类似的方法:

val df = sc.parallelize(Seq(
  ("1", """"k": "foo", "v": 1.0""", "some_other_field_1"),
  ("2", """"k": "bar", "v": 3.0""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

我假设 blob 字段不能用 JSON 表示。否则你 cab 省略拆分和加入:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map
  case Row(key: String, json: String) =>
    s""""key": "$key", "jsonData": $json"""
) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: long (nullable = true)
//  |-- blobData: string (nullable = true)

另一种方法(更便宜,但更复杂)是使用 UDF 解析 JSON 并输出 structmap 列。例如这样的:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => 
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
)

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)

【讨论】:

to_json 提供StructType 不能在spark 2.2.0 中工作@ ` List fields = new ArrayList(2); fields.add(DataTypes.createStructField("a", DataTypes.StringType, false)); StructType 架构 = DataTypes.createStructType(fields); g4.withColumn("dd", functions.to_json(functions.col("a"),schema) ).show(); ` 如果您正在寻找一种更全面的方法来解决 JSON 数据中的架构可变性,您可能应该使用spark.read.json()。我在my answer 中有详细信息和完整示例。【参考方案2】:

zero323's answer 是彻底的,但错过了 Spark 2.1+ 中可用的一种方法,它比使用 schema_of_json() 更简单、更健壮:

import org.apache.spark.sql.functions.from_json

val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))

这是 Python 的等价物:

from pyspark.sql.functions import from_json

json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))

正如 zero323 所指出的,schema_of_json() 的问题在于它检查单个字符串并从中派生出一个模式。如果您有不同模式的 JSON 数据,那么您从 schema_of_json() 返回的模式将不会反映如果您要合并 DataFrame 中所有 JSON 数据的模式会得到什么。使用from_json() 解析该数据将产生大量nullschema_of_json() 返回的架构与数据不匹配的空值。

通过使用 Spark 从 JSON 字符串的 RDD 派生出综合 JSON 模式的能力,我们可以保证所有 JSON 数据都可以被解析。

示例:schema_of_json()spark.read.json()

这是一个示例(在 Python 中,代码与 Scala 非常相似)来说明使用 schema_of_json() 从单个元素派生架构与使用 spark.read.json() 从所有数据派生架构之间的区别。

>>> df = spark.createDataFrame(
...     [
...         (1, '"a": true'),
...         (2, '"a": "hello"'),
...         (3, '"b": 22'),
...     ],
...     schema=['id', 'jsonData'],
... )

a 在一行中有一个布尔值,在另一行中有一个字符串值。 a 的合并模式会将其类型设置为字符串。 b 将是一个整数。

让我们看看不同方法的比较。一、schema_of_json() 方法:

>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: boolean (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1|  [true]|
|  2|    null|
|  3|      []|
+---+--------+

如您所见,我们派生的 JSON 模式非常有限。 "a": "hello" 无法解析为布尔值并返回 null,而 "b": 22 刚刚被删除,因为它不在我们的架构中。

现在spark.read.json():

>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: long (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1| [true,]|
|  2|[hello,]|
|  3|  [, 22]|
+---+--------+

在这里,我们保留了所有数据,并具有一个涵盖所有数据的综合架构。 "a": true 被转换为字符串以匹配 "a": "hello" 的架构。

使用spark.read.json() 的主要缺点是Spark 将扫描您的所有数据以导出架构。根据您拥有的数据量,该开销可能很大。如果您知道所有 JSON 数据都具有一致的架构,那么可以继续使用 schema_of_json() 来处理单个元素。如果您有架构可变性但不想扫描所有数据,您可以在调用spark.read.json() 时将samplingRatio 设置为小于1.0 以查看数据子集。

这里是spark.read.json() 的文档:Scala API / Python API

【讨论】:

非常感谢!这是一个非常优雅的解决方案,应该是这个问题的公认解决方案。 这是一个如此全面的解决方案!很有帮助!【参考方案3】:

from_json 函数正是您正在寻找的。您的代码将类似于:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

//You can define whatever struct type that your json states
val schema = StructType(Seq(
  StructField("key", StringType, true), 
  StructField("value", DoubleType, true)
))

df.withColumn("jsonData", from_json(col("jsonData"), schema))

【讨论】:

【参考方案4】:

底层 JSON 字符串是

" \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"";

以下是过滤 JSON 并将所需数据加载到 Cassandra 的脚本。

  sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2")
            .write.format("org.apache.spark.sql.cassandra")
            .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name"))
            .mode(SaveMode.Append)
            .save()

【讨论】:

【参考方案5】:

我用下面的

(自 2.2.0 起可用,我假设您的 json 字符串列位于列索引 0)

def parse(df: DataFrame, spark: SparkSession): DataFrame = 
    val stringDf = df.map((value: Row) => value.getString(0), Encoders.STRING)
    spark.read.json(stringDf)

它会自动推断 JSON 中的模式。记录在这里: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/DataFrameReader.html

【讨论】:

以上是关于如何使用 Spark DataFrames 查询 JSON 数据列?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 spark dataframes/spark sql 中使用模式读取 json

spark知识体系04-SQL,DataFrames,DateSets

我们如何在 Spark 中使用 Dataframes(由 structtype 方法创建)合并具有不同列数的 2 个表?

Spark SQL and DataFrame Guide(1.4.1)——之DataFrames

如何使用 Apache Spark Dataframes (Python) 执行 Switch 语句

大数据(spark sql 和 spark dataframes 连接)