Hadoop3 - MapReduce 介绍于基本使用

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop3 - MapReduce 介绍于基本使用相关的知识,希望对你有一定的参考价值。

一、MapReduce

Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠,容错的方式并行处理大型硬件集群(数千个节点)上的大量数据(多TB数据集。它是一种面向海量数据处理的一种指导思想,也是一种用于对大规模数据进行分布式计算的编程模型。

MapReduce最早由Google2004年在一篇名为《MapReduce:Simplified Data Processingon Large Clusters》的论文中提出,把分布式数据处理的过程拆分为MapReduce两个操作函数(受到Lisp以及其他函数式编程语言的启发),随后被Apache Hadoop参考并作为开源版本提供支持。它的出现解决了人们在最初面临海量数据束手无策的问题,同时,它还是易于使用和高度可扩展的,使得开发者无需关系分布式系统底层的复杂性即可很容易的编写分布式数据处理程序,并在成千上万台普通的商用服务器中运行。

MapReduce思想在生活中处处可见,思想核心是先分再合,分而治之, 所谓 分而治之 就是把一个复杂的问题,按照一定的 分解 方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,把各部分的结果组成整个问题的结果。

Map负责 ,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。

Reduce负责 ,即对map阶段的结果进行全局汇总。

MapReduce的特点:

  1. 易于编程
    Mapreduce框架提供了用于二次开发得接口;简单地实现一些接口,就可以完成一个分布式程序。任务计算交给计算框架去处理,将分布式程序部署到hadoop集群上运行,集群节点可以扩展到成百上千个等。

  2. 良好的扩展性
    当计算机资源不能得到满足的时候,可以通过增加机器来扩展它的计算能力。基于MapReduce的分布式计算得特点可以随节点数目增长保持近似于线性的增长,这个特点是MapReduce处理海量数据的关键,通过将计算节点增至几百或者几千可以很容易地处理数百TB甚至PB级别的离线数据。

  3. 高容错性
    Hadoop集群是分布式搭建和部署得,任何单一机器节点宕机了,它可以把上面的计算任务转移到另一个节点上运行,不影响整个作业任务得完成,过程完全是由Hadoop内部完成的。

  4. 适合海量数据的离线处理
    可以处理GBTBPB级别得数据量。

局限性:

MapReduce虽然有很多的优势,也有相对得局限性,不代表不能做,而是在有些场景下实现的效果比较差,并不适合用MapReduce来处理:

  1. 实时计算性能差
    MapReduce主要应用于离线作业,无法作到秒级或者是亚秒级得数据响应。

  2. 不能进行流式计算
    流式计算特点是数据是源源不断得计算,并且数据是动态的;而MapReduce作为一个离线计算框架,主要是针对静态数据集得,数据是不能动态变化得。

MapReduce架构体系:

一个完整的mapreduce程序在分布式运行时有三类实例进程:

  1. MRAppMaster:负责整个程序的过程调度及状态协调
  2. MapTask:负责map阶段的整个数据处理流程
  3. ReduceTask:负责reduce阶段的整个数据处理流程

二、MapReduce 编程

MapReduce分布式的运算程序需要分成2个阶段,分别是Map阶段和Reduce阶段。Map阶段对应的是MapTask并发实例,完全并行运行。Reduce阶段对应的是ReduceTask并发实例,数据依赖于上一个阶段所有MapTask并发实例的数据输出结果。

用户编写的程序分成三个部分:MapperReducerDriver(提交运行mr程序的客户端驱动),自定义的MapperReducer都要继承各自的父类。Mapper中的业务逻辑写在map()方法中,Reducer的业务逻辑写在reduce()方法中。整个程序需要一个Driver来进行提交,提交的是一个描述了各种必要信息的job对象。

最需要注意的是:整个MapReduce程序中,数据都是以kv键值对的形式流转的。因此在实际编程解决各种业务问题中,需要考虑每个阶段的输入输出kv分别是什么。并且在MapReduce中数据会因为某些默认的机制进行排序进行分组。所以说kv的类型数据确定及其重要。

Hadoop的序列化:

Hadoop的序列化没有采用java的序列化机制,而是实现了自己的序列化机制。原因在于java的序列化机制比较臃肿,重量级,是不断的创建对象的机制,并且会额外附带很多信息(校验、继承关系系统等)。但在Hadoop的序列化机制中,用户可以复用对象,这样就减少了java对象的分配和回收,提高了应用效率。

Hadoop的序列化通过实现Writable接口进行序列化,不过没有提供比较功能,可以和java中的Comparable接口合并,并且Hadoop中也提供一个接口 WritableComparable 实现该功能。

由于 Hadoop 使用了自己的序列化机制,因此也提供了常用的数据类型,都实现了 WritableComparable 接口,以便序列化进行网络传输和文件存储,类型如下:

Hadoop 数据类型对照Java数据类型
BooleanWritableboolean
ByteWritablebyte
IntWritableint
FloatWritablefloat
LongWritablelong
DoubleWritabledouble
TextString
MapWritablemap
ArrayWritablearray
NullWritablenull

注意:如果需要将自定义的类放在key中传输,则需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序。

三、实施操作 (WordCount)

WordCount 单词统计、词频统计,指的是使用程序统计某文本文件中,每个单词出现的总次数。这个是大数据计算领域经典的入门案例,首先准备一个 txt 文件,文件内容如下所示:

hello map reduce abc
apple spark map
reduce abc hello
spark map

经过计算可以得到类似下面结果:

abc	2
apple	1
hello	2
map	3
reduce	2
spark	2

下面创建一个 Maven 项目,在 pom 中引入依赖:

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-common</artifactId>
   <version>3.1.4</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-hdfs</artifactId>
   <version>3.1.4</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
   <version>3.1.4</version>
</dependency>
<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
   <version>3.1.4</version>
</dependency>

如果是在 windows 环境下运行程序,需要确保电脑上有 Hadoop 环境。

下面编写 Mapper 继承自 HadoopMapper

/**
 * 其中 Mapper 的四个泛型表示
 * KEYIN:    表示map阶段输入kv中的k类型
 * VALUEIN:  表示map阶段输入kv中的v类型
 * KEYOUT:   表示map阶段输出kv中的k类型
 * VALUEOUT: 表示map阶段输出kv中的v类型
 */
@Slf4j
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> 

    private Text outkey = new Text();
    private final static LongWritable outValue = new LongWritable(1);

    /**
     * mapper阶段核心方法 也是具体业务逻辑实现的方法
     * 注意,该方法被调用的次数和输入的kv键值对有关,每当TextInputFormat读取返回一个kv键值对,就调用一次map方法进行业务处理
     * 默认情况下,map方法是基于行来处理数据
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        //拿取一行数据转换String
        String line = value.toString();
        if (StringUtils.isBlank(line)) 
            return;
        
        Arrays.stream(line.split(" ")).filter(StringUtils::isNoneBlank).forEach(word -> 
            outkey.set(word);
            //输出数据 把每个单词标记1  输出的结果<单词,1>
            //使用上下文对象 将数据输出
            try 
                context.write(outkey, outValue);
             catch (Exception e) 
                log.error("Map 阶段输出数据异常:", e);
            
        );
    

编写 Reducer 继承自 HadoopReducer

/**
 * 其中 Reducer 的四个泛型表示
 * KEYIN: 表示的是reduce阶段输入kv中k的类型
 * VALUEIN:表示的是reduce阶段输入kv中v的类型
 * KEYOUT: 表示的是reduce阶段输出kv中k的类型
 * VALUEOUT:表示的是reduce阶段输出kv中v的类型
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> 

    private LongWritable outValue =new LongWritable();

    /**
     * reduce阶段核心方法
     * 当map的所有输出数据来到reduce之后 ,首先根据 key 进行排序 a-z ,然后将 key相同的分为一组,
     * 分组之后,同一组的数据组成一个新的kv键值对,调用一次reduce方法。
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException 
        long count = 0;
        for (LongWritable value : values) 
            count += value.get();
        
        outValue.set(count);
        //最终使用上下文对象输出结果
        context.write(key,outValue);
    

最后是驱动类主方法,声明出 Job ,加载上面的类,并声明数据输入的来源:

public class WordCountDriver 

    public static void main(String[] args) throws Exception
        //创建配置对象
        Configuration conf = new Configuration();

        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output))
            fs.delete(output,true);
        


        //设置MapReduce程序的运行模式 如果不指定 默认是local模式
        //conf.set("mapreduce.framework.name","local");

        //构建Job作业的实例 参数(配置对象、Job名字)
        Job job = Job.getInstance(conf, WordCountDriver.class.getName());

        //设置mr程序运行的主类
        job.setJarByClass(WordCountDriver.class);

        //设置本次mr程序的mapper类型  reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //指定mapper阶段输出的key value数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //指定reducer阶段输出的key value类型 也是mr程序最终的输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //MapReduce 默认的读取数据组件,读数据的行为是:一行一行读取数据  返回kv键值对,k:偏移量,v:一行的文本内容
        FileInputFormat.setInputPaths(job,input);
        FileOutputFormat.setOutputPath(job,output);
        
        //最终提交本次job作业
        //参数表示是否开启实时监视追踪作业的执行情况
        boolean resultflag = job.waitForCompletion(true);

        System.exit(resultflag ? 0: 1);
    

下面可以在本地运行该程序,如果配置中没有指定,则是 local 本地模式,它会读取 mapred-site.xml 文件中的 mapreduce.framework.name 参数值。

启动时指定输入输出目录,在输入目录下要存放执行的数据:


运行后看到日志都处理 100 % 了,到输出目录下查看结果:


已经得到我们预计的效果,不过在日志中提出了这样一个警告:

推荐我们使用 ToolRunner ,下面对驱动类修改:

public class WordCountDriverTool extends Configured implements Tool 

    public static void main(String[] args) throws Exception
        //创建配置对象
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new WordCountDriverTool(), args);
        System.exit(status);
    

    @Override
    public int run(String[] args) throws Exception 
        // 配置本次作业的输入数据路径 和输出数据路径,通过参数传递进来,
        // 如果输入是一个目录,则会读取目录下的所有文件汇总到进行处理
        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // 输出目录必须为空,如果不为空则会报错提示
        FileSystem fs = FileSystem.get(getConf());
        if(fs.exists(output))
            fs.delete(output,true);
        

        //设置MapReduce程序的运行模式 如果不指定 默认是local模式
        //conf.set("mapreduce.framework.name","local");

        //构建Job作业的实例 参数(配置对象、Job名字)
        Job job = Job.getInstance(getConf(), WordCountDriverTool.class.getName());

        //设置mr程序运行的主类
        job.setJarByClass(WordCountDriverTool.class);

        //设置本次mr程序的mapper类型  reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //指定mapper阶段输出的key value数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //指定reducer阶段输出的key value类型 也是mr程序最终的输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //MapReduce 默认的读取数据组件,读数据的行为是:一行一行读取数据  返回kv键值对,k:偏移量,v:一行的文本内容
        FileInputFormat.setInputPaths(job,input);
        FileOutputFormat.setOutputPath(job,output);

        //最终提交本次job作业
        //参数表示是否开启实时监视追踪作业的执行情况
        return job.waitForCompletion(true)? 0:1;
    

运行之后同样得到结果。

四、集群运行方式

上面的方式在本地进行了执行,方便进行测试,一旦测试完成还是要放在 Hadoop 集群中进行运行。

其中有两个非常重要的配置:

mapreduce.framework.name=yarn
yarn.resourcemanager.hostname=node1

默认下回去读取当前 Hadoop 环境下的配置,代码中可以不指定。

使用 Maven 对项目进行打包,最好在 pom 中指定 mainClass,如果没有依赖其他组件可以使用 maven-jar-plugin 进行打包,打包后 jar 包比较小:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <version>2.4</version>
    <configuration>
        <archive>
            <manifest>
                <addClasspath>true</addClasspath>
                <classpathPrefix>lib/</classpathPrefix>
                <mainClass>com.bxc.hadoophdfsdemo.mapreduce.WordCountDriverTool</mainClass>
            </manifest>
        </archive>
    </configuration>
</plugin>

如果有第三方依赖,最好把依赖也打进 jar 包,可以使用下面插件,防止找不到依赖类:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <archive>
            <manifest>
                <mainClass>com.bxc.hadoophdfsdemo.mapreduce.WordCountDriverTool</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>

打包成 jar 包:

mvn clean package

把打包后的 jar 包放到 Hadoop 集群的任意一个节点上。

HDFS 中创建数据目录和上传测试数据:

执行 jar 示例:

hadoop jar <jar path> <driver class path> <args>
# 或者
yarn jar <jar path> <driver class path> <args>

运行打包后的 jar:

yarn jar hadoop-mapreduce-demo.jar  /test/input/ /test/output/


可以看到执行过程,最后去输出目录查看结果:

已经得到结果了。

以上是关于Hadoop3 - MapReduce 介绍于基本使用的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop3 - MapReduce 介绍于基本使用

Hadoop3 - MapReduce 分组介绍及案例实践

Hadoop3 - MapReduce 分组介绍及案例实践

Hadoop3 - MapReduce 分区介绍及自定义分区

Hadoop3 - MapReduce 分区介绍及自定义分区

Hadoop3 - MapReduce COVID-19 案例实践