将模式应用于 Java 对象的 Spark 数据集
Posted
技术标签:
【中文标题】将模式应用于 Java 对象的 Spark 数据集【英文标题】:Applying a schema to a Spark's Dataset of a java object 【发布时间】:2017-08-20 13:14:59 【问题描述】:这里有一个类似的问题:How to add a schema to a Dataset in Spark?
但是我面临的问题是我已经预定义了Dataset<Obj1>
,并且我想定义一个模式来匹配它的数据成员。最终目标是能够连接两个 java 对象。
示例代码:
Dataset<Row> rowDataset = spark.getSpark().sqlContext().createDataFrame(rowRDD, schema).toDF();
Dataset<MyObj> objResult = rowDataset.map((MapFunction<Row, MyObj>) row ->
new MyObj(
row.getInt(row.fieldIndex("field1")),
row.isNullAt(row.fieldIndex("field2")) ? "" : row.getString(row.fieldIndex("field2")),
row.isNullAt(row.fieldIndex("field3")) ? "" : row.getString(row.fieldIndex("field3")),
row.isNullAt(row.fieldIndex("field4")) ? "" : row.getString(row.fieldIndex("field4"))
), Encoders.javaSerialization(MyObj.class));
如果我正在打印行数据集的架构,我会按预期获得架构:
rowDataset.printSchema();
root
|-- field1: integer (nullable = false)
|-- field2: string (nullable = false)
|-- field3: string (nullable = false)
|-- field4: string (nullable = false)
如果我正在打印对象数据集,我将丢失实际架构
objResult.printSchema();
root
|-- value: binary (nullable = true)
问题是如何为Dataset<MyObj>
应用架构?
【问题讨论】:
请提供一个最小、完整和可验证的示例 - ***.com/help/mcve 关于您的问题的代码可能会帮助我们推荐一些东西。 @squid,我提供了一个代码sn-p 【参考方案1】:下面是代码 sn-p,我试过了,spark 的行为符合预期,看来你的问题的根本原因不是 map 函数。
SparkSession session = SparkSession.builder().config(conf).getOrCreate();
Dataset<Row> ds = session.read().text("<some path>");
Encoder<Employee> employeeEncode = Encoders.bean(Employee.class);
ds.map(new MapFunction<Row, Employee>()
@Override
public Employee call(Row value) throws Exception
return new Employee(value.getString(0).split(","));
, employeeEncode).printSchema();
输出:
root
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
//员工豆
public class Employee
public String name;
public Integer age;
public Employee()
public Employee(String [] args)
this.name=args[0];
this.age=Integer.parseInt(args[1]);
public String getName()
return name;
public void setName(String name)
this.name = name;
public Integer getAge()
return age;
public void setAge(Integer age)
this.age = age;
【讨论】:
其实我对bean Encoder并不熟悉。只有在这个问题调查期间我才知道它。这是我能得到的最接近的答案,虽然它并不完美,因为它不支持所有数据类型:spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/…以上是关于将模式应用于 Java 对象的 Spark 数据集的主要内容,如果未能解决你的问题,请参考以下文章
学习笔记Spark—— Spark SQL应用—— Spark DataFrame基础操作
如何使用 Spark(Java)在数据集的所有列上并行应用相同的函数
将 Python UDF 应用于 Spark 数据帧时出现 java.lang.IllegalArgumentException