RDD和DataFrame转换(Java+Scala)
Posted 靖-Drei
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD和DataFrame转换(Java+Scala)相关的知识,希望对你有一定的参考价值。
一:RDD与DataFrame转换
1. 通过反射的方式来推断RDD元素中的元数据。因为RDD本身一条数据本身是没有元数据的,例如Person,而Person有name,id等,而record是不知道这些的,但是变成DataFrame背后一定知道,通过反射的方式就可以了解到背后这些元数据,进而转换成DataFrame。
如何反射?
Scala: 通过case class映射,在case class里面说我们这个RDD里面每个record的不同列的元数据是什么。
Java: 如何描述数据的元数据?构建Java Bean,使用Java Bean构建元数据信息,然后变换成DataFrame,但是此种方法不可以构建DataFrame嵌套类型。
2. 动态获取Schema,我们并不知道RDD的元数据信息,所以只能根据曾经运行时动态构建一份具体的元数据。然后将具体的元数据运行在存在的RDD上。而且这种情况比较常见。
二:代码实战
package com.dt.spark.SparkApps.sql;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
* 使用反射的方式将RDD转换成为DataFrame
Person [id=1, name=Spark, age=7]
Person [id=2, name=Hadoop, age=10]
*/
public class RDDToDataFrameByReflection
public static void main(String[] args)
SparkConf conf = new SparkConf().setMaster("local").setAppName("RDDToDataFrameByReflection");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
//读取数据
JavaRDD<String> lines = sc.textFile("E://persons.txt");
JavaRDD<Person> persons = lines.map(new Function<String,Person>()
private static final long serialVersionUID = 1L;
@Override
public Person call(String line) throws Exception
String[] splited = line.split(",");
Person p = new Person();
p.setId(Integer.valueOf(splited[0].trim()));
p.setName(splited[1].trim());
p.setAge(Integer.valueOf(splited[2].trim()));
return p;
);
//第一个参数:RDD,第二个参数是JavaBean,Person类
//第二参数就是封装的JavaBean,JavaBean中封装了Person的元数据信息,
//通过第二个参数DataFrame也就获得了元数据信息。
//在底层通过反射的方式获得Person的所有Fields,结合RDD本身,就生成了DataFrame
DataFrame df = sqlContext.createDataFrame(persons, Person.class);
df.registerTempTable("persons");
DataFrame bigDatas = sqlContext.sql("select * from persons where age >= 6");
//DataFrame => RDD
JavaRDD<Row> bigDataRDD = bigDatas.javaRDD();
JavaRDD<Person> result = bigDataRDD.map(new Function<Row,Person>()
private static final long serialVersionUID = 1L;
@Override
public Person call(Row row) throws Exception
//返回具体每条记录
Person p = new Person();
/**
* 由于数据在DataFrame会进行优化,里面会对元数据进行排序
* 顺序可能就不是id name age的顺序了。
*/
p.setId(row.getInt(1));
p.setName(row.getString(2));
p.setAge(row.getInt(0));
return p;
);
List<Person> personList = result.collect();
for(Person p : personList)
System.out.println(p);
People.java源码如下:
package com.dt.spark.SparkApps.sql;
import java.io.Serializable;
//因为底层是反射,要求JavaBean是public
//此时需要序列化,因为是分布式方式。
public class Person implements Serializable
private static final long serialVersionUID = 1L;
private int id;
private String name;
private int age; public int getId()
return id;
public void setId(int id)
this.id = id;
public String getName()
return name;
public void setName(String name)
this.name = name;
public int getAge()
return age;
public void setAge(int age)
this.age = age;
@Override
public String toString()
return "Person [id=" + id + ", name=" + name + ", age=" + age + "]";
1.作业:
使用Scala在IDE中实战RDD和DataFrame转换操作
package com.dataguru.xzl.two.com.dt
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf, SparkContext
/**
* Created by xzl on 2016/3/16.
*/
object RDD2DataFrameByReflection
//case class 要放在main方法外面
case class Person(id: Int, name: String, age: Int)
def main(args: Array[String]): Unit =
val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// 导入语句,可以隐式地将RDD转化成DataFrame
import sqlContext.implicits._
val lines = sc.textFile("d://persons.txt")
val df = lines.map(_.split(",")).map splited =>
Person(splited(0).trim().toInt, splited(1), splited(2).trim().toInt)
.toDF()
df.registerTempTable("persons")
val bigDatas = sqlContext.sql("select * from persons where age >= 6")
val personList = bigDatas.javaRDD.collect()
for (p <- personList.toArray)
println(p)
以上是关于RDD和DataFrame转换(Java+Scala)的主要内容,如果未能解决你的问题,请参考以下文章
Spark中RDD转换成DataFrame的两种方式(分别用Java和Scala实现)
如何使用 toDF() 将自定义 Java 类对象的 RDD 转换为 DataFrame?