Hadoop,MapReduce,YARN和Spark的区别与联系

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop,MapReduce,YARN和Spark的区别与联系相关的知识,希望对你有一定的参考价值。

  (1) Hadoop 1.0

  第一代Hadoop,由分布式存储系统HDFS和分布式计算框架MapReduce组成,其中,HDFS由一个NameNode和多个DataNode组成,MapReduce由一个JobTracker和多个TaskTracker组成,对应Hadoop版本为Hadoop 1.x和0.21.X,0.22.x。

  (2) Hadoop 2.0

  第二代Hadoop,为克服Hadoop 1.0中HDFS和MapReduce存在的各种问题而提出的。针对Hadoop 1.0中的单NameNode制约HDFS的扩展性问题,提出了HDFS Federation,它让多个NameNode分管不同的目录进而实现访问隔离和横向扩展;针对Hadoop 1.0中的MapReduce在扩展性和多框架支持方面的不足,提出了全新的资源管理框架YARN(Yet Another Resource Negotiator),它将JobTracker中的资源管理和作业控制功能分开,分别由组件ResourceManager和ApplicationMaster实现,其中,ResourceManager负责所有应用程序的资源分配,而ApplicationMaster仅负责管理一个应用程序。对应Hadoop版本为Hadoop 0.23.x和2.x。

  (3) MapReduce 1.0或者MRv1(MapReduceversion 1)

  第一代MapReduce计算框架,它由两部分组成:编程模型(programming model)和运行时环境(runtime environment)。它的基本编程模型是将问题抽象成Map和Reduce两个阶段,其中Map阶段将输入数据解析成key/value,迭代调用map()函数处理后,再以key/value的形式输出到本地目录,而Reduce阶段则将key相同的value进行规约处理,并将最终结果写到HDFS上。它的运行时环境由两类服务组成:JobTracker和TaskTracker,其中,JobTracker负责资源管理和所有作业的控制,而TaskTracker负责接收来自JobTracker的命令并执行它。

  (4)MapReduce 2.0或者MRv2(MapReduce version 2)或者NextGen MapReduc

  MapReduce 2.0或者MRv2具有与MRv1相同的编程模型,唯一不同的是运行时环境。MRv2是在MRv1基础上经加工之后,运行于资源管理框架YARN之上的MRv1,它不再由JobTracker和TaskTracker组成,而是变为一个作业控制进程ApplicationMaster,且ApplicationMaster仅负责一个作业的管理,至于资源的管理,则由YARN完成。

  简而言之,MRv1是一个独立的离线计算框架,而MRv2则是运行于YARN之上的MRv1。

  (5)Hadoop-MapReduce(一个离线计算框架)

  Hadoop是google分布式计算框架MapReduce与分布式存储系统GFS的开源实现,由分布式计算框架MapReduce和分布式存储系统HDFS(Hadoop Distributed File System)组成,具有高容错性,高扩展性和编程接口简单等特点,现已被大部分互联网公司采用。

  (6)Hadoop-YARN(Hadoop 2.0的一个分支,实际上是一个资源管理系统)

  YARN是Hadoop的一个子项目(与MapReduce并列),它实际上是一个资源统一管理系统,可以在上面运行各种计算框架(包括MapReduce、Spark、Storm、MPI等)。

  

  当前Hadoop版本比较混乱,让很多用户不知所措。实际上,当前Hadoop只有两个版本:Hadoop 1.0和Hadoop 2.0,其中,Hadoop 1.0由一个分布式文件系统HDFS和一个离线计算框架MapReduce组成,而Hadoop 2.0则包含一个支持NameNode横向扩展的HDFS,一个资源管理系统YARN和一个运行在YARN上的离线计算框架MapReduce。相比于Hadoop 1.0,Hadoop 2.0功能更加强大,且具有更好的扩展性、性能,并支持多种计算框架。

  

  Borg/YARN/Mesos/Torca/Corona一类系统可以为公司构建一个内部的生态系统,所有应用程序和服务可以“和平而友好”地运行在该生态系统上。有了这类系统之后,你不必忧愁使用Hadoop的哪个版本,是Hadoop 0.20.2还是 Hadoop 1.0,你也不必为选择何种计算模型而苦恼,因此各种软件版本,各种计算模型可以一起运行在一台“超级计算机”上了。

  从开源角度看,YARN的提出,从一定程度上弱化了多计算框架的优劣之争。YARN是在Hadoop MapReduce基础上演化而来的,在MapReduce时代,很多人批评MapReduce不适合迭代计算和流失计算,于是出现了Spark和Storm等计算框架,而这些系统的开发者则在自己的网站上或者论文里与MapReduce对比,鼓吹自己的系统多么先进高效,而出现了YARN之后,则形势变得明朗:MapReduce只是运行在YARN之上的一类应用程序抽象,Spark和Storm本质上也是,他们只是针对不同类型的应用开发的,没有优劣之别,各有所长,合并共处,而且,今后所有计算框架的开发,不出意外的话,也应是在YARN之上。这样,一个以YARN为底层资源管理平台,多种计算框架运行于其上的生态系统诞生了。

  

  目前spark是一个非常流行的内存计算(或者迭代式计算,DAG计算)框架,在MapReduce因效率低下而被广为诟病的今天,spark的出现不禁让大家眼前一亮。

  从架构和应用角度上看,spark是一个仅包含计算逻辑的开发库(尽管它提供个独立运行的master/slave服务,但考虑到稳定后以及与其他类型作业的继承性,通常不会被采用),而不包含任何资源管理和调度相关的实现,这使得spark可以灵活运行在目前比较主流的资源管理系统上,典型的代表是mesos和yarn,我们称之为“spark on mesos”和“spark on yarn”。将spark运行在资源管理系统上将带来非常多的收益,包括:与其他计算框架共享集群资源;资源按需分配,进而提高集群资源利用率等。

  FrameWork On YARN

  运行在YARN上的框架,包括MapReduce-On-YARN, Spark-On-YARN, Storm-On-YARN和Tez-On-YARN。

  (1)MapReduce-On-YARN:YARN上的离线计算;

  (2)Spark-On-YARN:YARN上的内存计算;

  (3)Storm-On-YARN:YARN上的实时/流式计算;

  (4)Tez-On-YARN:YARN上的DAG计算
参考技术A

可以只用一行代码来运行MapReduce作业:JobClient.runJon(conf),Job作业运行时参与的四个实体:


     1.JobClient 写代码,配置作业,提交作业。

     2.JobTracker:初始化作业,分配作业,协调作业运行。这是一个java程序,主类是JobTracker。

     3.TaskTracker:运行作业划分后的任务,即分配数据分配上执行Map或Reduce任务。

     4.HDFS:保存作业数据、配置信息等,保存作业结果。


Map/Reduce 作业总体执行流程:

     代码编写 ----> 作业配置  ---->  作业提交 ----> Map任务分配和执行 ----> 处理中间结果 ---->  Reduce任务分配与执行 ---->  输出结果

而对于每个作业的执行,又包含:

     输入准备 ----> 任务执行 ----> 输出结果

作业提交JobClient:

     JobClient的runJob方法产生一个Jobclient实例并调用其submitJob方法,然后runJob开始循环吗,并在循环中调用getTaskCompetionEvents方法,获得TaskCompletionEvent实例,每秒轮询作业进度(后面有介绍进度和状态更新),把进度写到控制台,作业完成后显示作业计数器,若失败,则把错误记录到控制台。


     submitJob方法作业提交的过程:

     1.向JobTracker请求一个新的JobId。

     2.检查作业相关路径,如果路径不正确就会返回错误。

     3.计算作业输入分片及其划分信息。

     4.将作业运行需要的资源(jar文件、配置文件等)复制到Shared HDFS,并

复制多个副本(参数控制,默认值为10)供tasktracker访问,也会将计算的分片复制到HDFS。

     5.调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。


作业的初始化JobTracker:

     JobTracker收到submitJob方法调用后,会把调用放入到一个内部队列,由作业调度器(Job scheduler)进行调度并对其初始化。Job初始化即创建一个作业对象。

     当作业被调度后,JobTracker会创建一个代表这个作业的JobInProgress对象,并将任务和记录信息封装在这个对象中,以便跟踪任务状态和进程。

     初始化过程就是JobInProgress对象的initTasks方法进行初始化的。


     初始化步骤:

          1.从HDFS中读取作业对应的job.split信息,为后面的初始化做好准备。

          2.创建并初始化map和reduce任务。根据数据分片信息中的个数确定map task的个数,然后为每个map task生成一个TaskInProgress对象来处理数据分片,先将其放入nonRunningMapCache,以便JobTracker分配任务的时候使用。接下来根据JobConf中的mapred.reduce.tasks属性利用setNumReduceTasks()方法设置reduce task的数量,然后同map task创建方式。

          3.最后就是创建两个初始化task,进行map和reduce的初始化。


任务的分配JobTracker:

    消息传递HeartBeat: tasktracker运行一个简单循环定期发送心跳(heartbeat)给JobTracker。由心跳告知JobTracker自己是否存活,同时作为消息通道传递其它信息(请求新task)。作为心跳的一部分,tasktracker会指明自己是否已准备好运行新的任务,如果是,jobtracker会分配它一个任务。


    分配任务所属于的作业:在Jobtracker分配任务前需先确定任务所在的作业。后面会介绍到各种作业调度算法,默认是一个FIFO的作业调度。


    分配Map和Reduce任务:tasktracker有固定数量的任务槽,一个tasktracker可以同时运行多个Map和Reduce任务,但其准确的数量由tasktracker的核的数量和内存大小决定。默认调度器会先填满Map任务槽,再填Reduce任务槽。jobtracker会选择距离离分片文件最近的tasktracker,最理想情况下,任务是数据本地化(data-local)的,当然也可以是机架本地化(rack-local),如果不是本地化的,那么他们就需要从其他机架上检索数据。Reduce任务分配很简单,jobtracker会简单的从待运行的reduce任务列表中选取下一个来执行,不用考虑数据本地化。


任务的执行TaskTracker:

     TaskTracker收到新任务后,就要在本地运行任务了,运行任务的第一步就是通过localizedJob将任务本地化所需要的注入配置、数据、程序等信息进行本地化。


     1.本地化数据:从共享文件系统将job.split 、job.jar (在分布式缓存中)复制本地,将job配置信息写入job.xml。

     2.新建本地工作目录:tasktracker会加压job.jar文件到本工作目录。

     3.调用launchTaskForJob方法发布任务(其中会新建TaskRunner实例运行任务),如果是Map任务就启用MapTaskRunner,对于Reduce就是ReduceTaskRunner。

   

  在这之后,TaskRunner会启用一个新的JVM来运行每个Map/Reduce任务,防止程序原因而导致tasktracker崩溃,但不同任务间重用JVM还是可以的,后续会讲到任务JVM重用。


     对于单个Map,任务执行的简单流程是:

     1.分配任务执行参数

     2.在Child临时文件中添加map任务信息(Child是运行Map和Reduce任务的主进程)

     3.配置log文件夹,配置map任务的通信和输出参数

     4.读取input split,生成RecordReader读取数据

     5.为Map生成MapRunnable,依次从RecordReader中接收数据,并调用Map函数进行处理。

     6.最后将map函数的输出调用collect收集到MapOutputBuffer(参数控制其大小)中。


Streaming和Pipes:

     Streaming和Pipes都运行特殊的Map和Reduce任务,目的是运行用户提供的可执行程序并与之通信。


     Streaming:使用标准输入输出Streaming与进程进行通信。


     Pipes:用来监听套接字,会发送一个端口号给C++程序,两者便可建立链接。

     

进度和状态更新:

     一个作业和它的任务都有状态(status),其中包括:运行成功失败状态、Map/Reduce进度、作业计数器值、状态消息。


     状态消息与客户端的通信:

     1.对于Map任务Progress的追踪:progress是已经处理完的输入所占的比例。

     2.对于Reduce:稍复杂,reduce任务分三个阶段(每个阶段占1/3),复制、排序和Reduce处理,若reduce已执行一半的输入的话,那么任务进度便是1/3+1/3+1/6=5/6。

     3.任务计数器:任务有一组计数器,负责对任务运行各个事件进行计数。

     4.任务进度报告:如果任务报告了进度,便会设置一个标记以表明状态将被发送到tasktracker。有一个独立线程每隔三秒检查一次此标记,如果已设置,则告知tasktracker当前状态。

     5.tasktracker进度报告:tasktracker会每隔5秒(这个心跳是由集群大小决定,集群越大时间会越长)发送heartbeat到jobtracker,并且tasktracker运行的所有状态都会在调用中被发送到jobtracker。

     6.jobtracker合并各任务报告:产生一个表明所有运行作业机器所含任务状态的全局视图。

     前面提到的JobClient就是通过每秒查询JobTracker来接收最新状态,而且客户端JobClient的getJob方法可以得到一个RunningJob的实例,其包含了作业的所以状态信息。

     

作业的完成:

     当jobtracker收到作业最后一个任务已完成的通知后,便把作业状态设置成成功。JobClient查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob方法返回。


     如果jobtracker有相应设置,也会发送一个Http作业通知给客户端,希望收到回调指令的客户端可以通过job.end.notification.url属性来进行设置。


     jobtracker情况作业的工作状态,指示tasktracker也清空作业的工作状态,如删除中间输出。

     

失败

     实际情况下,用户的代码存在软件错误进程会崩溃,机器也会产生故障,但Hadoop能很好的应对这些故障并完成作业。


     1.任务失败    

     子任务异常:如Map/Reduce任务中的用户代码抛出异常,子任务JVM进程会在退出前向父进程tasktracker发送错误报告,错误被记录用户日志。tasktracker会将此次task attempt标记为tailed,并释放这个任务槽运行另外一个任务。


     子进程JVM突然退出:可能由于JVM bug导致用户代码造成的某些特殊原因导致JVM退出,这种情况下,tasktracker会注意到进程已经退出,并将此次尝试标记为failed。


     任务挂起:一旦tasktracker注意一段时间没有收到进度更新,便会将任务标记为failed,JVM子进程将被自动杀死。任务失败间隔时间通常为10分钟,可以以作业或者集群为基础设置过期时间,参数为mapred.task.timeout。注意:如果参数值设置为0,则挂起的任务永远不会释放掉它的任务槽,随着时间的推移会降低整个集群的效率。


     任务失败尝试次数:jobtracker得知一个tasktracker失败后,它会重新调度该任务执行,当然,jobtracker会尝试避免重新调度失败过的tasktracker任务。如果一个任务尝试次数超过4次,它将不再被重试。这个值是可以设置的,对于Map任务,参数是mapred.map.max.attempts,对于reduce任务,则由mapred.reduce.max.attempts属性控制。如果次数超过限制,整个作业都会失败。当然,有时我们不希望少数几个任务失败就终止运行的整个作业,因为即使有些任务失败,作业的一些结果可能还是有用的,这种情况下,可以为作业设置在不触发作业失败情况下的允许任务失败的最大百分比,Map任务和Reduce任务可以独立控制,参数为mapred.max.map.failures.percent 和mapred.max.reduce.failures.percent。


     任务尝试中止(kill):任务终止和任务失败不同,task attempt可以中止是因为他是一个推测副本或因为它所处的tasktracker失败,导致jobtracker将它上面的所有task attempt标记为killed。被终止的task attempt不会被计入任务运行尝试次数,因为尝试中止并不是任务的错。


     2.tasktracker失败

     tasktracker由于崩溃或者运行过慢而失败,他将停止向jobtracker发送心跳(或很少发送心跳)。jobtracker注意已停止发送心跳的tasktracker(过期时间由参数mapred.tasktracker.expiry.interval设置,单位毫秒),并将它从等待调度的tasktracker池中移除。如果是未完成的作业,jobtracker会安排次tasktracker上已经运行成功的Map任务重新运行,因为此时reduce任务已无法访问(中间输出存放在失败的tasktracker的本地文件系统上)。


     即使tasktracker没有失败,也有可能被jobtracker列入黑名单。如果tasktracker上面的失败任务数量远远高于集群的平均失败任务次数,他就会被列入黑名单,被列入黑名单的tasktracker可以通过重启从jobtracker黑名单中移除。


     3.jobtracker失败

     老版本的JobTracker失败属于单点故障,这种情况下作业注定失败。


作业调度:

     早期作业调度FIFO:按作业提交顺序先进先出。可以设置优先级,通过设置mapred.job.priority属性或者JobClient的setJobPriority()方法制定优先级(优先级别:VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW)。注意FIFO调度算法不支持抢占(preemption),所以高优先级作业仍然会被那些已经开始的长时间运行的低优先级作业所阻塞。


     Fair Scheduler:目标是让每个用户公平地共享集群能力。当集群存在很多作业时,空闲的任务槽会以”让每个用户共享集群“的方式进行分配。默认每个用户都有自己的作业池。FairScheduler支持抢占,所以,如果一个池在特定的一段时间未得到公平地资源共享,它会终止池中得到过多的资源任务,以便把任务槽让给资源不足的池。FairScheduler是一个后续模块,使用它需要将其jar文件放在Hadoop的类路径下。可以通过参数map.red.jobtracker.taskScheduler属性配置(值为org.apache.hadoop.mapred.FairScheduler)


     Capacity Scheduler:

     集群由很多队列组成,每个队列都有一个分配能力,这一点与FairScheduler类似,只不过在每个队列内部,作业根据FIFO方式进行调度。本质上说,Capacity Scheduler允许用户或组织为每个用户模拟一个独立使用FIFO的集群。


shuffle和排序:

     MapReduce确保每个Reducer的输入都是按键排序的。系统执行排序的过程-将map输出作为输入传给reducer的过程称为shuffle。shuffle属于不断被优化和改进的代码库的一部分,从许多方面来看,shuffle是MapReduce的心脏。


     整个shuffle的流程应该是这样:

     map结果划分partition  排序sort 分割spill   合并同一划分   合并同一划分  合并结果排序 reduce处理 输出


     Map端:

     写入缓冲区:Map函数的输出,是由collector处理的,它并不是简单的将结果写到磁盘。它利用缓冲的方式写到内存,并处于效率的考虑进行预排序。每个map都有一个环形的内存缓冲区,用于任务输出,默认缓冲区大小为100MB(由参数io.sort.mb调整),一旦缓冲区内容达到阈值(默认0.8),后台进程边开始把内容写到磁盘(spill),在写磁盘过程中,map输出继续被写到缓冲区,但如果缓冲区被填满,map会阻塞知道写磁盘过程完成。写磁盘将按照轮询方式写到mapred.local.dir属性制定的作业特定子目录中。


     写出缓冲区:collect将缓冲区的内容写出时,会调用sortAndSpill函数,这个函数作用主要是创建spill文件,按照key值对数据进行排序,按照划分将数据写入文件,如果配置了combiner类,会先调用combineAndSpill函数再写文件。sortAndSpill每被调用一次,就会写一个spill文件。


     合并所有Map的spill文件:TaskTracker会在每个map任务结束后对所有map产生的spill文件进行merge,merge规则是根据分区将各个spill文件中数据同一分区中的数据合并在一起,并写入到一个已分区且排序的map输出文件中。待唯一的已分区且已排序的map输出文件写入最后一条记录后,map端的shuffle阶段就结束了。


     在写磁盘前,线程首先根据数据最终要传递到的reducer把数据划分成响应的分区(partition),在每个分区中,后台线程按键进行内排序,如果有一个combiner,它会在排序后的输出上运行。


     内存达到溢出写的阈值时,就会新建一个溢出写文件,因为map任务完成其最后一个输出记录之后,会有几个溢出写文件。在任务完成前,溢出写文件会被合并成一个已分区且已排序的输出文件。配置属性io.sort.facor控制一次最多能合并多少流,默认值是10。


     如果已经指定combiner,并且写次数至少为3(通过min.mum.spills.for.combine设置)时,则combiner就会在输出文件写到磁盘之前运行。运行combiner的意义在于使map输出更紧凑,舍得写到本地磁盘和传给reducer的数据更少。


     写磁盘时压缩:写磁盘时压缩会让写的速度更快,节约磁盘空间,并且减少传给reducer的数据量。默认情况下,输出是不压缩的,但可以通过设置mapred.compress.map.output值为true,就可以启用压缩。使用的压缩库是由mapred.map.output.compression.codec制定。


     reducer获得文件分区的工作线程:reducer通过http方式得到输出文件的分区,用于文件分区的工作线程数量由tracker.http.threads属性指定,此设置针对的是每个tasktracker,而不是每个map任务槽。默认值为40,在大型集群上此值可以根据需要而增加。


     Reduce端:

     复制阶段:reduce会定期向JobTracker获取map的输出位置,一旦拿到输出位置,reduce就会从对应的TaskTracker上复制map输出到本地(如果map输出很小,则会被复制到TaskTracker节点的内存中,否则会被让如磁盘),而不会等到所有map任务结束(当然这个也有参数控制)。


     合并阶段:从各个TaskTracker上复制的map输出文件(无论在磁盘还是内存)进行整合,并维持数据原来的顺序。


     Reduce阶段:从合并的文件中顺序拿出一条数据进行reduce函数处理,然后将结果输出到本地HDFS。


     Map的输出文件位于运行map任务的tasktracker的本地磁盘,现在,tasktracker要为分区文件运行reduce任务。每个任务完成时间可能不同,但是只要有一个任务完成,reduce任务就开始复制其输出,这就是reduce任务的复制阶段(copy phase)。reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,可以通过mapred.reduce.parallel.copies属性设置。


     Reducer如何得知从哪个tasktracker获得map输出:map任务完成后会通知其父tasktracker状态已更新,tasktracker进而通知(通过heart beat)jobtracker。因此,JobTracker就知道map输出和tasktracker之间的映射关系,reducer中的一个线程定期询问jobtracker以便获知map输出位置。由于reducer有可能失败,因此tasktracker并没有在第一个reducer检索到map输出时就立即从磁盘上删除它们,相反他会等待jobtracker告示它可以删除map输出时才删除,这是作业完成后最后执行的。


     如果map输出很小,则会被直接复制到reduce tasktracker的内存缓冲区(大小由mapred.job.shuffle.input.buffer.percent控制,占堆空间的百分比),否则,map输出被复制到磁盘。一旦内存缓冲区达到阈值大小(由mapred.iob.shuffle.merge.percent)

或达到map输出阈值大小(mapred.inmem.threadhold),则合并后溢出写到磁盘中。


     随着磁盘上副本增多,后台线程会将他们合并为更大的、排好序的文件。注意:为了合并,压缩的map输出必须在内存中被解压缩。


     排序阶段:复制阶段完成后,reduce任务会进入排序阶段,更确切的说是合并阶段,这个阶段将合并map输出,维持其顺序排列。合并是循环进行的,由合并因子决定每次合并的输出文件数量。但让有可能会产生中间文件。


     reduce阶段:在最后reduce阶段,会直接把排序好的文件输入reduce函数,不会对中间文件进行再合并,最后的合并即可来自内存,也可来自磁盘。此阶段的输出会直接写到文件系统,一般为hdfs。


     细节:这里合并是并非平均合并,比如有40个文件,合并因子为10,我们并不是每趟合并10个,合并四趟。而是第一趟合并4个,后三趟合并10,在最后一趟中4个已合并的文件和余下6个未合并会直接并入reduce。

Hadoop新MapReduce框架Yarn详解

简介

本文介绍了Hadoop自0.23.0版本后新的MapReduce框架(Yarn)原理,优势,运行机制和配置方法等,着重介绍新的yarn框架相对于原框架的差异及改进,并通过Demo示例详细介绍了在新的Yarn框架下搭建和开发Hadoop程序的方法。读者通过本文中新旧Hadoop MapReduce框架的对比,更深刻理解新的yarn框架技术与那里和设计思想,文中的Demo代码经过微小修改既可用于用户基于Hadoop新框架的实际生产环境。

 

Hadoop MapReduceV2(Yarn)框架简介

原Hadoop MapReduce框架的问题

对于业界的大数据存储及分布式处理系统来说,Hadoop是耳熟能详的卓越开源分布式文件系统及处理框架,对于Hadoop框架的介绍在此就不再赘述。使用和学习过老Hadoop框架(0.20.0及之前版本)的同仁应该很熟悉如下的MapReduce框架图:

技术分享

从上图中可以清楚的看出原MapReduce程序的流程及设计思路:

1.首先用户程序(JobClient)提交了一个Job,Job的信息会发送到JobTracker中,JobTracker是MapReduce框架的中心,它需要与集群中的机器定时通信(通过心跳机制),需要管理哪些程序应该跑在哪些机器上,需要管理所有Job失败、重启等操作。

2.TaskTracker是MapReduce集群中每台机器都有的一个部分,它做的事情主要是监视自己所在机器的资源情况。

3.TaskTracker同时监视当前机器的tasks运行状况。TaskTracker需要把这些信息通过Heartbeat发送给JobTracker,JobTracker会搜集这些信息以给新提交的Job分配运行在哪些机器上。上图虚线箭头就是表示消息的发送-接收过程。

可以看出原来的MapReduce框架是简单明了的,在最初推出的几年,也得到了众多的成功案例,获得业界广泛的支持和肯定,但随着分布式系统集群的规模和其工作负荷的增长,原框架的问题逐渐浮出水面,主要的问题集中如下:

1.JobTracker是MapReduce的集中处理点,存在单点故障。

2.JobTracker完成了太多的任务,造成了过多的资源消耗,当MapReduce Job非常多的时候,会造成很大的内存开销,潜在来说,也增加了JobTracker fail的风险,业界普遍总结出老Hadoop的MapReduce只能支持4000节点主机的上线。

3.在TaskTracker端,以MapReduce task的数目作为资源的表示过于简单,没有考虑到CPU内存的占用情况,如果两个大内存消耗的task被调度到了一块,很容易出现OOM。

4.在TaskTracker端,把资源强制划分为Map task slot和reduce task slot,如果当系统中只有map task或者只有reduce task的时候,会造成资源的浪费,也就是前面提到过的集群资源利用的问题。

5.源代码层面分析的时候,会发现代码非常的难读,常常因为一个class做了太多的事情,代码量达到了3000多行,造成class的任务不清晰,增加bug修复和版本维护的难度。

6.从操作的角度来看,现在Hadoop MapReduce框架在任何重要或者不重要的变化(例如bug修复,性能提升和特性化)时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是使用新的Hadoop版本而浪费大量时间。

 

新Hadoop Yarn框架原理及运行机制

从业界使用分布式系统的变化趋势和Hadoop框架长远发展来看,MapReduce的JobTracker和TaskTracker机制需要大规模的调整来修复它在可扩展性,内存消耗,线程模型,可靠性和性能上的缺陷。在过去的几年中,Hadoop开发团队做了一些bug的修复,但是最近这些修复的成本越来越高,这表明对原来框架做出改变的难度越来越大。

为从根本上解决旧MapReduce框架的性能瓶颈,促进Hadoop框架更长远发展,从0.23.0版本开始,Hadoop的MapReduce框架完全重构,发生了根本的变化。新的Hadoop MapReduce框架命名为MapReduceV2或者叫Yarn,其架构图如下所示:

技术分享

重构的根本思想是将JobTracker两个主要的功能分离成单独的组件,这两个功能是资源管理和任务调度/监控。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的ApplicationMaster负责调度和协调。一个应用程序无非是一个单独的传统的MapReduce任务或者是一个DAG(有向无环图)任务。ResourceManager和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。

事实上,每一个应用的ApplicationMaster是一个详细的框架库,它结合从ResourceManager获得的资源和NodeManager协同工作来运行监控任务。

上图中ResourceManager支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲他就是一个纯粹的调度器,它在执行过程中不对应用进行监控和状态跟踪。同样,它也不能重启因应用失败或者硬件错误而运行失败的任务。

ResourceManager是基于应用程序对资源的需求进行调度的,每一个应用程序需要不同类型的资源因此就需要不同的容器PU,。资源包括:内存,C磁盘,网络等等。可以看出,这同现MapReduce固定类型的资源使用模型有显著的区别,它给集群的使用带来了负面的影响。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。调度插件可以基于现有的能力调度和公平调度模型。

上图中NodeManager是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源使用情况(CPU,内存,硬盘,网络)并且向调度器会报。

每一个应用的ApplicationMaster的职责有:想调度器所要适当的资源容器,运行任务,跟踪应用程序的状态和监控他们的进程,处理任务的失败原因。

新旧Hadoop MapReduce框架对比

让我们来对新旧MapReduce框架做详细的分析和对比,可以看到有一下几点显著变化:

首先客户端不变,其调用API及接口大部分兼容,这也是为了对开发使用者透明化,使其不必对原有代码做大的改变,但是原框架中核心的JobTracker和TaskTracker不见了,取而代之的是ResourceManager,ApplicationMaster和NodeManager三个部分。

我们来详细解释这三个部分,首先ResourceManager是一个中心的服务,它做的事情是调度、启动每一个Job所属的ApplicationMaster、另外监控ApplicationMaster的存在情况。细心的读者会发现:Job里面所在的task的监控、重启等等内容不见了。这就是AppMst存在的原因。ResourceManager负责作业与资源的调度。接收JobSubmitter提交的作业,按照作业的上下文(Context)信息,以及从NodeManager收集来的状态信息,启动调度过程,分配一个Container作为AppMstr

NodeManager功能比较专一,就是负责Container状态的维护,并向RM保持心跳。

ApplicationMaster负责一个Job声明周期的所有工作,类似老的框架中JobTracker。但注意每一个Job(不是每一种)都有一个ApplicationMaster,它可以运行在ResourceManager以外的机器上。

Yarn框架相对于老的MapReduce框架有什么优势呢?我们可以看到:

1.这个设计大大减小了JobTracker(也就是现在的ResourceManager)的资源消耗,并且让监测每一个Job子任务(task)状态的程序分布式化了,更安全、更优美。

2.在新的Yarn中,ApplicationMaster是一个可变更的部分,用户可以对不同的编程模型写自己的AppMst,让更多类型的编程模型能够跑在Hadoop集群中。

3.对于资源的表示以内存为单位,比之前以剩余slot数目更合理。

4.老的框架中,JobTracker一个很大的负担就是监控Job下的task的运行状况,现在,这个部分就扔个ApplicationMaster做了,而ResourceManager中有一个模块叫做ApplicationMasters(注意不是ApplicationMaster),它是监测ApplicationMaster的运行状况,如果出问题,会将其在其他机器上重启。

5.Container是Yarn为了将来做资源隔离而提出的一个框架。这一点应该借鉴了Mesos的工作,目前是一个框架,仅仅提供java虚拟机内存的隔离,Hadoop团队的设计思路应该后续能支持更多的资源调度和控制,既然资源表示内存量,那就没有了之前的Map slot和reduce slot分开造成集群资源闲置的尴尬情况。

新的Yarn框架相对于旧MapReduce框架而言,其配置文件,启停脚本及全局变量等也发生了一些变化,主要的的改变如下:

 

 

改变项原框架中新框架中(Yarn)备注
配置文件位置 ${hadoop_home_dir}/conf ${hadoop_home_dir}/etc/hadoop/ Yarn 框架也兼容老的 ${hadoop_home_dir}/conf 位置配置,启动时会检测是否存在老的 conf 目录,如果存在将加载 conf 目录下的配置,否则加载 etc 下配置
启停脚本 ${hadoop_home_dir}/bin/start(stop)-all.sh ${hadoop_home_dir}/sbin/start(stop)-dfs.sh
${hadoop_home_dir}/bin/start(stop)-all.sh
新的 Yarn 框架中启动分布式文件系统和启动 Yarn 分离,启动 / 停止分布式文件系统的命令位于 ${hadoop_home_dir}/sbin 目录下,启动 / 停止 Yarn 框架位于 ${hadoop_home_dir}/bin/ 目录下
JAVA_HOME 全局变量 ${hadoop_home_dir}/bin/start-all.sh 中 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh
${hadoop_home_dir}/etc/hadoop/Yarn-env.sh
Yarn 框架中由于启动 hdfs 分布式文件系统和启动 MapReduce 框架分离,JAVA_HOME 需要在 hadoop-env.sh 和 Yarn-env.sh 中分别配置
HADOOP_LOG_DIR 全局变量 不需要配置 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh 老框架在 LOG,conf,tmp 目录等均默认为脚本启动的当前目录下的 log,conf,tmp 子目录 
Yarn 新框架中 Log 默认创建在 Hadoop 用户的 home 目录下的 log 子目录,因此最好在 ${hadoop_home_dir}/etc/hadoop/hadoop-env.sh 配置 HADOOP_LOG_DIR,否则有可能会因为你启动 hadoop 的用户的 .bashrc 或者 .bash_profile 中指定了其他的 PATH 变量而造成日志位置混乱,而该位置没有访问权限的话启动过程中会报错

 

 

由于新的Yarn框架和原Hadoop MapReduce框架相比变化比较大,核心的配置文件中很多项在新框架中已经废弃,详细请参考:新老配置属性的对比

 

Hadoop Yarn框架Demo示例

Demo场景介绍:Weblogic应用服务器日志分析

了解了Hadoop新的Yarn框架的架构和思路后,我们用一个Demo示例来检验新Yarn框架下MapReduce程序的开发部署。

我们考虑如下应用场景:用户在生产系统由多台Weblogic应用服务器组成,每天需要对每台应用服务器的日志内容进行检查,统计日志级别和日志模块的总数。

WebLogic的日志范例如下图所示:

技术分享

如上图所,<Info>为weblogic的日志级别,<Security>,<Management>为Weblogic的日志模块,我们主要分析loglevel和logmodule这两个温度分别在Weblogic日志中出现的次数,每天需要统计出loglevel和logmodule分别出现的次数总数。

Demo 测试环境 Yarn 框架搭建
由于 Weblogic 应用服务器分布于不同的主机,且日志数据量巨大,我们采用 hadoop 框架将 WebLogic 各个应用服务器主机上建立分布式目录,每天将 WebLogic 日志装载进 hadoop 分布式文件系统,并且编写基于 Yarn 框架的 MapReduce 程序对日志进行处理,分别统计出 LogLevel 和 Logmodule 在日志中出现的次数并计算总量,然后输出到分布式文件系统中,输出目录命名精确到小时为后缀以便区分每次 Demo 程序运行的处理结果。
我们搭建一个 Demo 测试环境以验证 Yarn 框架下分布式程序处理该案例的功能,以两台虚拟机作为该 Demo 的运行平台,两机均为 Linux 操作系统,机器 hostname 为 OEL 和 Stephen,OEL 作为 NameNode 和 ResouceManager 节点主机,64 位,Stephen 作为 DataNode 和 NodeManager 节点主机,32 位(Hadoop 支持异构性), 具体如下:

Demo测试环境表

 

主机名角色备注
OEL(192.168.137.8) NameNode 节点主机 
ResourceManager 主机
linux 操作系统 
32bit
Stephen(192.168.l37.2) DataNode 节点主机 
NodeManager 主机
linux 操作系统 
64bit

 

 

我们把 hadoop 安装在两台测试机的 /hadoop 文件系统目录下,安装后的 hadoop 根目录为:/hadoop/hadoop-0.23.0,规划分布式文件系统存放于 /hadoop/dfs 的本地目录,对应分布式系统中的目录为 /user/oracle/dfs
我们根据 Yarn 框架要求,分别在 core-site.xml 中配置分布式文件系统的 URL,详细如下:

core-site.xml配置:

 

[html] view plain copy
 
  1.               
  2. <configuration>   
  3.   <property>   
  4.   <name>fs.defaultFS</name>   
  5.   <value>hdfs://192.168.137.8:9100</value>   
  6.   </property>   
  7. </configuration>   


在hdfs-site.xml中配置NameNode,DataNode的本地目录信息,详细如下:

 

hdfs-site.xml配置:

 

[html] view plain copy
 
  1. <configuration>   
  2.  <property>   
  3.   <name>dfs.namenode.name.dir</name>   
  4.   <value>/hadoop/dfs/name</value>   
  5.   <description>  </description>   
  6.  </property>   
  7.   
  8.  <property>   
  9.   <name>dfs.datanode.data.dir</name>   
  10.   <value>/hadoop/dfs/data</value>   
  11.   <description</description>   
  12.  </property>   
  13.   
  14.  <property>   
  15.    <name>dfs.replication</name>   
  16.    <value>2</value>   
  17.  </property>   
  18.   
  19.  </configuration>   


在mapred-site.xml中配置其使用yarn框架处理程序,详细如下:

 

mapred-site.xml配置:

 

[html] view plain copy
 
  1.               
  2. <configuration>   
  3.  <property>   
  4.  <name>mapreduce.framework.name</name>   
  5.  <value>Yarn</value>   
  6.  </property>   
  7. </configuration>   


最后在Yarn-site.xml中配置ResourceManager,NodeManager的通信端口,Web监控端口等,详细如下:

 

yarn-site.xml配置:

 

[html] view plain copy
 
  1.               
  2. <?xml version="1.0"?>   
  3. <configuration>   
  4.   
  5. <!-- Site specific YARN configuration properties -->   
  6.  <property>   
  7.  <name>Yarn.nodemanager.aux-services</name>   
  8.  <value>mapreduce.shuffle</value>   
  9.  </property>   
  10.  <property>   
  11.  <description>The address of the applications manager interface in the RM.</description>   
  12.  <name>Yarn.resourcemanager.address</name>   
  13.  <value>192.168.137.8:18040</value>   
  14.  </property>   
  15.   
  16.  <property>   
  17.  <description>The address of the scheduler interface.</description>   
  18.  <name>Yarn.resourcemanager.scheduler.address</name>   
  19.  <value>192.168.137.8:18030</value>   
  20.  </property>   
  21.   
  22.  <property>   
  23.  <description>The address of the RM web application.</description>   
  24.  <name>Yarn.resourcemanager.webapp.address</name>   
  25.  <value>192.168.137.8:18088</value>   
  26.  </property>   
  27.    
  28.  <property>   
  29.  <description>The address of the resource tracker interface.</description>   
  30.  <name>Yarn.resourcemanager.resource-tracker.address</name>   
  31.  <value>192.168.137.8:8025</value>   
  32.  </property>   
  33. </configuration>   


Demo代码开发及详解

 

以下我们详细介绍一下新的yarn框架下针对该应用场景的Demo代码的开发,在Demo程序的每个类都有详细的注释说明,yarn开发为了兼容老版本的,API变化不大。

在Map程序中,我们以行号为key,行文本内容为value读取每一行Weblogic日志输入,将loglevel和logmodule的值读出作为Map处理后新的Key值,由于一行中loglevel和logmodule的出现次数应该唯一,所以经Map程序处理后的新的Record记录的value应该都为1。

Map业务逻辑

 

[html] view plain copy
 
  1. public static class MapClass extends Mapper<Object, Text, Text, IntWritable>   
  2.   {   
  3.   private Text record = new Text();   
  4.   private static final IntWritable recbytes = new IntWritable(1);   
  5.   public void map(Object key, Text value,Context context)   
  6.     throws IOException,InterruptedException {    
  7.   String line = value.toString();   
  8.  // 没有配置 RecordReader,所以默认采用 line 的实现,  
  9.  //key 就是行号,value 就是行内容,  
  10.  // 按行 key-value 存放每行 loglevel 和 logmodule 内容  
  11.   if (line == null || line.equals(""))   
  12.     return;   
  13.   String[] words = line.split("<");   
  14.   if (words == null || words.length 2)   
  15.     return;   
  16.   String logLevel = words[1];   
  17.   String moduleName = words[2];   
  18.     
  19.   record.clear();   
  20.   record.set(new StringBuffer("logLevel::").append(logLevel).toString());   
  21.   context.write(record, recbytes);   
  22.   // 输出日志级别统计结果,通过 logLevel:: 作为前缀来标示。  
  23.     
  24.   record.clear();   
  25.    record.set(new StringBuffer("moduleName::").append(moduleName).toString());   
  26.   context.write(record, recbytes);   
  27.    // 输出模块名的统计结果,通过 moduleName:: 作为前缀来标示  
  28.   }    
  29.   }   

由于有loglevel和logmodule两部分的分析工作,我们设定两个Reduce来分别处理这两部分,loglevel交给reduce1,logmodule交给reduce2.因此我们编写Patitioner类,根据Map传过来的Key中包含的loglevel和moduleName的前缀,来分到不同的reduce。

 

Patition业务逻辑:

 

[html] view plain copy
 
  1. public static class PartitionerClass extends Partitioner<Text, IntWritable>  
  2. {  
  3. public int getPartition(Text key, IntWritable value, int numPartitions)  
  4. {  
  5. if (numPartitions >= 2)//Reduce 个数,判断 loglevel 还是 logmodule 的统计,分配到不同的 Reduce  
  6. if (key.toString().startsWith("logLevel::"))  
  7. return 0;  
  8. else if(key.toString().startsWith("moduleName::"))  
  9. return 1;  
  10. else return 0;  
  11. else  
  12. return 0;  
  13. }  
  14.   
  15. }  

 

 

在Reduce程序中,累加并合并loglevel和logmodule的出现次数。

Reduce业务逻辑:

 

[html] view plain copy
 
  1. public static class ReduceClass extends  Reducer<Text, IntWritable,Text, IntWritable>   
  2.         {   
  3.             private IntWritable result = new IntWritable();   
  4.             public void reduce(Text key, Iterable<IntWritable> values,   
  5.                     Context context)throws IOException,   
  6.                                                      InterruptedException {   
  7.                   
  8.                 int tmp = 0;   
  9.                 for (IntWritable val : values) {   
  10.                     tmp = tmp + val.get();                                       
  11.                 }   
  12.                 result.set(tmp);   
  13.                 context.write(key, result);// 输出最后的汇总结果  
  14.             }      
  15.         }   

以上完成了MapReduce的主要处理逻辑,对于程序入口,我们使用Hadoop提供的Tools工具包方便的进行MapReduce程序的启动和MapReduce对应处理class的配置。

 

Main执行类:

 

[html] view plain copy
 
  1.               
  2. import java.io.File;   
  3. import java.io.IOException;   
  4. import java.text.SimpleDateFormat;   
  5. import java.util.Date;   
  6. import java.util.Iterator;   
  7. import org.apache.hadoop.conf.Configuration;   
  8. import org.apache.hadoop.conf.Configured;   
  9. import org.apache.hadoop.fs.Path;   
  10. import org.apache.hadoop.io.IntWritable;   
  11. import org.apache.hadoop.io.Text;   
  12. import org.apache.hadoop.mapreduce.Job;   
  13. import org.apache.hadoop.mapreduce.Reducer;   
  14. import org.apache.hadoop.mapreduce.Mapper;   
  15. import org.apache.hadoop.mapreduce.Partitioner;   
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   
  18. import org.apache.hadoop.util.Tool;   
  19. import org.apache.hadoop.util.ToolRunner;   
  20. public class LogAnalysiser extends Configured implements Tool {   
  21.   public static void main(String[] args)   
  22.  {   
  23.    try   
  24.  {   
  25.  int res;   
  26.  res = ToolRunner.run(new Configuration(),new LogAnalysiser(), args);   
  27.  System.exit(res);   
  28.  } catch (Exception e)   
  29.  {   
  30.  e.printStackTrace();   
  31.  }   
  32.  }   
  33.  public int run(String[] args) throws Exception   
  34.  {   
  35.  if (args == null || args.length <2)   
  36.  {   
  37.  System.out.println("need inputpath and outputpath");   
  38.  return 1;   
  39.  }   
  40.  String inputpath = args[0];   
  41.  String outputpath = args[1];   
  42.  String shortin = args[0];   
  43.  String shortout = args[1];   
  44.  if (shortin.indexOf(File.separator) >= 0)   
  45.  shortin = shortin.substring(shortin.lastIndexOf(File.separator));   
  46.  if (shortout.indexOf(File.separator) >= 0)   
  47.  shortout = shortout.substring(shortout.lastIndexOf(File.separator));   
  48.  SimpleDateFormat formater = new SimpleDateFormat("yyyy.MM.dd.HH.mm");   
  49.  shortout = new StringBuffer(shortout).append("-")   
  50.  .append(formater.format(new Date())).toString();   
  51.    
  52.    
  53.  if (!shortin.startsWith("/"))   
  54.  shortin = "/" + shortin;   
  55.  if (!shortout.startsWith("/"))   
  56.  shortout = "/" + shortout;   
  57.  shortin = "/user/oracle/dfs/" + shortin;   
  58.  shortout = "/user/oracle/dfs/" + shortout;     
  59.  File inputdir = new File(inputpath);   
  60.  File outputdir = new File(outputpath);   
  61.    
  62.  if (!inputdir.exists() || !inputdir.isDirectory())   
  63.  {   
  64.  System.out.println("inputpath not exist or isn‘t dir!");   
  65.  return 0;   
  66.  }   
  67.  if (!outputdir.exists())   
  68.  {   
  69.  new File(outputpath).mkdirs();   
  70.  }   
  71. // 以下注释的是 hadoop 0.20.X 老版本的 Job 代码,在 hadoop0.23.X 新框架中已经大大简化  
  72. //   Configuration conf = getConf();   
  73. //   JobConf job = new JobConf(conf, LogAnalysiser.class);     
  74. //    JobConf conf = new JobConf(getConf(),LogAnalysiser.class);// 构建 Config   
  75. //    conf.setJarByClass(MapClass.class);   
  76. //    conf.setJarByClass(ReduceClass.class);   
  77. //    conf.setJarByClass(PartitionerClass.class);   
  78. //    conf.setJar("hadoopTest.jar");   
  79. //    job.setJar("hadoopTest.jar");   
  80.   
  81. // 以下是新的 hadoop 0.23.X Yarn 的 Job 代码  
  82.   
  83. job job = new Job(new Configuration());   
  84.    job.setJarByClass(LogAnalysiser.class);   
  85.    job.setJobName("analysisjob");   
  86.    job.setOutputKeyClass(Text.class);// 输出的 key 类型,在 OutputFormat 会检查  
  87.    job.setOutputValueClass(IntWritable.class); // 输出的 value 类型,在 OutputFormat 会检查  
  88.    job.setJarByClass(LogAnalysiser.class);   
  89.    job.setMapperClass(MapClass.class);   
  90.    job.setCombinerClass(ReduceClass.class);   
  91.    job.setReducerClass(ReduceClass.class);   
  92.    job.setPartitionerClass(PartitionerClass.class);   
  93.    job.setNumReduceTasks(2);// 强制需要有两个 Reduce 来分别处理流量和次数的统计  
  94.    FileInputFormat.setInputPaths(job, new Path(shortin));//hdfs 中的输入路径  
  95.    FileOutputFormat.setOutputPath(job,new Path(shortout));//hdfs 中输出路径  
  96.      
  97.    Date startTime = new Date();   
  98.    System.out.println("Job started: " + startTime);   
  99.    job.waitForCompletion(true);      
  100.    Date end_time = new Date();   
  101.    System.out.println("Job ended: " + end_time);   
  102.    System.out.println("The job took " +   
  103.    (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");  
  104.    // 删除输入和输出的临时文件  
  105. //    fileSys.copyToLocalFile(new Path(shortout),new Path(outputpath));   
  106. //    fileSys.delete(new Path(shortin),true);   
  107. //    fileSys.delete(new Path(shortout),true);   
  108.    return 0;   
  109.  }   
  110. }   


执行程序!

以上是关于Hadoop,MapReduce,YARN和Spark的区别与联系的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop,MapReduce,YARN和Spark的区别与联系

Hadoop 新 MapReduce 框架 Yarn 详解

大数据Hadoop|MapRedece|Yarn

hadoop之MapReduce---Yarn资源调度器

大数据之Hadoop(MapReduce):Yarn基本架构

一篇长文梳理Hadoop YARN和MapReduce优化