第1课:通过案例对SparkStreaming 透彻理解三板斧之一

Posted michaelli916

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第1课:通过案例对SparkStreaming 透彻理解三板斧之一相关的知识,希望对你有一定的参考价值。

spark作为apache旗下顶级项目之一,在2015年火得一塌糊涂,在2016年更是势不可挡,下面两图可见一斑:

这里写图片描述
这里写图片描述

对于spark的学习,掌握其API的使用仅仅只是皮毛,我们要深入源码研究其本质,能够做到源码级别的修改和定制,才是真正掌握了它,也才能更好地使用它。从今天起,我们将踏上这一征程。
Spark的子框架有若干, 我们将从Spark Streaming着手切入Spark版本定制,通过对该框架的彻底研究,我们推而广之到spark的各个框架,可以精通Spark力量的源泉和所有问题的解决之道。为什么选择Spark Streaming作为切入点呢?首先是因为数据有时效性,过期的数据就像过期的食物一样,远没有新鲜的食物来的有营养,我们以往选择批处理很多时候是因为技术和资源的限制,做不到流处理,只能退而求其次,从本质上来讲,流处理才是数据处理的王道!现在的时代是流处理的时代。其次,Spark Streaming自从推出以来,收到了越来越多的关注,50%以上的用户都将它视作spark中最重要的部分,如下图可见:

这里写图片描述
Spark的流式处理可以无缝配合使用SPARK SQL,GraphX与MLlib的功能,这得益于Spark一体化、多元化的基础架构设计,所谓兄弟齐心,其力断金,这正是Spark真正可怕之处,也正是Spark Streaming必将一统天下的根源;Spark Streaming由于其数据输入的动态性,需要动态控制数据的流入,作业的切分,还有数据的处理,所以最容易出问题,需要认真学习仔细掌握; Spark Streaming与其它子框架不同之处在于,它更像是Spark Core之上的一个应用程序,在学过了Spark Core之后,进一步研究Spark Streaming也为我们后续做Spark上的复杂的程序提供了最好的参考。
我们先来看下官网对spark streaming的精要介绍:

这里写图片描述

接下来进入今天的正题。
一。Spark Streaming另类在线实验
由于Spark Streaming中的数据是动态流入的,对数据的处理也是由框架自动地周期性地产生Job来处理的,在这种动态性变化性下,我们如何清晰地看到数据的流入和被处理的过程?我们的技巧是,通过调大Batch Interval来降低动态变化性,以方便透视细节。需要说明的是,这只是学习研究的需要,实际生产环境中在集群能够处理的情况下,batch interval一般都是越小越好。我们从已写过的广告点击的在线黑名单过滤的Spark Streaming应用程序入手,首先来看下程序代码:

object OnlineBlackListFilter {
    def main(args: Array[String]){
      /**
       * 第1步:创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
       * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
       * 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
       * 只有1G的内存)的初学者       * 
       */
      val conf = new SparkConf() //创建SparkConf对象
      conf.setAppName("OnlineBlackListFilter") //设置应用程序的名称,在程序运行的监控界面可以看到该名称
      conf.setMaster("spark://Master:7077") //此时,程序在Spark集群


      //*val ssc = new StreamingContext(conf, Seconds(30))
      val ssc = new StreamingContext(conf, Seconds(300))
      /**
       * 黑名单数据准备,实际上黑名单一般都是动态的,例如在Redis或者数据库中,黑名单的生成往往有复杂的业务
       * 逻辑,具体情况算法不同,但是在Spark Streaming进行处理的时候每次都能够访问完整的信息
       */
      val blackList = Array(("hadoop", true),("mahout", true))
      val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)

      val adsClickStream = ssc.socketTextStream("Master", 9999)

      /**
       * 此处模拟的广告点击的每条数据的格式为:time、name
       * 此处map操作的结果是name、(time,name)的格式
       */
      val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(" ")(1), ads) }
      adsClickStreamFormatted.transform(userClickRDD => {
        //通过leftOuterJoin操作既保留了左侧用户广告点击内容的RDD的所有内容,又获得了相应点击内容是否在黑名单中
        val joinedBlackListRDD = userClickRDD.leftOuterJoin(blackListRDD)

        /**
         * 进行filter过滤的时候,其输入元素是一个Tuple:(name,((time,name), boolean))
         * 其中第一个元素是黑名单的名称,第二元素的第二个元素是进行leftOuterJoin的时候是否存在值
         * 如果存在的话,表面当前广告点击是黑名单,需要过滤掉,否则的话则是有效点击内容;
         */
        val validClicked = joinedBlackListRDD.filter(joinedItem => {
          if(joinedItem._2._2.getOrElse(false))
          {
            false
          } else {
            true
          }

        })

        validClicked.map(validClick => {validClick._2._1})
      }).print

      /**
       * 计算后的有效数据一般都会写入Kafka中,下游的计费系统会从kafka中pull到有效数据进行计费
       */
      ssc.start()
      ssc.awaitTermination()

    }
}

程序打包好了后,就可以着手我们的测试了。
1.首先做好准备工作,启动hdfs集群(之所以要启动hdfs,是因为我们的spark集群的spark.eventLog.dir 和 spark.history.fs.logDirectory存储在hdfs上);
Start-dfs.sh:
这里写图片描述
2.启动spark集群,start-all.sh:
这里写图片描述
3.启动Spark的History Server,方便我们在查看程序运行细节。
Start-history-server.sh:
这里写图片描述
4.打开数据发送的端口,需要先执行nc,如果直接执行打包的程序,会报错,会报端口被拒绝的错误,因为9999端口确实未启动:
nc -lk 9999

5.用spark-submit运行前面生成的jar包:
/usr/local/spark/spark-1.6.1-bin-hadoop2.6/bin/spark-submit –class com.dt.spark.sparkstreaming.OnlineBlackListFilter –master spark://master:7077 /usr/local/idea/SparkApps.jar
6.在数据发送端口输入若干数据,比如:
111111 Spark
222222 hadoop
333333 flink
444444 kafka
555555 scala
666666 flume
7.程序运行过程中,控制台可见如下打印信息:
这里写图片描述
可见,黑名单中的hadoop确实被过滤掉了,其余的信息都被正确地显示了。
8.程序运行过程中,通过webui查看运行状态:
这里写图片描述
9.Ctrl+C手动结束程序后,webui查看运行详情:
这里写图片描述
这里写图片描述
这里写图片描述

初步可见,程序运行在2个worker节点的2个executor中,总共有5个Job。
这跟我们熟知的spark core, spark sql有些不同,程序总共才运行2.7分钟,为啥有5个Job呢?
接下来去具体的Job中看一看。
10. Job 0:
这里写图片描述
可以发现该Job分为2个stage,运行在所有的2个Executor,这两个stage都是由我们的程序中的66行的代码,即ssc.start()触发的。
接下来看下stage0:
这里写图片描述
该stage 也是有我们的ssc.start()触发的,共包含50个task,运行在所有的2个executor上。
接下来看下stage1:
这里写图片描述
该stage 也是有我们的ssc.start()触发的,共包含20个task,运行在所有的2个executor上.
11.Job 1的细节:
这里写图片描述
该job的Stage2的细节:
这里写图片描述
该job也是由我们的ssc.start()触发的,只包含1个task,运行在executor0上,本定性为process_local,耗时1.9分钟。该Job实际就是启动了一个接收数据的线程Receiver。
Spark Streaming启动Receiver的时候就会通过Job来启动。而且Receiver只会在一个Executor中执行,且以一个Task去接受我们的数据。Receiver接受数据和普通的Job没有任何区别.
Receiver :长时间(可能7*24小时)运行在Excutor之上,每个Receiver负责一个inuptDStream (比如读取一个kafka消息的输入流)。每个Receiver,加上inputDStream 会占用一个core/slot
重要启示:一个Spark应用程序中可以启动很多Job,而这些不同的Job之间可以相互配合。这一认识为我们写复杂Spark程序奠定了良好的基础。
12.Job 2的详情:
这里写图片描述
该Job的各个stage对应我们的程序的第40,32和61行( val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(” “)(1), ads) },val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)和print),该Job处理的就是我们程序的主要业务逻辑。
我们看Stage3的详情:

这里写图片描述
这里写图片描述
该Stage是由worker2上的Executor1执行的,数据写在了worker2的磁盘上(shuffle write).
Stage4:
这里写图片描述
这里写图片描述
该stage对应的就是(val blackListRDD = ssc.sparkContext.parallelize(blackList, 8),可以看到在worker1和worker2的executor上都有该rdd的分区数据,且并行度是我们手动指定的8, 数据写在了worker1的磁盘上(shuffle write).
Stage5:
这里写图片描述
这里写图片描述
该stage运行在Worker2的executor1上。
可以看出虽然在一台服务器上接受数据,但是会在多台服务器上处理数据。
13.Job3:
这里写图片描述
这里写图片描述
Stage6和stage7被skip掉了,因为数据在前一个job执行的时候已经shuffle write在了磁盘上。
Stage8的详情:
这里写图片描述
这里写图片描述
该stage运行在Worker1和Worker12的executor上。
同Job2一样,Job3的各个stage对应我们的程序的第40,32和61行(val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(” “)(1), ads) }, val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)和print),该Job处理的就是我们程序的主要业务逻辑。
14.Job4:
这里写图片描述
这里写图片描述
同Job2,Job3一样,Job4的各个stage对应我们的程序的第40,32和61行( val adsClickStreamFormatted = adsClickStream.map { ads => (ads.split(” “)(1), ads) }, val blackListRDD = ssc.sparkContext.parallelize(blackList, 8)和print),该Job处理的就是我们程序的主要业务逻辑。
2 瞬间理解Spark Streaming本质
DStream是一个没有边界的集合,没有大小的限制。DStream代表了时空的概念。随着时间的推移,里面不断产生RDD。
时间已固定,我们就锁定到空间的操作。
从空间的维度来讲,就是处理层面。
对DStream的操作,构成了DStreamGraph。如以下图例所示:
这里写图片描述
上图中每个foreach都会触发一个作业,作业会顺着依赖从后往前回溯,形成DAG,如下图所示:
这里写图片描述
空间维度确定之后,随着时间不断推进,会不断实例化RDD Graph,然后触发Job去执行处理。
最后再去读下官方的Spark Streaming的文档:
这里写图片描述
总结:在Spark Streaming的应用程序中,框架自动帮我们提交了一些Job,来完成一些事情,从而简化我们的程序逻辑,使我们只需关注在业务逻辑代码上,这正是spark streaming的精华所在,正体现了spark框架的易用性。而要想彻底掌握spark streaming,我们有必要通过这些现象,反过来回溯去寻根问源。接下来几节课我们会逐步深入,抽丝剥茧,去看这些本质。

本次分享来自于王家林老师的课程‘源码版本定制发行班’,在此向王家林老师表示感谢!
欢迎大家交流技术知识!一起学习,共同进步!

以上是关于第1课:通过案例对SparkStreaming 透彻理解三板斧之一的主要内容,如果未能解决你的问题,请参考以下文章

第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

第12课:Spark Streaming源码解读之Executor容错安全性

第9课:Spark Streaming源码解读之Receiver在Driver的精妙实现全生命周期彻底研究和思考

(版本定制)第16课:Spark Streaming源码解读之数据清理内幕彻底解密

(版本定制)第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

第12课:Spark Streaming源码解读之Executor容错安全性