spark
Posted 伊米伊念
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark相关的知识,希望对你有一定的参考价值。
/* * spark算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。 * spark算子的作用: * 1.输入:在spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取HDFS等,parallelize方法输入scala集合或数据)输入spark, * 数据进入spark运行时数据空间,转化为spark中的数据块,通过blockmanager进行管理 * 2.运行:在spark数据输入形成RDD后便可以通过变换算子,如fliter等,对数据进行操作并将RDD转化为新的RDD,通过action算子,触发spark提交作业。 * 如果数据需要复用,可以通过cache算子,将数据缓存到内存 * 3.输出:程序运行结束数据会输出spark运行时控件,存储到分布式存储中(如savaAsTextFile输出到HDFS), * 或scala数据或集合中(collect输出到scala集合,count返回scala int类型) * * spark算子大致上可以分三大类算子: * 1.value数据类型的transformation算子,这种变换不触发提交作业,针对处理的数据项是value型的数据 * map(func) 将原来RDD的每个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD。 * flatMap(func) 类似于map,但是输入数据项可以被映射到0个或多个输出数据集合中,所以函数func的返回值是一个数据项集合而不是一个单一的数据项。 * mapPartitions(func) 类似于map,但是该操作是在每个分区上分别执行,所以当操作一个类型为T的RDD时func的格式必须是Iterator<T> => Iterator<U>。即mapPartitions需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作。 * union(otherDataset) 返回原数据集和参数指定的数据集合并后的数据集。使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。该操作不进行去重操作,返回的结果会保存所有元素。如果想去重,可以使用distinct()。 * cartesian(otherDataset) 对类型为T和U的两个数据集进行操作,返回包含两个数据集所有元素对的(T,U)格式的数据集。即对两个RDD内的所有元素进行笛卡尔积操作。 * groupBy : 根据自定义的东东进行分组。groupBy是基本RDD就有的操作。 * groupByKey([numTasks]) 操作(K,V)格式的数据集,返回 (K, Iterable)格式的数据集。 注意,如果分组是为了按key进行聚合操作(例如,计算sum、average),此时使用reduceByKey或aggregateByKey计算效率会更高。 注意,默认情况下,并行情况取决于父RDD的分区数,但可以通过参数numTasks来设置任务数。 * filter(func) 使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回。 * distinct([numTasks])) 将RDD中的元素进行去重操作。 * subtract 返回一个由只存在于第一个RDD中而不存在于第二个RDD中的所有元素组成的RDD。它也需要shuffle * sample(withReplacement, fraction, seed) 对数据采样。用户可以设定是否有放回(withReplacement)、采样的百分比(fraction)、随机种子(seed)。 * takeSample(withReplacement,num, [seed]) 对一个数据集随机抽样,返回一个包含num个随机抽样元素的数组,参数withReplacement指定是否有放回抽样,参数seed指定生成随机数的种子。 * cache, persist 持久化 * 2.key-value数据类型的transformation算子,这种变换不触发提交作业,针对处理的数据项是key-value型的数据 * mapValues 对各个键值对的值进行映射。该操作会保留RDD的分区信息。 * combineByKey 返回与输入数据的类型不同的返回值 * reduceByKey 与reduce相当类似,它们都接收一个函数,并使用该函数对值进行合并。不同的是,reduceByKey是transformation操作,reduceByKey只是对键相同的值进行规约,并最终形成RDD[(K, V)],而不像reduce那样返回单独一个“值”。 * partitionBy 对RDD进行分区,可以减少大量的shuffle. * cogroup 可以对多个RDD进行连接、分组、甚至求键的交集。其他的连接操作都是基于cogroup实现的。 * join 对数据进行内连接,也即当两个键值对RDD中都存在对应键时才输出。当一个输入对应的某个键有多个值时,生成的键值对RDD会包含来自两个输入RDD的每一组相对应的记录,也即笛卡尔积。 * leftOutJoin 即左外连接,源RDD的每一个键都有对应的记录,第二个RDD的值可能缺失,因此用Option表示。 * rightOutJoin 即右外连接,与左外连接相反。 * fullOuterJoin 即全外连接,它是是左右外连接的并集。 * 3.action算子,这类算子会触发sparkcontext提交作业 * foreach 对每个元素进行操作,并不会返回结果。 * foreachPartition : 基于分区的foreach操作,操作分区元素的迭代器,并不会返回结果。 * saveAsTextFile 用于将RDD写入文本文件。spark会将传入该函数的路径参数作为目录对待,默认情况下会在对应目录输出多个文件,这取决于并行度。 * saveAsObjectFile saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。 * collect 收集RDD的元素到driver节点,如果数据有序,那么collect得到的数据也会是有序的。大数据量最好不要使用RDD的collect,因为它会在本机上生成一个新的Array,以存储来自各个节点的所有数据,此时更好的办法是将数据存储在HDFS等分布式持久化层上。 * collectAsMap 将结果以Map的形式返回,以便查询。 * reduceByKeyLocally 该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。 * lookup 返回给定键对应的所有值。 * count 返回RDD元素个数。 * top 如果为元素定义了顺序,就可以使用top返回前几个元素。 * reduce 对RDD中所有元素进行规约,最终得到一个规约结果。reduce接收的规约函数要求其返回值类型与RDD中元素类型相同。 * fold 与reduce类似,不同的是,它接受一个“初始值”来作为每个分区第一次调用时的结果。fold同样要求规约函数返回值类型与RDD元素类型相同。 * aggregate 与reduce和fold类似,但它把我们从返回值类型必须与所操作的RDD元素类型相同的限制中解放出来。 * */ /* * spark运行架构 * 术语: * application:spark application的概念和hadoop mapreduce中的类似,指的是用户编写的spark应用程序, * 包含了一个driver功能的代码和分布在集群中多个节点上运行的executor代码。 * driver:spark中的driver即运行上述application的main()函数并且创建sparkcontext,其中创建sparkcontext的目的是为了准备spark应用程序的运行环境。 * 在spark中由sparkcontext负责和clustermanager通信,进行资源的申请,任务的分配和监控等;当executor部分运行完毕后,driver负责将sparkcontext关闭。 * 通常用sparkcontext代表driver。 * executor:application运行在worker节点上的一个进程,该进程负责运行task,并且负责将数据存在内存或者磁盘上,每个application都有各自独立的一批executor。 * 在spark on yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于hadoop mapreduce中的YarnChild。 * 一个CoarseGrainedExecutorBackend进程有且仅有一个executor对象,它负责将Task包装成taskRunner,并从线程池中抽取一个空闲的线程运行Task。 * 每个CoarseGrainedExecutorBackend能并行运行Task的数量就取决于分配给它的CPU的个数了 * Cluster Manager:指的是在集群上获取资源的外部服务,目前有: * Standalone:spark原生的资源管理,由Master负责资源的分配 * Hadoop Yarn:由Yarn中的ResourceManager负责资源分分配 * Worker:集群中任何可以运行application代码的节点,类似于yarn中NodeManager节点。 * 在Standalone模式中指的就是通过Slave文件配置的Worker节点,在spark on yarn模式中指的就是NodeManager节点 * 作业(Job):包含多个Task组成的并行计算,往往由spark action催生,一个Job包含多个RDD及作用于相应RDD上的各种Operation。 * 阶段(Stage):每个Job会被拆分很多组Task,每组任务被称为Stage,也可以成TaskSet,一个作业分多个阶段。 * 任务(Task):被送到某个executor上的工作任务。 */ /* * spark运行基本流程 * 1.构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone, Mesos或YARN)注册并申请运行Executor资源。 * 2.资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上。 * 3.SparkContext构建成DAG图,将DAG图分解成Stage,并把TaskSet发送给TaskScheduler。 * Executor向SparkContext申请Task,Task Schedule将Task发放到Executor运行同时SparkContext将应用程序代码发放给Executor。 * 4.Task在Executor上运行,运行完毕释放所有资源。 * * spark注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式。 * 部署在单台机器上时,既可以用本地(local)模式运行,也可以使用伪分布式模式来运行; * 当以分布式集群部署的时候,可以根据自己集群的实际情况选择Standalone模式(Spark自带的模式), YARN-Client模式或者YARN-Cluster模式。 * Spark的各种运行模式虽然在启动方式,运行位置,调度策略上各有不同,但它们的目的基本都是一致的,就是在合适的位置安全可靠的根据用户的配置和Job的需要运行和管理Task。 * Standalone模式:是Spark实现的资源调度框架,其主要的节点有Client节点,Master节点和Worker节点。 * 其中Driver既可以运行的Master节点上的,也可以运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行; * 当使用spark—submit工具提交Job或者在Eclipse,IDEA等开发平台上使用"new SparkConf.setManager("spark://master:7077")"方式运行Spark任务时,Driver是运行在本地Client端上的。 * 运行过程如下: * 1.SparkContext连接Master,向Master注册并申请资源(CPU Core和Memory); * 2.Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上提取资源,然后启动StandaloneExecutorBackend; * 3.StandaloneExecutorBackend向SparkContext注册; * 4.SparkContext将APPlication代码发送给StandaloneExecutorBackend; * 并且SparkContext解析Application代码,构建DAG图,并提交给DAG Schedule分解成Stage(当碰到Action操作时,就会催生Job,每个Job中含有一个或多个Stage, Stage一般在获取外部数据和shuffle之前产生), * 然后以Stage(或者称为TaskSet)提交给Task Scheduler,Task Schedule负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行; * 5.StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,知道Task完成; * 6.所有Task完成后,SparkContext向Master注销,释放资源; * Spark on YARN运行过程:YARN是一种统一资源管理机制,在其上面可以运行多套计算框架。目前的大数据技术世界,大多数公司除了使用Spark来进行数据计算,由于历史原因火车单方面业务的性能考虑二使用着其他的计算框架, * 比如MapReduce,Storm等计算框架。Spark基于此种情况开发了Spark on YARN的运行模式,由于借助了YARN良好的弹性资源管理机制,不仅部署Application更加方便, * 而且用户在YARN集群中运行的服务和Application的资源可完全隔离,更具实践应用价值的是YARN可以通过队列的方式,管理同时运行在集群中的多个服务。 * Spark on YARN模式根据Driver在集群中的位置分为两种模式:一种是YARN-Client模式,另一种是YARN-Cluster(或称为YARN-Standalone模式)。 * YARN框架流程:任何框架与YARN的结合,都必须遵循YARN的开发模式。在分析Spark on YARN的实现细节之前,有必要先分析一下YARN框架的一些基本原理。 * YARN框架的基本运行流程图为:ResourceManager负责将集群的资源分配给各个应用使用,而资源分配和调度的基本单位是Container,其中封装了机器资源, * 如内存、CPU、磁盘和网络等,每个任务会被分配一个Container,该任务只能在该Container中执行,并使用该Container封装的资源。 * NodeManager是一个个的计算节点,主要负责启动Application所需的Container,监控资源(内存、CPU、磁盘和网络等)的使用情况并将之汇报给ResourceManager。 * ResourceManager与NodeManager共同组成整个数据计算框架,ApplicationMaster与具体的Application相关,主要负责同ResourceManager协商以获取合适的Container, * 并跟踪这些Container的状态和监控其进度。 * Yarn-Client:Yarn-Client模式中,Driver在客户端本地运行,这种模式可以使得Spark Application和客户端进行交互,因为Driver在客户端,所以可以通过webUI访问Driver的状态, * 默认是http://hadoop1:4040访问,而YARN通过http://hadoop1:8088访问。 * Yarn-Client工作流: * 1.Spark Yarn Client向YARN的ResourceManager申请启动Application Master; * 同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend; * 2.ResourceManager收到请求后,在集群上选择一个NodeManager,为该应用程序分配第一个Container,要求她在这个Container中启动应用程序的ApplicationMaster, * 与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分配; * 3.Client中SparkContext初始化完毕后,与Application建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container); * 4.一旦ApplicationMaster申请到资源(也就是Container)后,便于对应的NodeManager通讯,要求它在获得的Container中启动CoarseGrainedExecutorBackend, * CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task; * 5.Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度, * 以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务; * 6.应用程序运行完成后,Client的SparkContext向ResourceManager申请注册并关闭自己; * Yarn-Cluster:在Yarn-Cluster模式中,当用户向Yarn中提交一个应用程序后, * Yarn将分两个阶段运行该应用程序: * 1.把Spark的Driver作为一个ApplicationMaster在YARN集群中先启动 * 2.由ApplicationMaster创建应用程序,然后为它向ResourceManager申请资源,并启动Executor来运行Task,同时监控它的整个运行过程,直到运行完成 * Yarn-Cluster工作流程: * 1.Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序,启动ApplicationMaster的命令,需要在Executor中运行的程序等; * 2.ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动ApplicationMaster, * 其中ApplicationMaster进行SparkContext等的初始化; * 3.ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态, * 然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态知道运行结束; * 4.一旦ApplicationMaster申请到资源(也就是Container)后,便于对应的NodeManager通信,要求它在获得的Container中启动CoarseGrainedExecutorBackend, * CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。 * 这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedExecutorBackend配合YarnClusterScheduler进行任务的调度, * 其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等; * 5.ApplicationMaster中对SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度, * 以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务; * 6.应用程序运行完成后,ApplicationMaster向ResourceManager申请注册并关闭自己; * Yarn-Client和Yarn-Cluster的区别:理解Yarn-Client和Yarn-Cluster深层次的区别之前先清楚一个概念:ApplicationMaster。 * 在Yarn中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。 * 它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。 * 从深层次的含义讲Yarn-Cluster和Yarn-Client模式的区别其实就是ApplicationMaster进程的区别。 * Yarn-Cluster模式下,Driver运行在ApplicationMaster中,负责向Yarn申请资源,并监督作业的运行状况。 * 当用户提交作业之后,就可以关掉Client,作业会继续在Yarn上运行,因而Yarn-Cluster模式下不适合运行交互类型的作业。 * Yarn-Client模式下,ApplicationMaster仅仅向Yarn请求Executor,Client会和请求的Container通信来调度他们的工作,也就是说Client不能离开。 * */
以上是关于spark的主要内容,如果未能解决你的问题,请参考以下文章
spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段