Spark应用开发之一:Hadoop分析大数据
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark应用开发之一:Hadoop分析大数据相关的知识,希望对你有一定的参考价值。
要学会和使用一门技术的时候,首先要弄清楚该技术出现的背景和要解决的问题。要说spark首先要了解海量数据的处理和Hadoop技术。
一个系统在运行的过程中都会产生许多的日志数据,这些日志数据包含但不局限我们平常开发中使用log4j或者logback生成的记录系统运行情况的日志。例如对于网络服务提供商,他们的设备可能会记录着用户上下线时间,访问的网页地址,响应时长等数据,这些数据文件里面记录的某些信息经过抽取分析后可以得出许多的指标信息,从而为改善网络结构和提高服务等提供数据依据。但这些数据会很大,使用一般的技术和方案将难以达到分析的目的,于是一种全新的处理海量数据的计算模型和框架的出现变得迫在眉睫。
处理海量数据要解决的第一个问题便是存储。我们需要将收集来的日志文件存放在某个地方便于后面的数据分析。可是一台机器的容量始终十分的有限,随着数据的增涨我们也不可能无限的扩展一台服务器的存储能力,所以我们需要将收集的数据存放在许多的机器上,并通过某种方案进行统一管理。
处理海量数据要解决的第二个问题便是计算。一台服务器的计算能力是有限的,它会直接受限于CPU和内存。随着数据量的增大,我们也不能无限的扩展他们,所以同数据存储一样,我们也需要利用多台机器的计算能力来一起完成运算的工作。每个计算机都是一个独立的个体,他们之间运行的代码本身是无关联关系的,我们也需要某种方案来协调各个计算机的执行,让其成为逻辑上一台超级超级的计算机。
基于GSF(Google的文件系统)的思想也开发了一个Hadoop使用的分布式文件系统HDFS。HDFS是基于计算机本地文件系统的分布式文件系统,也就是说HDFS将文件直接存放于计算机的本地文件系统之上(当然我们是无法直接查看文件里面的内容的)。
HDFS解决了上面提到的数据的存储问题。一般情况每个计算机上只会有一个管理本地数据的DataNode进程(该计算机称为DataNode节点),DataNode进程与主控节点上的NameNode进程通信(该节点称为NameNode节点),以完成数据块状态的报告和发送心跳信号等。NameNode是一个中心服务器,负责管理维护文件系统的名字空间(namespace)以及客户端对文件的访问。
注:名字空间(Namespace)即文件系统文件目录的组织方式,是文件系统的重要组成部分,为用户提供可视化的、可理解的文件系统视图,从而解决或降低人类与计算机之间在数据存储上的语义间隔。目前树状结构的文件系统组织方式与现实世界的组织结构最为相似,被人们所广泛接受。因此绝大多数的文件系统皆以Tree方式来组织文件目录,包括各种磁盘文件系统(EXTx, XFS, JFS, Reiserfs, ZFS, Btrfs, NTFS, FAT32等)、网络文件系统(NFS,AFS, CIFS/SMB等)、集群文件系统(Lustre, PNFS, PVFS, GPFS, PanFS等)、分布式文件系统(GoogleFS,HDFS, MFS, KFS, TaobaoFS, FastDFS等)。
接着我们说说用于大数据批处理分析的并行计算框Map/Reduce。该框架把数据的处理分为两个独立的Map和Reduce阶段,并分别对应两个方法map和reduce:
/* @key 由于框架需要序列化key和根据key来排序,所有该key类型必须实现WritableComparable接口 @value 这就是具体的某行数据,获取前面个map传递过来的value,由于需要序列化所以需要实现Writable接口 @out 将映射后的键值对数据的接口,调用该接口的collect(WritableComparable,Writable)方法分别传入key和value即可 @reporter应用程序可以使用Reporter报告进度,设定应用级别的状态消息,更新Counters(计数器),或者仅是表明自己运行正常 */ map(WritableComparable key, Writable value, OutputCollector out , Reporter reporter)
/* @key 上个阶段(map)输出的key @values 上个阶段已经排序好的输出(因为不同 mapper 的输出中可能会有相同的 key) */ reduce(WritableComparable key, Iterator values, OutputCollector out, Reporter report) |
在Map阶段,Hadoop框架首先从HDFS上指定路径下获取要处理的文件,然后对该文件进行分片处理(InputSplit),然后每个分片使用一个Map task来处理。Hadoop框架在调用map方法前,会使用InputFormat类型的对象来处理数据的分片操作,针对每个分片(InputSplit)会创建一个RecordReader类型的对象来读取每个分片的内容从而调用map方法来处理数据。
InputFormat类型将文件从逻辑上切分为片,每个片记录了数据的偏移量和大小等信息,但分片操作会把本是一行的数据切分到两个甚至多个片中,这么一来后面处理的数据就是错误的。这是RecordReader需要解决的问题,以LineRecordReader为例,如果某个分片是文件的第一个分片,那么从第一个字节开始读取,否则从分片的第二行开始读取;如果某个分片是文件的最后一个分片,那么读完本分片的数据即可,否则获取下一个分片的第一行数据结束。这么一来,对于以行分割的数据就可以保证每次读取的行都是完整的。
以LineRecordReader为例,LineRecordReader读取分片中的每行数据,然后以键值序列对(key-value)的形式来调用map函数,此时的key为该行数据的偏移量,value为该行数据。
Hadoop框架将每次执行map函数的返回值先放入一个缓冲区,当缓存区的使用量达到指定的阈值后,开使用一个线程来将这部分数据溢出到一个临时文件当中。在溢出前会对这些要溢出的数据先做几个操作:
1, 通过partitioner操作根据key来进行分区,确定某个数据归属于哪个reducer来处理
2, 对数据根据key来排序
3, 根据key对数据进行合并(用户根据需要指定)
以上步骤完成后将数据溢出到一个临时文件。当处理完某个分片后,可能会生成许多个这样的溢出文件,接着需要对溢出文件进行合并生成一个完整的文件(该完整指的是该分片要处理的那部分数据)。在合并的时候也需要对数据进行排序和合并操作,由于文件可能很大,不能一次载入到内存进行排序操作,所以这里用到了外排序。但最终生成的文件里面的数据是经过分区分组,排序后的。
到此Map阶段结束,接着要进入的是Reduce阶段。在真正调用reduce方法之前,有一个shuffle阶段需要预处理。在每个map task结束后,Reduce task都会得到通知,并将自己要处理的数据的位置信息保存到mapLocations中,然后对数据经过过滤去重后保存在scheduledCopies中,接着由几个线程并行的拷贝数据,并进行排序合并操作。
最后就是通过调用reduce方法来处理合并的数据,并将结果输出到HDFS即可。
本文出自 “因简单而自在” 博客,请务必保留此出处http://dengshuangfu.blog.51cto.com/8794650/1917779
以上是关于Spark应用开发之一:Hadoop分析大数据的主要内容,如果未能解决你的问题,请参考以下文章
金融需要 hadoop,spark 等这些大数据分析工具吗?使用场景是怎样的
Spark on Kubernetes 与 Spark on Yarn 不完全对比分析