将 Spark DataFrame 转换为 Pojo 对象
Posted
技术标签:
【中文标题】将 Spark DataFrame 转换为 Pojo 对象【英文标题】:Convert Spark DataFrame to Pojo Object 【发布时间】:2015-12-10 05:00:58 【问题描述】:请看下面的代码:
//Create Spark Context
SparkConf sparkConf = new SparkConf().setAppName("TestWithObjects").setMaster("local");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
//Creating RDD
JavaRDD<Person> personsRDD = javaSparkContext.parallelize(persons);
//Creating SQL context
SQLContext sQLContext = new SQLContext(javaSparkContext);
DataFrame personDataFrame = sQLContext.createDataFrame(personsRDD, Person.class);
personDataFrame.show();
personDataFrame.printSchema();
personDataFrame.select("name").show();
personDataFrame.registerTempTable("peoples");
DataFrame result = sQLContext.sql("SELECT * FROM peoples WHERE name='test'");
result.show();
在此之后,我需要将 DataFrame - 'result' 转换为 Person 对象或列表。提前致谢。
【问题讨论】:
【参考方案1】:DataFrame 只是 Dataset[Row] 的类型别名。与强类型化 Scala/Java 数据集附带的“类型化转换”相比,这些操作也称为“非类型化转换”。
在 spark 中从 Dataset[Row] 到 Dataset[Person] 的转换非常简单
DataFrame result = sQLContext.sql("SELECT * FROM peoples WHERE name='test'");
此时,Spark 会将您的数据转换为 DataFrame = Dataset[Row],这是一个通用 Row 对象的集合,因为它不知道确切的类型。
// Create an Encoders for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> personDF = result.as(personEncoder);
personDF.show();
现在,Spark 转换 Dataset[Row] -> Dataset[Person] 类型特定的 Scala / Java JVM 对象,由 Person 类指定。
详情请参考databricks提供的以下链接
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
【讨论】:
似乎编码器试图设置所有类参数,而不仅仅是那些在构造函数中的参数。例如,如果我在上述情况下有一个像class A int p1 int p2 public A(int p1) this.p1 = p1 this.p2 = p1 * 2
int 这样的类,编码器会要求 p1 和 p2 在奇数的数据帧中都可用。【参考方案2】:
DataFrame
存储为Row
s,因此您可以使用the methods there to cast from untyped to typed。看看get
方法。
【讨论】:
方法给定,我们只能按值获取每个值,不能作为一个整体对象 @DonMathew 在这一点上你不会得到更好的。如果更容易,您可以转换为 JSON 并从那里开始。即将推出的DataSet
API 应该或多或少地为您提供您想要的,尽管我不确定 POJO 现在是否不需要解码器。以上是关于将 Spark DataFrame 转换为 Pojo 对象的主要内容,如果未能解决你的问题,请参考以下文章
将包含 BigInt 的 RDD 转换为 Spark Dataframe
将Spark Dataframe转换为Scala Map集合