spark

Posted 伊米伊念

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark相关的知识,希望对你有一定的参考价值。

 

    /*
     * spark算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。
     * spark算子的作用:
     * 1.输入:在spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入scala集合或数据)输入spark,
     *        数据进入spark运行时数据空间,转化为spark中的数据块,通过blockmanager进行管理
     * 2.运行:在spark数据输入形成RDD后便可以通过变换算子,如fliter等,对数据进行操作并将RDD转化为新的RDD,通过action算子,触发spark提交作业。
     *        如果数据需要复用,可以通过cache算子,将数据缓存到内存
     * 3.输出:程序运行结束数据会输出spark运行时控件,存储到分布式存储中(如savaAsTextFile输出到HDFS),
     *        或scala数据或集合中(collect输出到scala集合,count返回scala int类型)
     *        
     * spark算子大致上可以分三大类算子:
     * 1.value数据类型的transformation算子,这种变换不触发提交作业,针对处理的数据项是value型的数据
     *     map(func)  将原来RDD的每个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD。
     *     flatMap(func)  类似于map,但是输入数据项可以被映射到0个或多个输出数据集合中,所以函数func的返回值是一个数据项集合而不是一个单一的数据项。
     *     mapPartitions(func)  类似于map,但是该操作是在每个分区上分别执行,所以当操作一个类型为T的RDD时func的格式必须是Iterator<T> => Iterator<U>。即mapPartitions需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作。
     *     union(otherDataset)  返回原数据集和参数指定的数据集合并后的数据集。使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。该操作不进行去重操作,返回的结果会保存所有元素。如果想去重,可以使用distinct()。
     *     cartesian(otherDataset)  对类型为T和U的两个数据集进行操作,返回包含两个数据集所有元素对的(T,U)格式的数据集。即对两个RDD内的所有元素进行笛卡尔积操作。
     *     groupBy : 根据自定义的东东进行分组。groupBy是基本RDD就有的操作。
     *     groupByKey([numTasks])   操作(K,V)格式的数据集,返回 (K, Iterable)格式的数据集。
                   注意,如果分组是为了按key进行聚合操作(例如,计算sum、average),此时使用reduceByKey或aggregateByKey计算效率会更高。
                   注意,默认情况下,并行情况取决于父RDD的分区数,但可以通过参数numTasks来设置任务数。
     *     filter(func)  使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回。
     *     distinct([numTasks]))  将RDD中的元素进行去重操作。
     *     subtract  返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD。它也需要shuffle
     *     sample(withReplacement, fraction, seed)   对数据采样。用户可以设定是否有放回(withReplacement)、采样的百分比(fraction)、随机种子(seed)。
     *     takeSample(withReplacement,num, [seed])  对一个数据集随机抽样,返回一个包含num个随机抽样元素的数组,参数withReplacement指定是否有放回抽样,参数seed指定生成随机数的种子。
     *     cache, persist  持久化
     * 2.key-value数据类型的transformation算子,这种变换不触发提交作业,针对处理的数据项是key-value型的数据
     *     mapValues  对各个键值对的值进行映射。该操作会保留RDD的分区信息。
     *     combineByKey 返回与输入数据的类型不同的返回值
     *     reduceByKey  与reduce相当类似,它们都接收一个函数,并使用该函数对值进行合并。不同的是,reduceByKey是transformation操作,reduceByKey只是对键相同的值进行规约,并最终形成RDD[(K, V)],而不像reduce那样返回单独一个“值”。
     *     partitionBy 对RDD进行分区,可以减少大量的shuffle.
     *     cogroup  可以对多个RDD进行连接、分组、甚至求键的交集。其他的连接操作都是基于cogroup实现的。
     *     join   对数据进行内连接,也即当两个键值对RDD中都存在对应键时才输出。当一个输入对应的某个键有多个值时,生成的键值对RDD会包含来自两个输入RDD的每一组相对应的记录,也即笛卡尔积。
     *     leftOutJoin  即左外连接,源RDD的每一个键都有对应的记录,第二个RDD的值可能缺失,因此用Option表示。
     *     rightOutJoin  即右外连接,与左外连接相反。
     *     fullOuterJoin  即全外连接,它是是左右外连接的并集。
     * 3.action算子,这类算子会触发sparkcontext提交作业
     *     foreach  对每个元素进行操作,并不会返回结果。
     *     foreachPartition : 基于分区的foreach操作,操作分区元素的迭代器,并不会返回结果。
     *     saveAsTextFile 用于将RDD写入文本文件。spark会将传入该函数的路径参数作为目录对待,默认情况下会在对应目录输出多个文件,这取决于并行度。
     *     saveAsObjectFile saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。
     *     collect  收集RDD的元素到driver节点,如果数据有序,那么collect得到的数据也会是有序的。大数据量最好不要使用RDD的collect,因为它会在本机上生成一个新的Array,以存储来自各个节点的所有数据,此时更好的办法是将数据存储在HDFS等分布式持久化层上。
     *     collectAsMap   将结果以Map的形式返回,以便查询。
     *     reduceByKeyLocally 该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。
     *     lookup   返回给定键对应的所有值。
     *     count  返回RDD元素个数。
     *     top   如果为元素定义了顺序,就可以使用top返回前几个元素。
     *     reduce   对RDD中所有元素进行规约,最终得到一个规约结果。reduce接收的规约函数要求其返回值类型与RDD中元素类型相同。
     *     fold  与reduce类似,不同的是,它接受一个“初始值”来作为每个分区第一次调用时的结果。fold同样要求规约函数返回值类型与RDD元素类型相同。
     *     aggregate 与reduce和fold类似,但它把我们从返回值类型必须与所操作的RDD元素类型相同的限制中解放出来。
     *     
     */
    
    /*
     * spark运行架构
     * 术语:
     *    application:spark application的概念和hadoop mapreduce中的类似,指的是用户编写的spark应用程序,
     *                 包含了一个driver功能的代码和分布在集群中多个节点上运行的executor代码。
     *    driver:spark中的driver即运行上述application的main()函数并且创建sparkcontext,其中创建sparkcontext的目的是为了准备spark应用程序的运行环境。
     *            在spark中由sparkcontext负责和clustermanager通信,进行资源的申请,任务的分配和监控等;当executor部分运行完毕后,driver负责将sparkcontext关闭。
     *            通常用sparkcontext代表driver。
     *    executor:application运行在worker节点上的一个进程,该进程负责运行task,并且负责将数据存在内存或者磁盘上,每个application都有各自独立的一批executor。
     *             在spark on yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于hadoop mapreduce中的YarnChild。
     *             一个CoarseGrainedExecutorBackend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取一个空闲的线程运行Task。
     *             每个CoarseGrainedExecutorBackend能并行运行Task的数量就取决于分配给它的CPU的个数了
     *    Cluster Manager:指的是在集群上获取资源的外部服务,目前有:
     *                    Standalone:spark原生的资源管理,由Master负责资源的分配
     *                    Hadoop Yarn:由Yarn中的ResourceManager负责资源分分配
     *    Worker:集群中任何可以运行application代码的节点,类似于yarn中NodeManager节点。
     *           在Standalone模式中指的就是通过Slave文件配置的Worker节点,在spark on yarn模式中指的就是NodeManager节点
     *   作业(Job):包含多个Task组成的并行计算,往往由spark action催生,一个Job包含多个RDD及作用于相应RDD上的各种Operation。
     *   阶段(Stage):每个Job会被拆分很多组Task,每组任务被称为Stage,也可以成TaskSet,一个作业分多个阶段。
     *   任务(Task):被送到某个executor上的工作任务。
     */
    
    
    /*
     * spark运行基本流程
     * 1.构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone, Mesos或YARN)注册并申请运行Executor资源。
     * 2.资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上。
     * 3.SparkContext构建成DAG图,将DAG图分解成Stage,并把TaskSet发送给TaskScheduler。
     *   Executor向SparkContext申请Task,Task Schedule将Task发放到Executor运行同时SparkContext将应用程序代码发放给Executor。
     * 4.Task在Executor上运行,运行完毕释放所有资源。
     * 
     * spark注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式。
     * 部署在单台机器上时,既可以用本地(local)模式运行,也可以使用伪分布式模式来运行;
     * 当以分布式集群部署的时候,可以根据自己集群的实际情况选择Standalone模式(Spark自带的模式), YARN-Client模式或者YARN-Cluster模式。
     * Spark的各种运行模式虽然在启动方式,运行位置,调度策略上各有不同,但它们的目的基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的需要运行和管理Task。
     * Standalone模式:是Spark实现的资源调度框架,其主要的节点有Client节点,Master节点和Worker节点。
     *               其中Driver既可以运行的Master节点上的,也可以运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;
     *               当使用spark—submit工具提交Job或者在Eclipse,IDEA等开发平台上使用"new SparkConf.setManager("spark://master:7077")"方式运行Spark任务时,Driver是运行在本地Client端上的。
     *               运行过程如下:
     *                  1.SparkContext连接Master,向Master注册并申请资源(CPU Core和Memory);
     *                  2.Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上提取资源,然后启动StandaloneExecutorBackend;
     *                  3.StandaloneExecutorBackend向SparkContext注册;
     *                  4.SparkContext将APPlication代码发送给StandaloneExecutorBackend;
     *                    并且SparkContext解析Application代码,构建DAG图,并提交给DAG Schedule分解成Stage(当碰到Action操作时,就会催生Job,每个Job中含有一个或多个Stage, Stage一般在获取外部数据和shuffle之前产生),
     *                    然后以Stage(或者称为TaskSet)提交给Task Scheduler,Task Schedule负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
     *                  5.StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,知道Task完成;
     *                  6.所有Task完成后,SparkContext向Master注销,释放资源;
     * Spark on YARN运行过程:YARN是一种统一资源管理机制,在其上面可以运行多套计算框架。目前的大数据技术世界,大多数公司除了使用Spark来进行数据计算,由于历史原因火车单方面业务的性能考虑二使用着其他的计算框架,
     *                     比如MapReduce,Storm等计算框架。Spark基于此种情况开发了Spark on YARN的运行模式,由于借助了YARN良好的弹性资源管理机制,不仅部署Application更加方便,
     *                     而且用户在YARN集群中运行的服务和Application的资源可完全隔离,更具实践应用价值的是YARN可以通过队列的方式,管理同时运行在集群中的多个服务。
     *                     Spark on YARN模式根据Driver在集群中的位置分为两种模式:一种是YARN-Client模式,另一种是YARN-Cluster(或称为YARN-Standalone模式)。
     *                     YARN框架流程:任何框架与YARN的结合,都必须遵循YARN的开发模式。在分析Spark on YARN的实现细节之前,有必要先分析一下YARN框架的一些基本原理。
     *                     YARN框架的基本运行流程图为:ResourceManager负责将集群的资源分配给各个应用使用,而资源分配和调度的基本单位是Container,其中封装了机器资源,
     *                                            如内存、CPU、磁盘和网络等,每个任务会被分配一个Container,该任务只能在该Container中执行,并使用该Container封装的资源。
     *                                            NodeManager是一个个的计算节点,主要负责启动Application所需的Container,监控资源(内存、CPU、磁盘和网络等)的使用情况并将之汇报给ResourceManager。
     *                                            ResourceManager与NodeManager共同组成整个数据计算框架,ApplicationMaster与具体的Application相关,主要负责同ResourceManager协商以获取合适的Container,
     *                                            并跟踪这些Container的状态和监控其进度。
     *                     Yarn-Client:Yarn-Client模式中,Driver在客户端本地运行,这种模式可以使得Spark Application和客户端进行交互,因为Driver在客户端,所以可以通过webUI访问Driver的状态,       
     *                                  默认是http://hadoop1:4040访问,而YARN通过http://hadoop1:8088访问。
     *                                  Yarn-Client工作流:
     *                                       1.Spark Yarn Client向YARN的ResourceManager申请启动Application Master;
     *                                         同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;
     *                                       2.ResourceManager收到请求后,在集群上选择一个NodeManager,为该应用程序分配第一个Container,要求她在这个Container中启动应用程序的ApplicationMaster,
     *                                         与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分配;
     *                                       3.Client中SparkContext初始化完毕后,与Application建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);
     *                                       4.一旦ApplicationMaster申请到资源(也就是Container)后,便于对应的NodeManager通讯,要求它在获得的Container中启动CoarseGrainedExecutorBackend,
     *                                         CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
     *                                       5.Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,
     *                                         以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
     *                                       6.应用程序运行完成后,Client的SparkContext向ResourceManager申请注册并关闭自己;
     *                     Yarn-Cluster:在Yarn-Cluster模式中,当用户向Yarn中提交一个应用程序后,
     *                                   Yarn将分两个阶段运行该应用程序:
     *                                      1.把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动
     *                                      2.由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成
     *                                   Yarn-Cluster工作流程:
     *                                      1.Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序,启动ApplicationMaster的命令,需要在Executor中运行的程序等;
     *                                      2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动ApplicationMaster,
     *                                        其中ApplicationMaster进行SparkContext等的初始化;
     *                                      3.ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,
     *                                        然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态知道运行结束;
     *                                      4.一旦ApplicationMaster申请到资源(也就是Container)后,便于对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend,
     *                                        CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。
     *                                        这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedExecutorBackend配合YarnClusterScheduler进行任务的调度,
     *                                        其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;
     *                                      5.ApplicationMaster中对SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,
     *                                        以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
     *                                      6.应用程序运行完成后,ApplicationMaster向ResourceManager申请注册并关闭自己;
     *                     Yarn-Client和Yarn-Cluster的区别:理解Yarn-Client和Yarn-Cluster深层次的区别之前先清楚一个概念:ApplicationMaster。
     *                                                    在Yarn中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。
     *                                                    它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。
     *                                                    从深层次的含义讲Yarn-Cluster和Yarn-Client模式的区别其实就是ApplicationMaster进程的区别。
     *                                                    Yarn-Cluster模式下,Driver运行在ApplicationMaster中,负责向Yarn申请资源,并监督作业的运行状况。
     *                                                                      当用户提交作业之后,就可以关掉Client,作业会继续在Yarn上运行,因而Yarn-Cluster模式下不适合运行交互类型的作业。
     *                                                    Yarn-Client模式下,ApplicationMaster仅仅向Yarn请求Executor,Client会和请求的Container通信来调度他们的工作,也就是说Client不能离开。
     *                     
     */
    

 

以上是关于spark的主要内容,如果未能解决你的问题,请参考以下文章

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

Spark闭包与序列化

spark 例子wordcount topk

Spark:如何加速 foreachRDD?

Spark发现匹配字符串的出现次数

控制 spark-sql 和数据帧中的字段可空性