将模式应用于 Java 对象的 Spark 数据集

Posted

技术标签:

【中文标题】将模式应用于 Java 对象的 Spark 数据集【英文标题】:Applying a schema to a Spark's Dataset of a java object 【发布时间】:2017-08-20 13:14:59 【问题描述】:

这里有一个类似的问题:How to add a schema to a Dataset in Spark?

但是我面临的问题是我已经预定义了Dataset<Obj1>,并且我想定义一个模式来匹配它的数据成员。最终目标是能够连接两个 java 对象。

示例代码:

Dataset<Row> rowDataset = spark.getSpark().sqlContext().createDataFrame(rowRDD, schema).toDF();
Dataset<MyObj> objResult = rowDataset.map((MapFunction<Row, MyObj>) row ->
        new MyObj(
                row.getInt(row.fieldIndex("field1")),
                row.isNullAt(row.fieldIndex("field2")) ? "" : row.getString(row.fieldIndex("field2")),
                row.isNullAt(row.fieldIndex("field3")) ? "" : row.getString(row.fieldIndex("field3")),
                row.isNullAt(row.fieldIndex("field4")) ? "" : row.getString(row.fieldIndex("field4"))
        ), Encoders.javaSerialization(MyObj.class));

如果我正在打印行数据集的架构,我会按预期获得架构:

rowDataset.printSchema();

root
 |-- field1: integer (nullable = false)
 |-- field2: string (nullable = false)
 |-- field3: string (nullable = false)
 |-- field4: string (nullable = false)

如果我正在打印对象数据集,我将丢失实际架构

objResult.printSchema();

root
 |-- value: binary (nullable = true)

问题是如何为Dataset&lt;MyObj&gt; 应用架构?

【问题讨论】:

请提供一个最小、完整和可验证的示例 - ***.com/help/mcve 关于您的问题的代码可能会帮助我们推荐一些东西。 @squid,我提供了一个代码sn-p 【参考方案1】:

下面是代码 sn-p,我试过了,spark 的行为符合预期,看来你的问题的根本原因不是 map 函数。

 SparkSession session = SparkSession.builder().config(conf).getOrCreate();
        Dataset<Row> ds = session.read().text("<some path>");
        Encoder<Employee> employeeEncode = Encoders.bean(Employee.class);
        ds.map(new MapFunction<Row, Employee>() 
            @Override
            public Employee call(Row value) throws Exception 
                return new Employee(value.getString(0).split(","));
            
        , employeeEncode).printSchema();

输出:

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

//员工豆

public class Employee 
    public String name;
    public Integer age;
    public Employee()

    
    public Employee(String [] args)
        this.name=args[0];
        this.age=Integer.parseInt(args[1]);
    

    public String getName() 
        return name;
    

    public void setName(String name) 
        this.name = name;
    

    public Integer getAge() 
        return age;
    

    public void setAge(Integer age) 
        this.age = age;
    

【讨论】:

其实我对bean Encoder并不熟悉。只有在这个问题调查期间我才知道它。这是我能得到的最接近的答案,虽然它并不完美,因为它不支持所有数据类型:spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/…

以上是关于将模式应用于 Java 对象的 Spark 数据集的主要内容,如果未能解决你的问题,请参考以下文章

学习笔记Spark—— Spark SQL应用—— Spark DataFrame基础操作

如何使用 Spark(Java)在数据集的所有列上并行应用相同的函数

将火花行对象转换为 java pojo

将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException

在 Java Spark 中迭代大型数据集的最快且有效的方法

将自定义函数应用于 spark 数据框组