在 JAVA 中使用 Spark 2.1.1 读取嵌套 Json(Spark 2.2 有解决方案,但我正在研究 spark 2.1.1 版本)

Posted

技术标签:

【中文标题】在 JAVA 中使用 Spark 2.1.1 读取嵌套 Json(Spark 2.2 有解决方案,但我正在研究 spark 2.1.1 版本)【英文标题】:Reading Nested Json with Spark 2.1.1 In JAVA ( Spark 2.2 has solution, but i am working on spark 2.1.1 version) 【发布时间】:2019-03-05 08:08:46 【问题描述】:

我想使用下面提到的数据在 spark-SQL 中创建一个表。

[
  "empstr": "Blogspan",
  "empbyte": 48,
  "empshort": 457,
  "empint": 935535,
  "emplong": 36156987676070,
  "empfloat": 6985.98,
  "empdoub": 6392455.0,
  "empdec": 0.447,
  "empbool": 0,
  "empdate": "09/29/2018",
  "emptime": "2018-03-24 12:56:26"
, 
  "empstr": "Lazzy",
  "empbyte": 9,
  "empshort": 460,
  "empint": 997408,
  "emplong": 37564196351623,
  "empfloat": 7464.75,
  "empdoub": 5805694.86,
  "empdec": 0.303,
  "empbool": 1,
  "empdate": "08/14/2018",
  "emptime": "2018-06-17 18:31:15"
]

但是,当我尝试查看打印模式时,它显示 corruped_redord。 所以,你能帮我吗,如何在 JAVA-spark 2.1.1 中读取嵌套的 JSON 记录 下面我将附上我的代码

case "readjson":

    tempTable = hiveContext.read().json(hiveContext.sparkContext().wholeTextFiles("1.json", 0));
    /*In above line i am getting error at .json says 
    The method json(String...) in the type DataFrameReader is not applicable for the arguments (RDD<Tuple2<String,String>>)


    //tempTable = hiveContext.read().json(componentBean.getHdfsPath());

tempTable.printSchema();
        tempTable.show();
        tempTable.createOrReplaceTempView(componentKey);
        break;

【问题讨论】:

multiline 选项已在 2.2 中引入 @user10465355 是的,spark 2.2 有多行选项,因为我正在研究 2.1.1 spark,所以我需要在 spark 2.1.1 版本本身中找到解决方案。我已经尝试了所有的可能性,但我找不到上述问题的解决方案 链接中提供的解决方案解决了2.2前后的pb @eliasah 它可能适用于 scala,但不适用于 java,下面我附上我的屏幕截图,请看一下。 当我使用 sparkContext.wholeTextfiles 时,它会问我(路径,最小分区),如果我同时传递两个参数,那么 .json 会出现错误,说 json 支持 JSON(字符串),但传递RDD>。所以它建议使用 Seq 进行强制转换。如果我使用 seq 进行投射,火花作业将不会运行。如果你能帮助我,那么请帮助我。 【参考方案1】:

您似乎对使用 API 的哪些部分有疑问。

您需要记住 SparkContext != JavaSparkContext

这意味着您需要从活动的SparkSession创建一个JavaSparkContext 对象:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

// [...]

SparkSession session = SparkSession.builder().getOrCreate();
SQLContext hiveContext = session.sqlContext();
JavaSparkContext sc = JavaSparkContext.fromSparkContext(session.sparkContext());
JavaRDD<String> jsonRDD = sc.wholeTextFiles("path/to/data", 2).values();
Dataset<Row> jsonDataset = hiveContext.read().json(jsonRDD);

jsonDataset.show();

// +-------+-------+----------+------+----------+--------+------+--------------+--------+--------+-------------------+
// |empbool|empbyte|   empdate|empdec|   empdoub|empfloat|empint|       emplong|empshort|  empstr|            emptime|
// +-------+-------+----------+------+----------+--------+------+--------------+--------+--------+-------------------+
// |      0|     48|09/29/2018| 0.447| 6392455.0| 6985.98|935535|36156987676070|     457|Blogspan|2018-03-24 12:56:26|
// |      1|      9|08/14/2018| 0.303|5805694.86| 7464.75|997408|37564196351623|     460|   Lazzy|2018-06-17 18:31:15|
// +-------+-------+----------+------+----------+--------+------+--------------+--------+--------+-------------------+

我希望这会有所帮助。

【讨论】:

你能帮忙并建议如何处理这个***.com/questions/62036791/…

以上是关于在 JAVA 中使用 Spark 2.1.1 读取嵌套 Json(Spark 2.2 有解决方案,但我正在研究 spark 2.1.1 版本)的主要内容,如果未能解决你的问题,请参考以下文章

在 spark java 中读取具有固定宽度和分隔符的文本文件

如何从 Java Spark 应用程序中读取 app.properties 文件

Spark steaming 从 Kafka 读取并在 Java 中应用 Spark SQL 聚合

在 java spark 中从 REST API 读取 csv

java.lang.IllegalArgumentException:实例化'org.apache.spark.sql.hive.HiveSessionState'时出错:使用spark会话读取csv

在 Spark SQL 中读取 40 万行时出现内存不足错误 [重复]