将包含 Json 的 Dataset<String> 转换为 Dataset<StructType>

Posted

技术标签:

【中文标题】将包含 Json 的 Dataset<String> 转换为 Dataset<StructType>【英文标题】:Convert Dataset<String> containing Json into Dataset<StructType> 【发布时间】:2019-05-23 13:03:40 【问题描述】:

Spark 擅长在从磁盘初始读取时将 JSON 解析为嵌套的 StructType,但是如果我已经在 Dataset 中有一个包含 JSON 的 String 列,并且我想将它映射到 @ 987654329@ 具有 StructType 列,具有将整个数据集考虑在内的架构推断,同时充分利用并行状态并避免减少操作?

我知道函数 schema_of_jsonfrom_json 显然是为了一起使用来完成此任务或类似的,但我很难找到实际工作的代码示例,尤其是在 Java 中。

我将接受任何提供 Java 示例并满足完整模式推断和完整非简化并行操作目标的答案。或者,如果这不可能,最接近的解决方法。

我目前使用的是 Spark 2.4.0。

我研究了以下相关问题:

Implicit schema discovery on a JSON-formatted Spark DataFrame column

这个问题与我的类似,但对于 Scala。没有公认的答案。 OP 在评论中宣布他们找到了一个“hacky”解决方案来让from_schema 工作。除了“hackiness”之外,该解决方案的问题在于它仅从数据帧的第一行推断模式,因此类型可能受到过于严格的限制:

val jsonSchema: String = df.select(schema_of_json(df.select(col("custom")).first.getString(0))).as[String].first

编辑:我尝试了here 所示的解决方案,如下面的 cmets 所述。这是实现:

    SparkSession spark = SparkSession
            .builder()
            .appName("example")
            .master("local[*]")
            .getOrCreate();

    Dataset<Row> df = spark.read().text(conf.getSourcePath());

    df.cache();

    String schema = df.select(schema_of_json(col("value")))
          .as(Encoders.STRING())
          .first();
    df.withColumn("parsedJson", from_json(col("value"), schema, new HashMap<String, String>()))
            .drop("value")
            .write()
            .mode("append")
            .parquet(conf.getDestinationPath());

从这段代码中我得到了一个错误:

AnalysisException: cannot resolve 'schemaofjson(`value`)' due to data type mismatch: The input json should be a string literal and not null; however, got `value`.;;
'Project [schemaofjson(value#0) AS schemaOfjson(value)#20]
+- Relation[value#0] text

这个错误导致我收到以下 Spark 拉取请求: https://github.com/apache/spark/pull/22775

这似乎表明schema_of_json 从未打算应用于整个表以便对整个事物进行模式推断,而是从使用@987654338 直接传入的单个文字 JSON 样本推断模式@。在这种情况下,我不知道 Spark 提供了任何解决方案来从整个表上的 JSON 进行完整模式推断。除非这里有人可以更正我对此拉取请求的阅读或提供替代方法??

【问题讨论】:

对不起,有人可以就否决票告诉我吗?谢谢。 你在 Scala 中见过 Working Examples 吗?这些几乎与 Java 对应物相同。从避免减少动作开始 - 模式推断几乎按照定义是“减少动作”。 @user10958683 感谢您的链接。我看过这两个中的第一个,但没有过多关注它,因为它与我在上面提到的那个链接,我首先找到的。它看起来更有希望,因为它似乎正确使用了schema_of_json。我毫不费力地将其转换为 Java。会试试的。 @user10958683 是的,模式推断本质上显然是简化的。我正在考虑 spark.read.json 的耦合性质,因为它允许在幕后更有效地逐步推断模式。不确定如何在集群上实现。显然,司机必须从所有工人那里收集结果...... @user10958683 上面的第一个示例导致我现在在问题中记录的错误。第二个不适用,因为它使用带有文字 JSON 字符串而不是列名的 schema_of_json,因此不能应用于整个表。 【参考方案1】:

使用DataFrameReader.json(Dataset&lt;String&gt;)实际上有一个非常简单的解决方案,不知道为什么它没有出现在我的搜索中:

    Dataset<String> ds = ...;

    spark.read()
        .json(ds)
        .write()
        .mode("append")
        .parquet(conf.getDestinationPath());

如果源数据集中有多个列,显然您可以只选择一个进行操作。并且内容类型必须是String(例如不是Row)。

【讨论】:

以上是关于将包含 Json 的 Dataset<String> 转换为 Dataset<StructType>的主要内容,如果未能解决你的问题,请参考以下文章

json转dataset的另外一种解析方式自动生成guid强关联

识别包含单词的句子,并使用 str.contains 在列中显示该单词

.NET DataTable DataSet转json代码

Datatable/Dataset 转 JSON方法

datatable和dataset转json

将json数据分配给Ng2图表中的DataSet