使用 Spark 从 DynamoDB JSON 字符串中提取嵌套的 Json 字段?
Posted
技术标签:
【中文标题】使用 Spark 从 DynamoDB JSON 字符串中提取嵌套的 Json 字段?【英文标题】:Extract Nested Json fields from DynamoDB JSON string using Spark? 【发布时间】:2016-10-07 19:26:56 【问题描述】:我正在从 Spark 读取 dynamodb 表,该表在一个字段中有一个 JSON 字符串,在其他字段中有一个字符串。我能够读取 JSON 字段,但不能读取嵌套的 JSON 字段。这不是query Json Column using dataframes 的重复项。该问题确实解释了如何从 JSON 字符串中提取列,而不是从嵌套的 JSON 列中提取列。
import com.github.traviscrawford.spark.dynamodb._
val users = sqlContext.read.dynamodb("Dynamodb_table")
users.show(1)
样本数据集
|col1 | ID | field2|field3|
-------------------------------------------------------------------------------------
|"a":["b":"value1","x":23,"b":value2,"x":52],"c":"valC"|A1 | X1 |Y1 |
我需要从 col1(JSON 结构)和 ID 字段中提取几个字段。如here 所述,我能够弄清楚如何解析 JSON 字段(col1)并从 col1 获取字段“c”,但无法提取嵌套字段。
我的代码:
val users = sqlContext.read.dynamodb("Dynamodb_table")
val data = users.selectExpr("get_json_object(col1, '$.c')","get_json_object(col1, '$.a')","ID")
data.show(1,false)
|a |c |ID|
---------------------------------------------------------
|["b":"value1","x":23,"b":value2","x":52...]|valC|A1|
现在,当我尝试在上述数据框上应用相同的 get_json_object 时,我得到所有空值。
val nestedData = data.selectExpr("get_json_object(a, '$.b')","c","ID")
nestedData.show(false)
|get_json_object(a, '$.b')| c | ID|
------------------------------------
|null |valC|A1 |
我也尝试过爆炸,因为 col 'a' 有数组和结构。但这也不起作用,因为数据框 'data' 将 col/field 'a' 作为字符串而不是数组返回。任何想法如何解决这个问题?
更新:我还尝试使用 JSON4s 和 net.liftweb.json.parse 进行解析。这也无济于事
case class aInfo(b: String)
case class col1(a: Option[aInfo]), c: String)
import net.liftweb.json.parse
val parseJson = udf((data: String) =>
implicit val formats = net.liftweb.json.DefaultFormats
parse(data).extract[Data]
)
val parsed = users.withColumn("parsedJSON", parseJson($"data"))
parsed.show(1)
当我使用这些解析器时,所有值都显示为空。
我的预期结果:我试图从数据集中得到一个扁平化的结构
|b |x |c | ID|
--------------------
|value1|23|valC|A1 |
|value2|52|valC|A1 |
【问题讨论】:
【参考方案1】:我相信所有必需的拼图都已经在这里了,所以让我们一步一步来。您的数据相当于:
val df = Seq((
""""a":["b":"value1","b": "value2"],"c":"valC"""", "A1", "X1", "Y1"
)).toDF("col1", "ID", "field2", "field3")
Spark 提供 json4s 实现与 Lift 相同的查询 API:
import org.json4s._
import org.json4s.jackson.JsonMethods._
我们可以使用例如 LINQ 风格的 API 来定义 UDF:
val getBs = udf((s: String) => for
JString(b) <- parse(s) \ "a" \ "b"
yield b)
如果您想提取多个字段,您当然可以扩展它。例如,如果 JSON 字符串有多个字段
"a":["b":"value1","d":1,"b":"value2","d":2],"c":"valC"
你可以:
for
JObject(a) <- parse(s) \ "a"
JField("b", JString(b)) <- a
JField("d", JInt(d)) <- a
yield (b, d)
这假设两个字段都存在,否则不会匹配。要处理缺失的字段,您可能更喜欢 XPath-like 表达式或提取器:
case class A(b: Option[String], d: Option[Int])
(parse(s) \ "a").extract(Seq[A])
这样的UDF可以与explode
一起使用来提取字段:
val withBs = df.withColumn("b", explode(getBs($"col1")))
结果:
+--------------------+---+------+------+------+
| col1| ID|field2|field3| b|
+--------------------+---+------+------+------+
|"a":["b":"value...| A1| X1| Y1|value1|
|"a":["b":"value...| A1| X1| Y1|value2|
+--------------------+---+------+------+------+
您使用 Lift 的尝试不正确,因为您希望 a
是 aInfo
的序列,但仅将其定义为 Option[aInfo]
。应该是Option[Seq[aInfo]]
:
case class col1(a: Option[Seq[aInfo]], c: String)
使用像这样定义的类,解析应该可以正常工作。
如果您使用当前版本 (Spark 2.1.0),则 SPARK-17699 引入的 from_json
方法需要架构:
import org.apache.spark.sql.types._
val bSchema = StructType(Seq(StructField("b", StringType, true)))
val aSchema = StructField("a", ArrayType(bSchema), true)
val cSchema = StructField("c", StringType, true)
val schema = StructType(Seq(aSchema, cSchema))
并且可以应用为:
import org.apache.spark.sql.functions.from_json
val parsed = df.withColumn("col1", from_json($"col1", schema))
之后,您可以使用常用符号选择字段:
parsed.select($"col1.a.b")
【讨论】:
以上是关于使用 Spark 从 DynamoDB JSON 字符串中提取嵌套的 Json 字段?的主要内容,如果未能解决你的问题,请参考以下文章
将 JSON 数据从 dynamoDB 复制到 redshift
如何从命令行简化 aws DynamoDB 查询 JSON 输出?
在 Typescript 和 AWS Lambda 中将 DynamoDB 数据格式化为普通 JSON