Spark: Accumulators, SQL, and Streaming

Posted by 飞机耳朵


RDD persistence
---------------
    A StorageLevel is defined by five flags:
    useDisk  useMemory  useOffHeap  deserialized  replication
    MEMORY_ONLY(false, true, false, true, 1)


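Broadcast variables
---------------
    The driver splits the broadcast value into small blocks and stores them
    in its BlockManager. When an executor needs a block, it first looks in
    its own BlockManager; if the block is not there, it fetches it from
    another node (driver or executor) and then stores a copy in its own
    BlockManager.
    The RDD and its dependency are attached to the task.
    The value is read through Scala's lazy (deferred evaluation) mechanism.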
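
    The lookup order described above (own BlockManager first, then other nodes, then keep a local copy) can be sketched in plain Java. This is a simplified model, not Spark's implementation: BlockStoreSketch, its fields, and the fixed order in which peers are asked are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one node's BlockManager; not a Spark class.
class BlockStoreSketch {
    final String name;
    final Map<Integer, String> blocks = new HashMap<>();
    final List<String> log = new ArrayList<>();   // records where each read was served from

    BlockStoreSketch(String name) { this.name = name; }

    // Look in the local store first, then ask the other nodes, then cache the block locally.
    String fetch(int blockId, List<BlockStoreSketch> others) {
        String local = blocks.get(blockId);
        if (local != null) {
            log.add("local");
            return local;
        }
        for (BlockStoreSketch other : others) {
            String remote = other.blocks.get(blockId);
            if (remote != null) {
                blocks.put(blockId, remote);      // keep a copy for later reads and for peers
                log.add("remote:" + other.name);
                return remote;
            }
        }
        throw new IllegalStateException("block " + blockId + " not found on any node");
    }

    // Builds a tiny three-node setup and returns the fetch log of the second executor.
    static List<String> demo() {
        BlockStoreSketch driver = new BlockStoreSketch("driver");
        driver.blocks.put(0, "piece-0");
        BlockStoreSketch exec1 = new BlockStoreSketch("exec1");
        BlockStoreSketch exec2 = new BlockStoreSketch("exec2");

        exec1.fetch(0, Arrays.asList(exec2, driver));   // exec1 gets the block from the driver
        exec2.fetch(0, Arrays.asList(exec1, driver));   // exec2 finds it on exec1
        exec2.fetch(0, Arrays.asList(exec1, driver));   // the second read is served locally
        return exec2.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [remote:exec1, local]
    }
}
```

    Because every node that has fetched a block can serve it, the driver is asked less often as more executors hold copies.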


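Accumulators
----------------
    Add-only. The value can be read only on the driver; executors cannot read it.
    Do not update an accumulator inside map: transformations can be re-executed,
    so an update may be applied more than once. Update accumulators only inside actions.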
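
    The add-only, driver-read behaviour can be modelled without Spark. The sketch below is plain Java and only illustrative: MinMaxAccSketch and run are hypothetical names, and the loop over partitions stands in for tasks running on executors. Each task fills its own zero-valued accumulator, and only the driver-side object sees the merged result.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java model of an add-only accumulator; not Spark's AccumulatorV2.
class MinMaxAccSketch {
    int max = Integer.MIN_VALUE;
    int min = Integer.MAX_VALUE;

    // True while nothing has been added yet
    boolean isZero() { return max == Integer.MIN_VALUE && min == Integer.MAX_VALUE; }

    void add(int v) {
        max = Math.max(max, v);
        min = Math.min(min, v);
    }

    void merge(MinMaxAccSketch other) {
        max = Math.max(max, other.max);
        min = Math.min(min, other.min);
    }

    // Each "task" works on its own fresh accumulator; the "driver" merges the partial results.
    static MinMaxAccSketch run(List<int[]> partitions) {
        MinMaxAccSketch driverAcc = new MinMaxAccSketch();
        for (int[] partition : partitions) {
            MinMaxAccSketch taskAcc = new MinMaxAccSketch();   // a task never sees the driver's value
            for (int v : partition) {
                taskAcc.add(v);
            }
            driverAcc.merge(taskAcc);
        }
        return driverAcc;
    }

    public static void main(String[] args) {
        MinMaxAccSketch acc = run(Arrays.asList(new int[]{3, 9}, new int[]{-5, 2}));
        System.out.println(acc.max + " " + acc.min);   // prints 9 -5
    }
}
```

    Max and min are idempotent, so merging the same partial result twice would not change the outcome here. A counting or summing accumulator has no such protection, which is why updates belong in actions.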

Custom accumulator: aggregating max and min temperature
-----------------------------
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

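/**
  * Accumulator test
  */
object AccTestScala {

    // Custom accumulator: input is a temperature, output is (max, min)
    class MyAcc extends AccumulatorV2[Int,(Int,Int)]{
        // Highest temperature
        var max: Int = Int.MinValue
        // Lowest temperature
        var min: Int = Int.MaxValue

        // True while the accumulator still holds its initial (zero) value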
        override def isZero: Boolean = {
            max == Int.MinValue && min == Int.MaxValue
        }

        override def copy(): AccumulatorV2[Int, (Int, Int)] = {
            val acc = new MyAcc()
            acc.max = max
            acc.min = min
            acc
        }

        override def reset(): Unit = {
            max = Int.MinValue
            min = Int.MaxValue
        }

        override def add(v: Int): Unit = {
            max = math.max(max, v)
            min = math.min(min, v)
        }

        override def merge(other: AccumulatorV2[Int, (Int, Int)]): Unit = {
            max = math.max(max, other.value._1)
            min = math.min(min, other.value._2)
        }

        override def value: (Int, Int) = {
            (max,min)
        }
    }

    def main(args: Array[String]): Unit = {
        // 1. Create the Spark configuration
        val conf = new SparkConf()
        conf.setAppName("wcApp")
        conf.setMaster("local[4]")

        val sc = new SparkContext(conf)

        val acc = new MyAcc()
        sc.register(acc , "myacc")
        val rdd1 = sc.textFile("file:///d:/mr/temp.dat")
        val rdd2 = rdd1.map(line=>{
            val arr = line.split(" ")
            arr(1).toInt
        })

        rdd2.foreach(temp=>{
            acc.add(temp)
        })

        println(acc.value)
    }
}

Spark modules
------------------
    1. Spark Core
        RDD
        job

    2. Spark SQL
    3. Spark Streaming
    4. Spark ML
    5. Spark GraphX


Spark SQL module
------------------
    0. Introduction
        A DataFrame is the equivalent of a table.
        Add the spark-sql dependency to the project.
        A DataFrame is a special Dataset: Dataset[Row].


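    0'. Errors when Spark accesses Hive
        1) The client cannot be instantiated
            a) Cause
                Version mismatch.
            b) Downgrade Hive to Hive 1.2
            c) Copy the Hive jars to Spark's jars directory
            d) Or disable the schema version check in hive-site.xml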
                <property>
                    <name>hive.metastore.schema.verification</name>
                    <value>false</value>
                </property>
                
    
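    1. Integrating Hive
        0. Overview
            Spark SQL works on Hive data with Spark as the execution engine: it only reads the Hive data from HDFS and runs the computation on Spark.

        1. Integration steps
            Copy Hive's hive-site.xml to the Spark conf directory.
            Copy the MySQL driver to spark/jars/.

        2. Start ZooKeeper and HDFS.

        3. Start the Spark cluster
            $>spark/sbin/start-all.sh

        4. Enter spark-shell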
            $>spark-shell --master spark://s101:7077
            $scala>spark.sql("select * from mydb.custs").show()


    2. Accessing Hive from Spark SQL programmatically
        2.1) Scala version
            a) Copy hive-site.xml + core-site.xml + hdfs-site.xml to the resources directory

            b) Add the Maven dependencies
                <?xml version="1.0" encoding="UTF-8"?>
                <project xmlns="http://maven.apache.org/POM/4.0.0"
                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
                    <modelVersion>4.0.0</modelVersion>

                    <groupId>com.it18zhang</groupId>
                    <artifactId>my-spark</artifactId>
                    <version>1.0-SNAPSHOT</version>

                    <dependencies>
                        <dependency>
                            <groupId>org.apache.spark</groupId>
                            <artifactId>spark-core_2.11</artifactId>
                            <version>2.1.0</version>
                        </dependency>
                        <dependency>
                            <groupId>org.apache.spark</groupId>
                            <artifactId>spark-sql_2.11</artifactId>
                            <version>2.1.0</version>
                        </dependency>
                        <dependency>
                            <groupId>com.alibaba</groupId>
                            <artifactId>fastjson</artifactId>
                            <version>1.2.24</version>
                        </dependency>
                        <dependency>
                            <groupId>mysql</groupId>
                            <artifactId>mysql-connector-java</artifactId>
                            <version>5.1.17</version>
                        </dependency>
                        <!--*************************************************-->
                        <!--****** Note: this dependency is required, otherwise Hive support will not work ****-->
                        <!--*************************************************-->
                        <dependency>
                            <groupId>org.apache.spark</groupId>
                            <artifactId>spark-hive_2.11</artifactId>
                            <version>2.1.0</version>
                        </dependency>
                    </dependencies>
                </project>

            c) Code
                import org.apache.spark.SparkConf
                import org.apache.spark.sql.SparkSession

                val conf = new SparkConf()
                conf.setAppName("SparkSQLScala")
                conf.setMaster("local")
                conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse")

                // Enable Hive support
                val sess = SparkSession.builder()
                                    .config(conf)
                                    .enableHiveSupport()    // Hive support must be enabled.
                                    .getOrCreate()
                import sess._
                sess.sql("select * from mydb.custs").show()

        2.2) Java version
            import org.apache.spark.SparkConf;
            import org.apache.spark.sql.Dataset;
            import org.apache.spark.sql.Row;
            import org.apache.spark.sql.SparkSession;

            /**
             *
             */
            public class MySparkSQLJava {
                public static void main(String[] args) {
                    SparkConf conf = new SparkConf();
                    conf.setAppName("MySparkSQLJava");
                    conf.setMaster("local[*]") ;
                    SparkSession sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate();
                    Dataset<Row> df = sess.sql("select * from mydb.custs") ;
                    df.show();
                }
            }


Registering an RDD as a DataFrame
----------------------
    import org.apache.spark.sql.SparkSession

    /**
      */
    object SparkSQLRDDScala {
        def main(args: Array[String]): Unit = {
            // Create the SparkSession
            val sess = SparkSession.builder().appName("sparksql").master("local").enableHiveSupport().getOrCreate()

            import sess.implicits._
            val rdd1 = sess.sparkContext.textFile("file:///d:/mr/temp.dat")
            val rdd2 = rdd1.map(line=>{
                val arr = line.split(" ")
                (arr(0).toInt, arr(1).toInt)
            })

            val df1 = rdd2.toDF("year","temp")
            df1.createOrReplaceTempView("temps")
            val sql = "select year , max(temp) max ,min(temp) min from temps group by year order by year asc limit 200"
            sess.sql(sql).show(1000,false)
        }
    }


Converting between RDD and DataFrame (Java version)
-----------------------------
    package com.oldboy.spark.java;

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import scala.Tuple2;

    /**
     * Spark SQL operations in Java
     */
    public class SparkSQRDDLava {


        public static void main(String[] args) {
            SparkSession sess = SparkSession.builder().appName("sparksql").master("local").enableHiveSupport().getOrCreate();
            JavaSparkContext sc = new JavaSparkContext(sess.sparkContext()) ;
            JavaRDD<String> rdd1 = sc.textFile("file:///d:/mr/temp.dat");

            // Convert a JavaRDD<Row> into a Dataset<Row>
    //        JavaRDD<Row> rdd2 = rdd1.map(new Function<String, Row>() {
    //            public Row call(String v1) throws Exception {
    //                String[] arr = v1.split(" ");
    //                return RowFactory.create(Integer.parseInt(arr[0]) , Integer.parseInt(arr[1]));
    //            }
    //        }) ;
    //
    //        // Build the schema
    //        StructField[] fields = new StructField[2];
    //        fields[0] = new StructField("year", DataTypes.IntegerType, false, Metadata.empty());
    //        fields[1] = new StructField("temp", DataTypes.IntegerType, true, Metadata.empty());
    //        StructType type = new StructType(fields);
    //
    //        Dataset<Row> df1 = sess.createDataFrame(rdd2 , type) ;
    //        df1.createOrReplaceTempView("temps");
    //        sess.sql("select * from temps").show();


            JavaRDD<TempData> rdd2 = rdd1.map(new Function<String, TempData>() {
                public TempData call(String v1) throws Exception {
                    String[] arr = v1.split(" ");
                    TempData data = new TempData() ;
                    data.setYear(Integer.parseInt(arr[0]));
                    data.setTemp(Integer.parseInt(arr[1]));
                    return data;
                }
            }) ;

            Dataset<Row> df1 = sess.createDataFrame(rdd2 , TempData.class) ;
            df1.show();

            System.out.println("==================================");
            // Convert a DataFrame into an RDD
            Dataset<Row> df2 = sess.sql("select * from big10.emp2");
            JavaRDD<Row> rdd3 = df2.toJavaRDD();
            JavaPairRDD<Integer, Float> rdd4 = rdd3.mapToPair(new PairFunction<Row, Integer, Float>() {
                public Tuple2<Integer, Float> call(Row row) throws Exception {
                    int depno = row.getInt(row.fieldIndex("deptno")) ;
                    float sal= row.getFloat(row.fieldIndex("salary")) ;
                    return new Tuple2<Integer,Float>(depno , sal) ;
                }
            }) ;

            JavaPairRDD<Integer, Float> rdd5 = rdd4.reduceByKey(new Function2<Float, Float, Float>() {
                public Float call(Float v1, Float v2) throws Exception {
                    return Math.max(v1, v2);
                }
            }) ;

            rdd5.foreach(new VoidFunction<Tuple2<Integer, Float>>() {
                public void call(Tuple2<Integer, Float> t) throws Exception {
                    System.out.println(t);
                }
            });
        }
    }
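
    The program above uses a TempData class that is not shown. createDataFrame(rdd, TempData.class) infers the columns from JavaBean getters, so a bean along the following lines would fit. This is a guess at its shape, not the author's class: the field names follow the setYear/setTemp calls above, and parse is a hypothetical helper that repeats the parsing done in the map function.

```java
import java.io.Serializable;

// Assumed shape of TempData: a serializable JavaBean with year and temp properties.
// When used with createDataFrame, declare it public in its own TempData.java file.
class TempData implements Serializable {
    private int year;
    private int temp;

    public TempData() { }

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }

    public int getTemp() { return temp; }
    public void setTemp(int temp) { this.temp = temp; }

    // Hypothetical helper: parses one "year temp" line separated by a single space
    static TempData parse(String line) {
        String[] arr = line.split(" ");
        TempData data = new TempData();
        data.setYear(Integer.parseInt(arr[0]));
        data.setTemp(Integer.parseInt(arr[1]));
        return data;
    }

    public static void main(String[] args) {
        TempData d = parse("1990 35");
        System.out.println(d.getYear() + " " + d.getTemp());   // prints 1990 35
    }
}
```

    The class implements Serializable because instances are created inside a map function and travel between Spark tasks.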



Printing the schema of a DataFrame
---------------------
    df.printSchema()

Accessing JSON files with Spark SQL
----------------------------
    1. Create the JSON file
        [d:/java/custs.json]
        {"id":1,"name":"tom","age":12}
        {"id":2,"name":"tomas","age":13}
        {"id":3,"name":"tomasLee","age":14}
        {"id":4,"name":"tomson","age":15}
        {"id":5,"name":"tom2","age":16}
    
    2. Load the file [Scala]
        val conf = new SparkConf()
        conf.setAppName("SparkSQLScala")
        conf.setMaster("local")
        conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse")

        // Enable Hive support
        val sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
        val df = sess.read.json("file:///d:/java/custs.json")
        df.show(1000,false)

    3. Java version
        Dataset<Row> df = spark.read().json("file:///d:/java/custs.json");

    4. Save as JSON
        df1.write.json(path)


    
Accessing Parquet files with Spark SQL
----------------------------
    // Save as Parquet
    df1.write.parquet(path)

    // Read
    spark.read.parquet(path)


Accessing JDBC sources with Spark SQL
----------------------------
    // Save to a JDBC table
    val prop = new java.util.Properties()
    prop.put("driver" , "com.mysql.jdbc.Driver")
    prop.put("user" , "root")
    prop.put("password" , "root")
    // The table does not need to exist beforehand
    df1.write.jdbc("jdbc:mysql://192.168.231.1:3306/big10" , "emp" ,prop )

    // Read
    spark.read.jdbc("jdbc:mysql://192.168.231.1:3306/big10" , "emp" ,prop )

Spark SQL DataFrame API programming
----------------------------
    // df is a DataFrame. $"..." needs import spark.implicits._ ;
    // max and min need import org.apache.spark.sql.functions._
    df.select("id" , "name")
    df.select($"id" , $"name")
    df.where("id > 3")
    df.groupBy("id").agg(max("age"),min("age"))
    ...

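Spark temporary views
----------------------------
    1. createOrReplaceTempView
        The view lives only as long as the current session.

    2. createGlobalTempView
        Global: the view is shared across sessions. It is registered in the
        global_temp database and is queried as global_temp.<view name>.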
    
Spark SQL as a distributed query engine
----------------------------
    1. Description
        End users and applications can interact with Spark SQL directly, without writing any other code.
    2. Start Spark's Thrift server process
        $>spark/sbin/start-thriftserver.sh --master spark://s101:7077
    3. Check
        a) Web UI
        b) Port
            netstat -anop|grep 10000

    4. Test with Spark's beeline program
        $>spark/bin/beeline
        $beeline>!connect jdbc:hive2://s101:10000/mydb
        $beeline>select * from customers ;



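Spark Streaming
----------------------------
    Continuous computation: the job keeps running until it is stopped.
    It is not true real-time processing; data is processed in small batches (micro-batches).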
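
    To make the micro-batch idea concrete, here is a small plain-Java model. It is only a sketch and does not use Spark: MicroBatchSketch, countPerBatch, and the sample arrival times are made up for illustration. Lines are grouped by the batch interval they arrive in, and words are counted separately inside each batch.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of micro-batching; not Spark code.
class MicroBatchSketch {
    // Groups lines by batch interval and counts the words of each batch independently.
    // The result maps the batch start time (ms) to the word counts of that batch.
    static Map<Long, Map<String, Integer>> countPerBatch(long[] arrivalMillis, String[] lines, long batchMillis) {
        Map<Long, Map<String, Integer>> batches = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            long batchStart = (arrivalMillis[i] / batchMillis) * batchMillis;
            Map<String, Integer> counts = batches.computeIfAbsent(batchStart, k -> new TreeMap<>());
            for (String word : lines[i].split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] arrivals = {100, 900, 1200};
        String[] lines = {"hello world", "hello spark", "hello again"};
        // With a 1-second batch interval the first two lines share a batch, the third starts a new one.
        System.out.println(countPerBatch(arrivals, lines, 1000));
        // prints {0={hello=2, spark=1, world=1}, 1000={again=1, hello=1}}
    }
}
```

    Note that "hello" is counted as 2 in the first batch and 1 in the second: a per-batch reduce produces counts for each interval, not a running total.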

A first streaming example
-----------------
    1. Write the streaming code
        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        /**
          * Created by Administrator on 2018/5/18.
          */
        object SparkStreamingDemo1 {
            def main(args: Array[String]): Unit = {
                val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
                val ssc = new StreamingContext(conf, Seconds(1))

                // Create a socket text stream
                val lines = ssc.socketTextStream("localhost", 9999)

                // Flatten each line into words
                val words = lines.flatMap(_.split(" "))

                val ds2 = words.map((_,1))

                val ds3 = ds2.reduceByKey(_+_)

                ds3.print()

                // Start the context
                ssc.start()

                ssc.awaitTermination()
            }
        }

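    2. Start the nc server
        [win7]
        nc -l -L -p 9999
    3. Start the Scala program
        To keep the console readable, place this file at /resources/log4j.properties (it sets the root log level to warn):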
            #
            # Licensed to the Apache Software Foundation (ASF) under one or more
            # contributor license agreements.  See the NOTICE file distributed with
            # this work for additional information regarding copyright ownership.
            # The ASF licenses this file to You under the Apache License, Version 2.0
            # (the "License"); you may not use this file except in compliance with
            # the License.  You may obtain a copy of the License at
            #
            #    http://www.apache.org/licenses/LICENSE-2.0
            #
            # Unless required by applicable law or agreed to in writing, software
            # distributed under the License is distributed on an "AS IS" BASIS,
            # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
            # See the License for the specific language governing permissions and
            # limitations under the License.
            #

            # Set everything to be logged to the console
            log4j.rootCategory=warn, console
            log4j.appender.console=org.apache.log4j.ConsoleAppender
            log4j.appender.console.target=System.err
            log4j.appender.console.layout=org.apache.log4j.PatternLayout
            log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

            # Set the default spark-shell log level to WARN. When running the spark-shell, the
            # log level for this class is used to overwrite the root logger's log level, so that
            # the user can have different defaults for the shell and regular Spark apps.
            log4j.logger.org.apache.spark.repl.Main=WARN

            # Settings to quiet third party logs that are too verbose
            log4j.logger.org.spark_project.jetty=WARN
            log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
            log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
            log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
            log4j.logger.org.apache.parquet=ERROR
            log4j.logger.parquet=ERROR

            # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
            log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
            log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

    4. Type the following in the nc terminal
        hello world

Spark Streaming (Java version)
----------------------------
    package com.oldboy.spark.java;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import scala.Tuple2;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    /**
     * Spark Streaming, Java version
     */
    public class SparkStreamingJava {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf() ;
            conf.setAppName("ssc") ;
            conf.setMaster("local[2]") ;

            // Create the Spark Streaming context
            JavaStreamingContext ssc = new JavaStreamingContext(conf , Durations.seconds(2)) ;

            // Create the DStream
            JavaDStream<String> ds1 = ssc.socketTextStream("s101" , 8888);

            // Flatten each line into words
            JavaDStream<String> ds2 = ds1.flatMap(new FlatMapFunction<String, String>() {
                public Iterator<String> call(String s) throws Exception {
                    String[] arr = s.split(" ");
                    return Arrays.asList(arr).iterator();
                }
            }) ;

            // Pair each word with a count of 1
            JavaPairDStream<String, Integer> ds3 = ds2.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }) ;

            // Aggregate the counts by key
            JavaPairDStream<String, Integer> ds4 = ds3.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            }) ;

    //        ds4.print();

            ds4.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
                public void call(JavaPairRDD<String, Integer> rdd) throws Exception {
                    System.out.println("--------------------");
                    List<Tuple2<String, Integer>> list = rdd.take(100);
                    for(Tuple2<String, Integer> t : list){
                        System.out.println(t);
                    }
                }
            });

            ssc.start();

            ssc.awaitTermination();
        }
    }

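Spark Streaming notes
-------------------------
    1. Once a context has been started, no new computations can be added to it.
    2. Once a context has been stopped, it cannot be restarted.
    3. Only one StreamingContext can be active in a JVM at a time.
    4. Stopping the StreamingContext can optionally stop the SparkContext as well.
    5. A SparkContext can be reused to create several StreamingContexts, provided the previous one has been stopped first.

    Internally a DStream is a continuous series of RDDs; operations on a DStream are translated into operations on those RDDs.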

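DStreams and receivers
-------------------------
    Input DStreams such as the socket text stream are each associated with a Receiver.
    The receiver takes data from the source and stores it in Spark's memory for processing.
    Source types
    1. Basic sources
        Built in
    2. Advanced sources
        Provided by third-party libraries

    Notes:
        When running a streaming job locally, do not use local (which equals local[1]) as the master.
        It provides a single thread for local tasks; the receiver occupies that thread,
        so no thread is left to run the computation. Use local[n] with n greater than the number of receivers.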
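
    The thread starvation described in the note can be reproduced with an ordinary Java thread pool. The sketch below is an analogy, not Spark code: ReceiverThreadSketch and batchJobRuns are hypothetical names, the blocked task plays the long-running receiver, and the 300 ms timeout is an arbitrary choice.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Thread-pool analogy for local[n]; not Spark code.
class ReceiverThreadSketch {
    // Returns true if a batch job gets to run while a "receiver" permanently holds one pool thread.
    static boolean batchJobRuns(int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch receiverStarted = new CountDownLatch(1);
        CountDownLatch stopReceiver = new CountDownLatch(1);
        try {
            // The receiver occupies its thread until it is told to stop.
            pool.submit(() -> {
                receiverStarted.countDown();
                stopReceiver.await();
                return null;
            });
            receiverStarted.await();

            // The batch job needs a free thread to make progress.
            Future<String> job = pool.submit(() -> "batch done");
            try {
                job.get(300, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException e) {
                return false;   // no free thread: the job is still queued
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            stopReceiver.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  " + batchJobRuns(1));
        System.out.println("2 threads: " + batchJobRuns(2));
    }
}
```

    With one thread the job stays queued behind the receiver; with two threads it completes, which mirrors why the examples in these notes use local[2].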

    
    Socket sock = new Socket("localhost" , 8888) ;


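Spark Streaming API
----------------------
    1. StreamingContext
        Entry point of Spark Streaming. It creates DStreams, which can be built in several ways.
        start()/stop()/awaitTermination();


    2. SocketReceiver
        Creates a Socket object, spawns a worker thread, receives the data, and stores it in memory.

    3. ReceiverTracker
        Tracks the receivers and manages their execution.

    4. JobScheduler
        Schedules jobs to run on Spark. It uses a JobGenerator to generate the jobs and a thread pool to run them.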


Implementing taggen with Spark SQL (Scala version)
---------------------------------
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.types.{DataTypes, StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}

    /**
      */
    object MySparkSQLScalaTaggen {
        def main(args: Array[String]): Unit = {
            val conf = new SparkConf()
            conf.setAppName("SparkSQLScala")
            conf.setMaster("local")
            conf.set("spark.sql.warehouse.dir", "hdfs://mycluster/user/hive/warehouse")

            // Enable Hive support
            val sess = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

            // 1. Load the file as an RDD
            val rdd1 = sess.sparkContext.textFile("file:///d:/temptags.txt")
            // 2. Split each line and drop invalid rows
            val rdd2 = rdd1.map(_.split("\t")).filter(_.length > 1)
            // 3. Parse the JSON
            val rdd3 = rdd2.map(arr=>(arr(0) , JSONUtil.parseTag(arr(1))))
            // 4. Filter out entries with no tags
            val rdd4 = rdd3.filter(t=>t._2.size() > 0)
            // Convert each java.util.List of tags into an Array[String]
            val rdd44 = rdd4.map(t=>{
                val busid = t._1
                val list = t._2
                var arr = new Array[String](list.size())
                var i:Int = 0
                import scala.collection.JavaConversions._
                for(x <- list){
                    arr(i) = x
                    i +=1
                }
                (busid , arr)
            })
            // 5. Convert the RDD into an RDD[Row]
            val rdd5 = rdd44.map(t=>{
                Row(t._1,t._2)
            })

            // 6. Define the schema
            val mytype = StructType(List(
                StructField("busid" , DataTypes.StringType,false) ,
                StructField("tags" , DataTypes.createArrayType(DataTypes.StringType),false)
            ))
            // 7. Create the DataFrame
            val df = sess.createDataFrame(rdd5, mytype)
            // 8. Register a temporary view
            df.createOrReplaceTempView("_tags")
            // 9. Explode the tags column

            //val df2 = sess.sql("select busid , explode(tags) tag from _tags")  //OK
            // Use Hive's lateral view to join the exploded tags back to their rows
            val df2 = sess.sql("select busid , tag from _tags lateral view explode(tags) xx as tag")
            // 10. Register a temporary view
            df2.createOrReplaceTempView("_tags2")
            // 11. Count the occurrences of each tag per business
            val sql1 = "select busid, tag , count(*) cnt from _tags2 group by busid , tag order by busid , cnt desc" ;
            // 12. Collect all tags of each business as busid, List((tag, count), ...); this needs a subquery
            //val sql2 = "select t.busid , collect_list(struct(t.tag , t.cnt)) st from (" + sql1 + ") as t group by t.busid order by st[0].col2 desc "
            val sql2 = "select t.busid , collect_list(named_struct('tag' , t.tag , 'cnt' , t.cnt)) st from (" + sql1 + ") as t group by t.busid order by st[0].cnt desc "
            sess.sql(sql2).show(10000, false)


    //        val sql2 = "select t.busid , collect_list(named_struct('tag' , t.tag , 'cnt' , t.cnt)) st from (" + sql1 + ") as t group by t.busid "
    //        sess.sql(sql2).createOrReplaceTempView("_tags3")
    //        // 13. Sort all businesses in descending order of their highest tag count
    //        val sql3 = "select * from _tags3 order by st[0].cnt desc"
    //        sess.sql(sql3).show(10000,false)
        }
    }

 
