读取 json col 和直接数据值列
Posted
技术标签:
【中文标题】读取 json col 和直接数据值列【英文标题】:reading json col and direct data value columns 【发布时间】:2020-03-11 21:37:30 【问题描述】:以下语句满足了我的需要,但它只在 spark-shell 中运行,而不在 scala 程序中运行。
spark.read.json(dataframe.select("col_name").as[String]).schema
我将数据帧转换为 rdd 并通过,它工作正常(我按照下面的链接),但它只有在
我只有json列值,当我传递其他cols(直接col值)时,它无法提供
输出。
How to parse json column in dataframe in scala
我有适用于 spark-shell 但不适用于 scala 程序的解决方案。
input table
output required
【问题讨论】:
【参考方案1】:您可以使用 from_json 方法将您的 json 列转换为 structtype 列。然后,您可以根据您的情况将此列分成不同的列。但是,您必须记住,json 应该具有统一的格式,否则结果可能不理想。 可以参考以下代码:
val df = spark.createDataFrame(Seq(
("A", "B", "\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\""),
("C", "D", "\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\""),
("E", "F", "\"Name\":\"pqr\",\"Address\":\"MNN\",\"title\":\"bi\"")
)).toDF("col_1", "col_2", "col_json")
输入数据框如下:
scala> df.show(false)
+-----+-----+---------------------------------------------+
|col_1|col_2|col_json |
+-----+-----+---------------------------------------------+
|A |B |"Name":"xyz","Address":"NYC","title":"engg"|
|C |D |"Name":"mnp","Address":"MIC","title":"data"|
|E |F |"Name":"pqr","Address":"MNN","title":"bi" |
+-----+-----+---------------------------------------------+
现在,我们将找出 json 列的架构 col_schema
,以便将其应用于 col_json
列
val col_schema = spark.read.json(df.select(col("col_json")).as[String]).schema
val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema)).select("col_1", "col_2", "new_col.*")
结果如下:
scala> outputDF.show(false)
+-----+-----+-------+----+-----+
|col_1|col_2|Address|Name|title|
+-----+-----+-------+----+-----+
|A |B |NYC |xyz |engg |
|C |D |MIC |mnp |data |
|E |F |MNN |pqr |bi |
+-----+-----+-------+----+-----+
对我有用的 scala 代码是:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col, from_json
import scala.collection.Seq
object Sample
def main(args: Array[String]): Unit =
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.createDataFrame(Seq(
("A", "B", "\"Name\":\"xyz\",\"Address\":\"NYC\",\"title\":\"engg\""),
("C", "D", "\"Name\":\"mnp\",\"Address\":\"MIC\",\"title\":\"data\""),
("E", "F", "\"Name\":\"pqr\",\"Address\":\"MNN\",\"title\":\"bi\"")
)).toDF("col_1", "col_2", "col_json")
import spark.implicits._
val col_schema = spark.read.json(df.select("col_json").as[String]).schema
val outputDF = df.withColumn("new_col", from_json(col("col_json"), col_schema)).select("col_1", "col_2", "new_col.*")
outputDF.show(false)
【讨论】:
val col_schema = spark.read.json(df.select(col("col_json")).as[String]).schema ----这在 spark-shell 中工作正常(其中我已经测试过)但是当将此行写入 scala 程序时,它说它无法访问数据帧作为 spark.read.json 的参数。所以我需要将它的 json col 转换为 RDD。所以问题有没有办法或选择在 spark-scala 中编写它? 实际上,我在 spark.read.json 方法中传递了 DataSet[String] 。这仅在 spark 版本2.2.0
之后可用。此外,在 Scala 代码中创建 Spark 上下文后,您需要使用以下语句。 import spark.implicits._
。我正在上面的答案中编写scala代码,只需检查spark版本,一切都会正常。
先生,你摇滚!!!谢谢,我知道我犯的错误,我的 POM spark 版本是 2.1,所以出现错误,我将我的 POM spark 版本指向 2.3.0,这是我们当前的版本,它运行良好。再次感谢。以上是关于读取 json col 和直接数据值列的主要内容,如果未能解决你的问题,请参考以下文章