How to create a complex StructType schema in Spark Java

How can I use StructType in Spark Java to define the data types for the following data?

sam|mars|1234567|"report": {"Details": [{"subject": "science","grade": "A","remark": "good"},{"subject": "maths","grade": "E","remark": "excellent"},{"subject": "geography","grade": "E","remark": "excellent"}]}
harry|venus|987654|"report": {"Details": [{"subject": "science","grade": "O","remark": "outstanding"},{"subject": "history","grade": "A","remark": "good"}]}

The fields are: NAME, ADDRESS, ID, REPORTCARD

I have the following code:

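        JavaRDD<Row> row = javaRDD.map(new Function<String, Row>(){
            @Override
            public Row call(String line) throws Exception {
                // String.split takes a regex, so the pipe must be escaped.
                // The limit of 4 keeps the whole report card in the last field.
                return RowFactory.create((Object[]) line.split("\\|", 4));
            }
        });

Here javaRDD is created on top of the input data above.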
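
One detail worth checking before any schema is involved is how each line is split. `String.split` interprets its argument as a regular expression, and an unescaped `|` is the alternation operator, so it matches the empty string and breaks the line into single characters. The sketch below is plain Java with no Spark dependency; the class name `PipeSplitSketch` and the helper `splitRecord` are made up for illustration.

```java
import java.util.Arrays;

public class PipeSplitSketch {

    // Splits one input line into NAME, ADDRESS, ID and REPORTCARD.
    static String[] splitRecord(String line) {
        // "\\|" matches a literal pipe. The limit of 4 stops splitting after
        // the third pipe, so the rest of the line stays in the fourth field.
        return line.split("\\|", 4);
    }

    public static void main(String[] args) {
        String line = "sam|mars|1234567|\"report\": {\"Details\": []}";

        // Unescaped pipe: the line is cut between every character.
        System.out.println(line.split("|").length);

        // Escaped pipe with a limit: exactly four fields.
        System.out.println(Arrays.toString(splitRecord(line)));
    }
}
```

The limit argument is a precaution: if a remark inside the report card ever contained a pipe, the fourth field would still come back in one piece.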

Now I need to convert javaRDD to a DataFrame (Dataset<Row> df) with the code below:

            Dataset<Row> df = spark.createDataFrame(row, <STRUCT TYPE SCHEMA>);

I need to create a StructType schema for this. How do I define it in Spark Java?

I created the following StructType schema:

            List<StructField> reportFields = new ArrayList<StructField>();
            reportFields.add(DataTypes.createStructField("subject", DataTypes.StringType, true));
            reportFields.add(DataTypes.createStructField("grade", DataTypes.StringType, true));
            reportFields.add(DataTypes.createStructField("remark", DataTypes.StringType, true));

            List<StructField> schemaFields = new ArrayList<StructField>();
            schemaFields.add(DataTypes.createStructField("NAME", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("ADDRESS", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("ID", DataTypes.StringType, true));
            schemaFields.add(DataTypes.createStructField("REPORTCARD", DataTypes.createStructType(reportFields), true));
            StructType schema = DataTypes.createStructType(schemaFields);

            Dataset<Row> df = spark.createDataFrame(row, schema);

But I get the following exception:

java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of struct<subject:string,grade:string,remark:string>
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, NAME), StringType), true, false) AS NAME#0
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, ADDRESS), StringType), true, false) AS ADDRESS#1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, ID), StringType), true, false) AS ID#2
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(subject, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 0, subject), StringType), true, false), grade, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 1, grade), StringType), true, false), remark, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, REPORTCARD), StructField(subject,StringType,true), StructField(grade,StringType,true), StructField(remark,StringType,true)), 
2, remark), StringType), true, false)) AS REPORTCARD#3
Answer

A StructType should be enough:

   // One element of the "Details" list: subject, grade and remark.
   StructType details = new StructType(new StructField[]{
    new StructField("subject", DataTypes.StringType, false, Metadata.empty()),
    new StructField("grade", DataTypes.StringType, false, Metadata.empty()),
    new StructField("remark", DataTypes.StringType, false, Metadata.empty())
   });

   // "Details" is a JSON array in the sample data, so the struct is wrapped in an ArrayType.
   StructType recordType = new StructType();
   recordType = recordType.add("details", DataTypes.createArrayType(details), false);

   // Top-level record: three string columns plus the nested report.
   StructType structType = new StructType();
   structType = structType.add("name", DataTypes.StringType, false);
   structType = structType.add("planet", DataTypes.StringType, false);
   structType = structType.add("number", DataTypes.StringType, false);
   structType = structType.add("record", recordType, false);
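
A schema by itself does not convert anything. Splitting a line yields four strings, and the exception in the question says that a `String` was supplied where the schema expects a struct. One possible way around this, sketched here as an assumption rather than as the answerer's method, is to load all four columns as strings and then parse the last one with `from_json`. The fourth field in the sample data (`"report": {...}`) has no outer braces, so the sketch wraps it in `{` and `}` first. The class name `ReportCardSchemaSketch`, the local master, and the app name are illustrative, and `from_json` needs Spark 2.1 or later.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.lit;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ReportCardSchemaSketch {

    // Flat schema: four string columns, which is what splitting a line produces.
    static StructType flatSchema() {
        return new StructType()
            .add("NAME", DataTypes.StringType)
            .add("ADDRESS", DataTypes.StringType)
            .add("ID", DataTypes.StringType)
            .add("REPORTCARD", DataTypes.StringType);
    }

    // Nested schema for {"report": {"Details": [{subject, grade, remark}, ...]}}.
    static StructType reportSchema() {
        StructType detail = new StructType()
            .add("subject", DataTypes.StringType)
            .add("grade", DataTypes.StringType)
            .add("remark", DataTypes.StringType);
        StructType report = new StructType()
            .add("Details", DataTypes.createArrayType(detail));
        return new StructType().add("report", report);
    }

    // Builds the flat DataFrame, then replaces REPORTCARD with the parsed struct.
    static Dataset<Row> parse(SparkSession spark, List<String> lines) {
        List<Row> rows = new ArrayList<>();
        for (String line : lines) {
            rows.add(RowFactory.create((Object[]) line.split("\\|", 4)));
        }
        Dataset<Row> flat = spark.createDataFrame(rows, flatSchema());
        // Assumption: adding outer braces turns the field into valid JSON.
        return flat.withColumn("REPORTCARD",
            from_json(concat(lit("{"), col("REPORTCARD"), lit("}")), reportSchema()));
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[1]")
            .appName("report-card-sketch")
            .getOrCreate();

        List<String> lines = Arrays.asList(
            "sam|mars|1234567|\"report\": {\"Details\": [{\"subject\": \"science\","
                + "\"grade\": \"A\",\"remark\": \"good\"}]}");

        Dataset<Row> parsed = parse(spark, lines);
        parsed.printSchema();

        // One output row per subject in the report card.
        parsed.selectExpr("NAME", "explode(REPORTCARD.report.Details) AS d")
            .select("NAME", "d.subject", "d.grade", "d.remark")
            .show(false);

        spark.stop();
    }
}
```

Keeping REPORTCARD as a string in the first step means the `Row` objects match the flat schema exactly, which avoids the "not a valid external type" error. The alternative is to build nested `Row` objects by hand for every struct and a `List` for every array, which needs a JSON parser inside the `map` function.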
