Spark_总结四

Posted 日月的弯刀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark_总结四相关的知识,希望对你有一定的参考价值。

转载请标明出处http://www.cnblogs.com/haozhengfei/p/22bba3b1ef90cbfaf073eb44349c0757.html 


Spark_总结四

1.Spark SQL  

    Spark SQL 和 Hive on Spark 两者的区别?
        spark on hive:hive只是作为元数据存储的角色,解析,优化,执行都是spark做的    
        hive on spark: hive既作为存储的角色,又作为计算角色的一部分,hive将sql解析Spark任务,底层是Spark引擎hive2.0以后推荐使用Spark引擎,转化为Spark任务,hvie2.0以前都是转化为MR任务)
            
    Spark SQL 转化的过程(底层架构)
 
【SQL/HQL-->解析器-->分析器-->优化器-->CostModel消耗模型(选出消耗最低的,就是效率最高的),最终将传入的SQL转换为RDD的计算】
 
须知:
        若想使用SparkSQL必须创建SQLContext 必须是传入SparkContext 不能是SparkConf
 
1.DataFrame与RDD的区别?   ||   什么是DataFrame?

区别:
      Spark core是基于RDD的编程,Spark SQL是基于DataFrame的编程,DataFrame的底层就是封装的RDD,只不过DataFrame底层RDD的泛型是ROW(DataFrame <==> RDD<ROW>),另外,DataFrame中有对列的描述,但是RDD没有对列的描述。      
What is DataFrame:
      DataFrame RDD 类似,DataFrame 是一个分布式数据容器,更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息(比如对列的描述), 即 schema。同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、 array 和 map)。 从 API 易用性的角度上 看,DataFrameAPI 提供的是一套高层的关系操作函数式的 RDDAPI 更加友好,门槛更低。 

3.创建DataFrame的来源和方   ||   如何对DataFrame中封装的数据进行操作?

3.1创建DataFrame的来源和方
 

3.2如何对DataFrame中封装的数据进行操作?

   当我们的DataFrame构建好之后,里面封装了我们的数据,需要对数据进行操作即对DataFrame进行操作,有两种方式
3.2.1   通过方法
        sqlContext.read()    返回DataFrameReader对象
        sqlContext.read().json("student.json")   读取一个json文件(这个json文件中的内容不能是嵌套的)读进来变成DataFrame,
        df.select("age").show(),如果没有show,这个程序就不会执行,这个show就类似与Spark中Action类型的算子,触发执行
 
示例代码:
 1 package com.hzf.spark.exercise;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.JavaSparkContext;
 5 import org.apache.spark.sql.DataFrame;
 6 import org.apache.spark.sql.SQLContext;
 7 
 8 public class TestSparkSQL {
 9     public static void main(String[] args) {
10         SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local");
11         
12         JavaSparkContext sc = new JavaSparkContext(conf);
13         SQLContext sqlContext = new SQLContext(sc);
14         
15         DataFrame df = sqlContext.read().json("people.json");
16         
17         
18         /*
19          * 操作DataFrame的第一种方式
20          * */
21         //类似 SQL的select from table;
22         df.show();
23         //desc table
24         df.printSchema();
25         
26         //select age from table;
27         df.select("age").show();
28         //select name from table;
29         df.select("name").show();
30         //select name,age+10 from table;
31         df.select(df.col("name"),df.col("age").plus(10)).show();
32         //select * from table where age > 20
33         df.filter(df.col("age").gt(20)).show();
34     }
35 }
View Code
 
result:
 
3.2.2   通过注册临时表,传入SQL语句(推荐使用)
 
示例代码:
 1 package com.hzf.spark.exercise;
 2 
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.JavaSparkContext;
 5 import org.apache.spark.sql.DataFrame;
 6 import org.apache.spark.sql.SQLContext;
 7 
 8 public class TestSparkSQL01 {
 9     public static void main(String[] args) {
10         SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local");
11         
12         JavaSparkContext sc = new JavaSparkContext(conf);
13         SQLContext sqlContext = new SQLContext(sc);
14         
15         DataFrame df = sqlContext.read().json("people.json");
16         
17         //将DataFrame中封装的数据注册为一张临时表,对临时表进行sql操作
18         df.registerTempTable("people");
19         DataFrame sql = sqlContext.sql("SELECT * FROM people WHERE age IS NOT NULL");
20         sql.show(); 
21     }
22 }
View Code
 
result:
 
3.3创建DataFrame的几种方式,来源(json,jsonRDD,parquet,非json格式,mysql)
 
<1>读取Json格式文件-->DataFrame:Json 文件中不能有嵌套的格式
      加载json格式文件-->DataFrame有两种方式
           方式一:DataFrame df = sqlContext.read().format("json").load("people.json");
           方式二:DataFrame df = sqlContext.read().json("people.json");
 
数据集:
 
示例代码:
 1 package com.bjsxt.java.spark.sql.json;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.SparkContext;
 8 import org.apache.spark.api.java.JavaPairRDD;
 9 import org.apache.spark.api.java.JavaRDD;
10 import org.apache.spark.api.java.JavaSparkContext;
11 import org.apache.spark.api.java.function.Function;
12 import org.apache.spark.api.java.function.PairFunction;
13 import org.apache.spark.sql.DataFrame;
14 import org.apache.spark.sql.Row;
15 import org.apache.spark.sql.RowFactory;
16 import org.apache.spark.sql.SQLContext;
17 import org.apache.spark.sql.types.DataTypes;
18 import org.apache.spark.sql.types.StructField;
19 import org.apache.spark.sql.types.StructType;
20 
21 import scala.Tuple2;
22 
23 /**
24  * JSON数据源
25  * @author Administrator
26  *
27  */
28 public class JSONDataSource {
29 
30     public static void main(String[] args) {
31         SparkConf conf = new SparkConf()
32                 .setAppName("JSONDataSource")
33 //                .set("spark.default.parallelism", "100")
34                 .setMaster("local");  
35         JavaSparkContext sc = new JavaSparkContext(conf);
36         SQLContext sqlContext = new SQLContext(sc);
37         
38         DataFrame studentScoresDF = sqlContext.read().json("student.json");  
39         
40         studentScoresDF.registerTempTable("student_scores");
41         DataFrame goodStudentScoresDF = sqlContext.sql(
42                 "select name,count(score) from student_scores where score>=80 group by name");
43         
44         List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(new Function<Row, String>() {
45                     private static final long serialVersionUID = 1L;
46                     
47                     @Override
48                     public String call(Row row) throws Exception {
49                         return row.getString(0);
50                     }
51                     
52         }).collect();
53         
54         for(String str: goodStudentNames){
55             System.out.println(str);
56         }
57     }
58 }
View Code
 
result:
 
<2>jsonRDD-->DataFrame
 
<3>读取Parquet格式文件-->DataFrame自动推测分区,合并 Schema。
经验:将Spark中的文本转换为Parquet以提升性能

    parquet是一个基于列的存储格式,列式存储布局可以加速查询,因为它只检查所有需要的列并对它们的值执行计算,因此只读取一个数据文件或表的小部分数据。Parquet 还支持灵活的压缩选项,因此可以显著减少磁盘上的存储

   如果在 HDFS 上拥有基于文本的数据文件或表,而且正在使用 Spark SQL 对它们执行查询,那么强烈推荐将文本数据文件转换为 Parquet 数据文件,以实现性能和存储收益。当然,转换需要时间,但查询性能的提升在某些情况下可能达到 30 倍或更高,存储的节省可高达 75%!

parquet的压缩比高,将一个普通的文本转化为parquet格式,如何去转?
       val lineRDD = sc.textFile()
       DF.save(parquet) //将RDD转化为DF
parquet操作示例
   是否指定format--若存储时,指定format为json格式,那么则生成json格式文件,否则不指定format,默认文件以parquet形式进行存储 
测试一:指定format为json格式,存储在本地
测试数据:   top.txt
测试代码

测试结果
 
 
测试二:不指定format,那么文件默认以parquet形式进行存储,存储在本地
 
测试数据:   people.json
测试代码
测试结果
 
测试三:读取本地parquet存储格式的文件
测试代码

测试结果
 
测试四:读取hdfs上parquet形式的文件
测试代码
测试结果
 
<4> RDD(非json格式变成DataFrame)
读取txt 文件-->DataFrame从 txt 文件读取,然后转为 RDD,最后转为 DataFrame
                                       RDD 转为 DataFrame 有两种方式
                        (1)反射机制
                              注意点:自定义的类一定要是 public,并且要实现序列化接口 Serializable
                                             取数据的时候,在 JavaAPI 中会有顺序问题(因为 DataFrame 转为 RDD<Row> 的时候,会进行一次字典排序改变 Row 的位置,而Scala 的 API 则没有这个问题)
                        (2)动态创建 Schema,先将 RDD 中的每一行类型变 为 RDD<Row> 类型,然后创建 DataFrame 的元数据-->构建 StructType,用于最后 DataFrame 元数据的描述,基于现有的 StructType 以及 RDD<Row> 来构造 DataFrame。(如果列的信息比较长可以存到数据库里) 
<4.1>反射机制
数据
示例代码:
自定义类
 1 package com.bjsxt.java.spark.sql.createdf;
 2 
 3 import java.util.List;
 4 
 5 import org.apache.spark.SparkConf;
 6 import org.apache.spark.api.java.JavaRDD;
 7 import org.apache.spark.api.java.JavaSparkContext;
 8 import org.apache.spark.api.java.function.Function;
 9 import org.apache.spark.sql.DataFrame;
10 import org.apache.spark.sql.Row;
11 import org.apache.spark.sql.SQLContext;
12 
13 /**
14  * 使用反射的方式将RDD转换成为DataFrame
15  * 1、自定义的类必须是public
16  * 2、自定义的类必须是可序列化的
17  * 3、RDD转成DataFrame的时候,他会根据自定义类中的字段名进行排序。
18  * @author zfg
19  *
20  */
21 
22 public class RDD2DataFrameByReflection {
23 
24     public static void main(String[] args) {
25         SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
26         JavaSparkContext sc = new JavaSparkContext(conf);
27         SQLContext sqlcontext = new SQLContext(sc);
28         
29         JavaRDD<String> lines = sc.textFile("Peoples.txt");
30         JavaRDD<Person> personsRdd = lines.map(new Function<String, Person>() {
31             
32             private static final long serialVersionUID = 1L;
33 
34             @Override
35             public Person call(String line) throws Exception {
36                 String[] split = line.split(",");
37                 Person p = new Person();
38                 p.setId(Integer.valueOf(split[0].trim()));
39                 p.setName(split[1]);
40                 p.setAge(Integer.valueOf(split[2].trim()));
41                 return p;
42             }
43         });
44         
45         //传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
46         //在底层通过反射的方式或得Person的所有field,结合RDD本身,就生成了DataFrame
47         DataFrame df = sqlcontext.createDataFrame(personsRdd, Person.class);
48      
49         //命名table的名字为person
50         df.registerTempTable("personTable");
51         
52         DataFrame resultDataFrame = sqlcontext.sql("select * from personTable where age > 7");
53         resultDataFrame.show();
54         
55          //将df转成rdd
56         JavaRDD<Row> resultRDD = resultDataFrame.javaRDD();
57         JavaRDD<Person> result = resultRDD.map(new Function<Row, Person>() {
58             
59             private static final long serialVersionUID = 1L;
60 
61             @Override
62             public Person call(Row row) throws Exception {
63                  Person p = new Person();
64                  p.setAge(row.getInt(0));
65                  p.setId(row.getInt(1));
66                  p.setName(row.getString(2));
67                 return p;
68             }
69         });
70         
71          List<Person> personList = result.collect();
72          
73          for (Person person : personList) {
74             System.out.println(person.toString());
75         } 
76     }
77 }
View Code
 
 
result:

<4.2>动态创建Schema方式
数据
 
示例代码:
 1 package com.bjsxt.java.spark.sql.createdf;
 2 
 3 import java.util.ArrayList;
 4 import java.util.List;
 5 
 6 import org.apache.spark.SparkConf;
 7 import org.apache.spark.api.java.JavaRDD;
 8 import org.apache.spark.api.java.JavaSparkContext;
 9 import org.apache.spark.api.java.function.Function;
10 import org.apache.spark.sql.DataFrame;
11 import org.apache.spark.sql.Row;
12 import org.apache.spark.sql.RowFactory;
13 import org.apache.spark.sql.SQLContext;
14 import org.apache.spark.sql.types.DataTypes;
15 import org.apache.spark.sql.types.StructField;
16 import org.apache.spark.sql.types.StructType;
17 
18 
19 public class RDD2DataFrameByProgrammatically {
20     public static void main(String[] args) {
21         SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameByReflection");
22         JavaSparkContext sc = new JavaSparkContext(conf);
23         SQLContext sqlcontext = new SQLContext(sc);
24         /**
25          * 在RDD的基础上创建类型为Row的RDD
26          */
27         JavaRDD<String> lines = sc.textFile("Peoples.txt");
28         JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {
29             
30             private static final long serialVersionUID = 1L;
31 
32             @Override
33             public Row call(String line) throws Exception {
34                 String[] split = line.split(",");
35                 return RowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2]));
36             }
37         });
38         
39         /**
40          * 动态构造DataFrame的元数据,一般而言,有多少列以及每列的具体类型可能来自于Json,也可能来自于DB
41          */
42         ArrayList<StructField> structFields = new ArrayList<StructField>();
43         structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
44         structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
45         structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
46         //构建StructType,用于最后DataFrame元数据的描述
47         StructType schema =以上是关于Spark_总结四的主要内容,如果未能解决你的问题,请参考以下文章

验证码逆向专栏某验四代文字点选验证码逆向分析

验证码逆向专栏某验四代消消乐验证码逆向分析

5.9号今日总结

Spark_总结四

Spark_总结五

Spark_总结五