将 Json 的 Dataset 列解析为 Dataset<Row>

Posted

技术标签:

【中文标题】将 Json 的 Dataset 列解析为 Dataset<Row>【英文标题】:parse Dataset column of Json to Dataset<Row> 【发布时间】:2016-11-22 09:32:52 【问题描述】:

拥有Dataset&lt;Row&gt;的单列json字符串:

+--------------------+
|               value|
+--------------------+
|"Context":"00AA0...|
+--------------------+

Json 示例:

"Context":"00AA00AA","MessageType":"1010","Module":"1200"

我怎样才能最有效地获得如下所示的Dataset&lt;Row&gt;

+--------+-----------+------+
| Context|MessageType|Module|
+--------+-----------+------+
|00AA00AA|       1010|  1200|
+--------+-----------+------+

我正在处理流中的这些数据,我知道当我从文件中读取数据时,spark 可以自己执行此操作:

spark
.readStream()
.schema(MyPojo.getSchema())
.json("src/myinput")

但现在我正在从 kafka 读取数据,它以另一种形式为我提供数据。 我知道我可以使用 Gson 之类的解析器,但我想让 spark 为我做这件事。

【问题讨论】:

【参考方案1】:

试试这个示例。

public class SparkJSONValueDataset 
    public static void main(String[] args) 
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkJSONValueDataset")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .master("local")
                .getOrCreate();

        //Prepare data Dataset<Row>
        List<String> data = Arrays.asList("\"Context\":\"00AA00AA\",\"MessageType\":\"1010\",\"Module\":\"1200\"");
        Dataset<Row> df = spark.createDataset(data, Encoders.STRING()).toDF().withColumnRenamed("_1", "value");
        df.show();

        //convert to Dataset<String> and Read
        Dataset<String> df1 = df.as(Encoders.STRING());
        Dataset<Row> df2 = spark.read().json(df1.javaRDD());
        df2.show();
        spark.stop();
    
 

【讨论】:

为您解答。猜猜这会起作用,但我真的不喜欢将我的 DF 再次发送给读者的想法:(

以上是关于将 Json 的 Dataset 列解析为 Dataset<Row>的主要内容,如果未能解决你的问题,请参考以下文章

json转dataset的另外一种解析方式自动生成guid强关联

将 pandas df 转换为 json 然后解析日期

将Dataset中的列类型转换为python中具有特定格式的日期时间类型时出错

使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet

解析列中具有动态键的 JSON 值并将 JSON 转换为 BigQuery 中的记录列结构

如何从 Spark 2.0 中的 DataFrame 列创建数据集?