读取 json col 和直接数据值列

Posted

技术标签:

【中文标题】读取 json col 和直接数据值列【英文标题】:reading json col and direct data value columns 【发布时间】:2020-03-11 21:37:30 【问题描述】:

以下语句满足了我的需要,但它只在 spark-shell 中运行,而不在 scala 程序中运行。

spark.read.json(dataframe.select("col_name").as[String]).schema

我将数据帧转换为 rdd 并通过,它工作正常(我按照下面的链接),但它只有在

我只有json列值,当我传递其他cols(直接col值)时,它无法提供

输出。

How to parse json column in dataframe in scala

我有适用于 spark-shell 但不适用于 scala 程序的解决方案。

input table

output required

【问题讨论】:

【参考方案1】:

您可以使用 from_json 方法将您的 json 列转换为 structtype 列。然后,您可以根据您的情况将此列分成不同的列。但是,您必须记住,json 应该具有统一的格式,否则结果可能不理想。 可以参考以下代码:

val df = spark.createDataFrame(Seq(
  ("A", "B", "\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\""),
  ("C", "D", "\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\""),
  ("E", "F", "\"Name\":\"pqr\",\"Address\":\"MNN\",\"title\":\"bi\"")
)).toDF("col_1", "col_2", "col_json")

输入数据框如下:

scala> df.show(false)
+-----+-----+---------------------------------------------+
|col_1|col_2|col_json                                     |
+-----+-----+---------------------------------------------+
|A    |B    |"Name":"xyz","Address":"NYC","title":"engg"|
|C    |D    |"Name":"mnp","Address":"MIC","title":"data"|
|E    |F    |"Name":"pqr","Address":"MNN","title":"bi"  |
+-----+-----+---------------------------------------------+

现在,我们将找出 json 列的架构 col_schema,以便将其应用于 col_json

val col_schema = spark.read.json(df.select(col("col_json")).as[String]).schema

val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema)).select("col_1", "col_2", "new_col.*")

结果如下:

scala> outputDF.show(false)
+-----+-----+-------+----+-----+
|col_1|col_2|Address|Name|title|
+-----+-----+-------+----+-----+
|A    |B    |NYC    |xyz |engg |
|C    |D    |MIC    |mnp |data |
|E    |F    |MNN    |pqr |bi   |
+-----+-----+-------+----+-----+

对我有用的 scala 代码是:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col, from_json
import scala.collection.Seq

object Sample

  def main(args: Array[String]): Unit = 

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val df = spark.createDataFrame(Seq(
      ("A", "B", "\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\""),
      ("C", "D", "\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\""),
      ("E", "F", "\"Name\":\"pqr\",\"Address\":\"MNN\",\"title\":\"bi\"")
    )).toDF("col_1", "col_2", "col_json")

    import spark.implicits._

    val col_schema = spark.read.json(df.select("col_json").as[String]).schema

    val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema)).select("col_1", "col_2", "new_col.*")

    outputDF.show(false)
  

【讨论】:

val col_schema = spark.read.json(df.select(col("col_json")).as[String]).schema ----这在 spark-shell 中工作正常(其中我已经测试过)但是当将此行写入 scala 程序时,它说它无法访问数据帧作为 spark.read.json 的参数。所以我需要将它的 json col 转换为 RDD。所以问题有没有办法或选择在 spark-scala 中编写它? 实际上,我在 spark.read.json 方法中传递了 DataSet[String] 。这仅在 spark 版本2.2.0 之后可用。此外,在 Scala 代码中创建 Spark 上下文后,您需要使用以下语句。 import spark.implicits._。我正在上面的答案中编写scala代码,只需检查spark版本,一切都会正常。 先生,你摇滚!!!谢谢,我知道我犯的错误,我的 POM spark 版本是 2.1,所以出现错误,我将我的 POM spark 版本指向 2.3.0,这是我们当前的版本,它运行良好。再次感谢。

以上是关于读取 json col 和直接数据值列的主要内容,如果未能解决你的问题,请参考以下文章

从访问中读取多值列到c#

OPENJSON 将值列转换为多行不起作用

在火花中删除空值列

php 将值列附加到json / array中

数据值列类型和数据字段属性

查询优化:将元数据连接到值列表表