hadoop系列三:mapreduce的使用

Posted 朱小杰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop系列三:mapreduce的使用相关的知识,希望对你有一定的参考价值。

 

 

转载请在页首明显处注明作者与出处

http://www.cnblogs.com/zhuxiaojie/p/7224772.html

一:说明

 

此为大数据系列的一些博文,有空的话会陆续更新,包含大数据的一些内容,如hadoop,spark,storm,机器学习等。

 当前使用的hadoop版本为2.6.4

 

上一篇:hadoop系列二:HDFS文件系统的命令及JAVA客户端API

 

在下面可以看到统计一本小说(斗破苍穹)哪些词语出现了最多。

本来mapreducer只想写一篇的,可是发现写一篇太长了,所以就进行了拆分。

所有的部分都提供代码下载

目录可以在右侧查看,点击目录跳转到相应的位置

 

一:说明
二:wordcount字数统计功能
2.1:准备文件
2.2:编写Mapper的代码
2.3编写Reduce的代码
2.4:编写main方法执行这个mapreduce
2.5:把代码放在hadoop中运行
三:自定义序列化的类
3.1:自定义一个序列化的输出bean
3.2:编写mapper
3.3:编写reducer
 3.4:编写main方法
3.5:在hadoop中运行
四:数据分区(按照不同类型输出到不同的位置)
4.1:分区规则的代码
4.2:设置分区代码
4.3:分区的完整代码
4.4:在hadoop运行分区代码
五:数据排序及对象的重用
5.1:编写排序代码
5.2:编写mapper(对象的复用)
5.3:编写reducer
5.4:编写启动类
5.5:完整的代码
5.6:在hadoop中执行排序
六:统计一本小说中出现的词汇(包含Combiner)
6.1:准备工作
6.2:配置maven打包包含分词的依赖
6.3:数据汇总(Combiner)
6.4:排序阶段

 

 

 

二:wordcount字数统计功能

相应的代码在:代码地址--点我跳转

2.1:准备文件

既然是要统计字数,那么肯定是要有相应的文档,我们先准备一些这样的文档,我们准备两个文档,分别叫text1.txt和text2.txt

text1.txt

 

hello zhangsan
lisi nihao
hai zhangsan
nihao lisi
x xiaoming

 

text2.txt

zhangsan a
lisi b
wangwu c
jiji 7
haha xiaoming
xiaoming is gril

我们生成这样两个文件,待会去统计每个单词分别出现了多少次

 

 

2.2:编写Mapper的代码

 

直接贴上代码,相应的解释在注释中

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 这部分的输入是由mapreduce自动读取进来的
 * 简单的统计单词出现次数<br>
 * KEYIN 默认情况下,是mapreduce所读取到的一行文本的起始偏移量,Long类型,在hadoop中有其自己的序列化类LongWriteable
 * VALUEIN 默认情况下,是mapreduce所读取到的一行文本的内容,hadoop中的序列化类型为Text
 * KEYOUT 是用户自定义逻辑处理完成后输出的KEY,在此处是单词,String
 * VALUEOUT 是用户自定义逻辑输出的value,这里是单词出现的次数,Long
 * @author Administrator
 *
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        //这是mapreduce读取到的一行字符串
        String line = value.toString();
        String[] words = line.split(" ");
        
        for (String word : words) {
            //将单词输出为key,次数输出为value,这行数据会输到reduce中
            context.write(new Text(word), new LongWritable(1));
        }
    }
}

 

 

2.3编写Reduce的代码

同样直接上代码

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 第一个Text: 是传入的单词名称,是Mapper中传入的
 * 第二个:LongWritable 是该单词出现了多少次,这个是mapreduce计算出来的,比如 hello出现了11次
 * 第三个Text: 是输出单词的名称 ,这里是要输出到文本中的内容
 * 第四个LongWritable: 是输出时显示出现了多少次,这里也是要输出到文本中的内容
 * @author Administrator
 *
 */
public class WordCountReduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable num : values) {
            count += num.get();
        }
        context.write(key, new LongWritable(count));
    }
}

 

 

 

2.4:编写main方法执行这个mapreduce

写了mapper与reduce的代码,自然是需要一个main方法来把这些代码运行起来的,所以编写如下代码

 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * 相当于运行在yarn中的客户端
 * @author Administrator
 *
 */
public class WordCountDriver {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        //如果是打包在linux上运行,则不需要写这两行代码
/*        //指定运行在yarn中
        conf.set("mapreduce.framework.name", "yarn");
        //指定resourcemanager的主机名
        conf.set("yarn.resourcemanager.hostname", "server1");*/
        Job job = Job.getInstance(conf);
        
        //使得hadoop可以根据类包,找到jar包在哪里
        job.setJarByClass(WordCountDriver.class);
        
        //指定Mapper的类
        job.setMapperClass(WordCountMapper.class);
        //指定reduce的类
        job.setReducerClass(WordCountReduce.class);
        
        //设置Mapper输出的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //设置最终输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        
        //指定输入文件的位置,这里为了灵活,接收外部参数
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定输入文件的位置,这里接收启动参数
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        //将job中的参数,提交到yarn中运行
        //job.submit();
        try {
            job.waitForCompletion(true);
            //这里的为true,会打印执行结果
        } catch (ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

 

 

2.5:把代码放在hadoop中运行

代码写完了,要怎么运行呢?

(1)首先,肯定不是直接执行main方法运行,因为目前的代码,并不知道hadoop部署在哪里,我们要做的是,把这个项目打包,如果是maven项目,则使用maven package命令打包,把相应的jar包,上传到服务器中。

(2)其次,需要把之前的两个文本文件,text1.txt和text2.txt上传到hdfs中,因为既然是大数据,那么在实际环境中,肯定不可能是这么小的数据来进行计算,肯定是有着大量的数据,而这些数据,靠一台服务器肯定是放不下去的,也只有像hdfs这种大文件存储,或者一些其它的专门存放大数据的地方,才能存放了,我们使用如下的命令,把文件上传到hdfs中,如果这些命令看不懂,可以先看上一章节,hdfs的使用。

 

//创建一个目录
hadoop fs -mkdir -p /wordcount/input
//上传文件
hadoop fs -put text1.txt text2.txt /wordcount/input

 

(3)运行代码,带有main方法的代码,是可以使用java命令运行的,但是因为hadoop依赖了很多别的jar包,这样子运行代码,需要添加大量的依赖,写的命令很复杂,hadoop提供了这样的一个命令来执行代码

hadoop jar wordcount.jar com.zxj.hadoop.demo.mapreduce.wordcount.WordCountDriver /wordcount/input /wordcount/output

 这里来解释一下这条命令的意思,jar说明使用hadoop中内置的jar命令,也就是执行一个jar包。wordcount.jar 这个是上传的代码,也就是我们之前写的代码,打包之后上传到服务器中的名字。com.zxj.hadoop.demo.mapreduce.wordcount.WordCountDriver是需要运行哪个类,因为一个jar包中有可能有多个main方法,这样可以指定使用哪个类启动。最后两个参数 /wordcount/input 和 /wordcount/output,这是我们的代码中自定义的两个参数,第一个是文件的目录(意味着可以读取一整个目录中的多个文件),第二个是输出结果的目录。

执行完成之后,会有如下结果,如果没有抛出异常,或者写明失败,带有success的就是成功了。

现在我们可以去看一下输出结果

查看输出的文件

 

hadoop fs -ls /wordcount/output

 

第一个文件代表执行成功,第二个文件是输出结果文件,执行如下命令查看

从上图发现,zhangsan出现了3次,xiaoming出现了3次,nihao出现了2次,其它的是1次

 

 

 

 

 

 

三:自定义序列化的类

代码地址:下载代码

当输出的结果比较复杂的时候,就没办法使用Text,LongWritable这种类型来输出,这个时候我们可以自定义一个序列化的类,这个序列化不是jdk的序列化,而是hadoop自已的序列化,我们需要实现它

如下文档,保存并命名为staff.txt:

 

张三    江西    打车    200
李四    广东    住宿    600
王五    北京    伙食    320
张三    江西    话费    50
张三    湖南    打车    900
周六    上海    采购    3000
李四    西藏    旅游    1000
王五    北京    借款    500
李四    上海    话费    50
周六    北京    打车    600
张三    广东    租房    3050

 

 

3.1:自定义一个序列化的输出bean

之前我们一直使用LongWriteable或者Text来作为输入的内容,但是如果看这两个对象的源码,它们都是实现了Writable接口的,这是一个hadoop自带的序列化接口。

现在我们要输出一些信息,单单靠一个Text已经无法达到我们的效果的时候,我们就可以自定义一个对象,然后实现Writable接口

如下的代码,就是自定义一个可序列化的bean

 

    /**
     * 封装的bean
     */
    public static class SpendBean implements Writable{

        private Text userName;

        private IntWritable money;

        public SpendBean(Text userName, IntWritable money) {
            this.userName = userName;
            this.money = money;
        }

        /**
         * 反序列化时必须有一个空参的构造方法
         */
        public SpendBean(){}

        /**
         * 序列化的代码
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            userName.write(out);
            money.write(out);
        }

        /**
         * 反序列化的代码
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            userName = new Text();
            userName.readFields(in);
            money = new IntWritable();
            money.readFields(in);
        }

        public Text getUserName() {
            return userName;
        }

        public void setUserName(Text userName) {
            this.userName = userName;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return userName.toString() + "," + money.get();
        }
    }

 

 

 

 

3.2:编写mapper

 编写mapper

 

    /**
     * Mapper
     */
    public static class GroupUserMapper extends Mapper<LongWritable,Text,Text,SpendBean>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String val = value.toString();
            String[] split = val.split("\\t");
            //这里就不作字符串异常的处理了,核心代码简单点
            String name = split[0];
            String province = split[1];
            String type = split[2];
            int money = Integer.parseInt(split[3]);
            SpendBean groupUser = new SpendBean();
            groupUser.setUserName(new Text(name));
            groupUser.setMoney(new IntWritable(money));
            context.write(new Text(name),groupUser);
        }
    }

 

 

3.3:编写reducer

编写reducer

 

/**
     * reducer
     */
    public static class GroupUserReducer extends Reducer<Text,SpendBean,Text,SpendBean> {
        /**
         * 姓名
         * @param key
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<SpendBean> values, Context context) throws IOException, InterruptedException {
            int money = 0;//消费金额
            //遍历
            for(SpendBean bean : values){
                money += bean.getMoney().get();
            }
            //输出汇总结果
            context.write(key,new SpendBean(key,new IntWritable(money)));
        }
    }

 

 

 3.4:编写main方法

编写main方法

    /**
     * 编写启动类
     * @param args
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(GroupUser.class); //设置jar中的启动类,可以根据这个类找到相应的jar包

        job.setMapperClass(GroupUserMapper.class); //设置mapper的类
        job.setReducerClass(GroupUserRecuder.class); //设置reducer的类

        job.setMapOutputKeyClass(Text.class); //mapper输出的key
        job.setMapOutputValueClass(SpendBean.class); //mapper输出的value

        job.setOutputKeyClass(Text.class); //最终输出的数据类型
        job.setOutputValueClass(SpendBean.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));//输入的文件位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出的文件位置

        boolean b = job.waitForCompletion(true);//等待完成,true,打印进度条及内容
        if(b){
            //success
        }

    }

 

完整的代码如下,这里把几个类都写在一起了。

 

package com.zxj.hadoop.demo.mapreduce.staffspend.groupuser;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author 朱小杰
 * 时间 2017-07-23 .16:33
 * 说明 ...
 */
public class GroupUser {
    /**
     * Mapper
     */
    public static class GroupUserMapper extends Mapper<LongWritable,Text,Text,SpendBean>{

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String val = value.toString();
            String[] split = val.split("\\t");
            //这里就不作字符串异常的处理了,核心代码简单点
            String name = split[0];
            String province = split[1];
            String type = split[2];
            int money = Integer.parseInt(split[3]);
            SpendBean groupUser = new SpendBean();
            groupUser.setUserName(new Text(name));
            groupUser.setMoney(new IntWritable(money));
            context.write(new Text(name),groupUser);
        }
    }

    /**
     * reducer
     */
    public static class GroupUserReducer extends Reducer<Text,SpendBean,Text,SpendBean> {
        /**
         * 姓名
         * @param key
         * @param values
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text key, Iterable<SpendBean> values, Context context) throws IOException, InterruptedException {
            int money = 0;//消费金额
            //遍历
            for(SpendBean bean : values){
                money += bean.getMoney().get();
            }
            //输出汇总结果
            context.write(key,new SpendBean(key,new IntWritable(money)));
        }
    }

    /**
     * 封装的bean
     */
    public static class SpendBean implements Writable{

        private Text userName;

        private IntWritable money;

        public SpendBean(Text userName, IntWritable money) {
            this.userName = userName;
            this.money = money;
        }

        /**
         * 反序列化时必须有一个空参的构造方法
         */
        public SpendBean(){}

        /**
         * 序列化的代码
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            userName.write(out);
            money.write(out);
        }

        /**
         * 反序列化的代码
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            userName = new Text();
            userName.readFields(in);
            money = new IntWritable();
            money.readFields(in);
        }

        public Text getUserName() {
            return userName;
        }

        public void setUserName(Text userName) {
            this.userName = userName;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return userName.toString() + "," + money.get();
        }
    }


    /**
     * 编写启动类
     * @param args
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(GroupUser.class); //设置jar中的启动类,可以根据这个类找到相应的jar包

        job.setMapperClass(GroupUserMapper.class); //设置mapper的类
        job.setReducerClass(GroupUserReducer.class); //设置reducer的类

        job.setMapOutputKeyClass(Text.class); //mapper输出的key
        job.setMapOutputValueClass(SpendBean.class); //mapper输出的value

        job.setOutputKeyClass(Text.class); //最终输出的数据类型
        job.setOutputValueClass(SpendBean.class);

        FileInputFormat.setInputPaths(job,new Path(args[0]));//输入的文件位置
        FileOutputFormat.setOutputPath(job,new Path(args[1]));//输出的文件位置

        boolean b = job.waitForCompletion(true);//等待完成,true,打印进度条及内容
        if(b){
            //success
        }

    }
}

 

 

3.5:在hadoop中运行

然后执行maven clean package命令,重新打包,并且上传到服务器中。

我们也创建一个目录,来存放之前的员工消费信息

hadoop fs -mkdir -p /staffspend/input

把之前准备好的员工文件上传到这个目录

hadoop fs -put staff.txt /staffspend/input

然后准备执行任务

 

hadoop jar hadoop-mapreduce-1.0.jar com.zxj.hadoop.demo.mapreduce.staffspend.groupuser.GroupUser /staffspend/input /staffspend/output

执行成功后,查看输出文件

hadoop fs -cat /staffspend/output/part-r-00000

 

 

 

 

 

四:数据分区(按照不同类型输出到不同的位置)

下载代码:点我下载

这样的需求也经常会有,我可能并不是仅仅需要总的数据查看,我还可能要查看每一个类型,比如第三部分的文件中,我可能想分别查看每个省中,每个人分别用了多少钱。

这个时候我们对上第三部分的代码进行修改

我们要增加输出bean中的省份字段,红色位置是修改过的部分

/**
     * 封装的bean
     */
    public static class SpendBean implements Writable{

        private Text userName;

        private IntWritable money;

        private Text province;


        public SpendBean(Text userName, IntWritable money, Text province) {
            this.userName = userName;
            this.money = money;
            this.province = province;
        }

        /**
         * 反序列化时必须有一个空参的构造方法
         */
        public SpendBean(){}

        /**
         * 序列化的代码
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            userName.write(out);
            money.write(out);
            province.write(out);
        }

        /**
         * 反序列化的代码
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            userName = new Text();
            userName.readFields(in);
            money = new IntWritable();
            money.readFields(in);
            province = new Text();
            province.readFields(in);
        }

        public Text getUserName() {
            return userName;
        }

        public Text getProvince() {
            return province;
        }

        public void setProvince(Text province) {
            this.province = province;
        }

        public void setUserName(Text userName) {
            this.userName = userName;
        }

        public IntWritable getMoney() {
            return money;
        }

        public void setMoney(IntWritable money) {
            this.money = money;
        }

        @Override
        public String toString() {
            return "SpendBean{" +
                    "userName=" + userName +
                    ", money=" + money +
                    ", province=" + province +
                    \'}\';
        }
    }

可以看到,上面的bean并没有改动什么特别的东西,完全是加了一个省份字段而已。

 

 

4.1:分区规则的代码

首先,如果要按照数据进行分区,我们肯定需要写分区的代码来告诉hadoop,我们写一个分区的类来继承org.apache.hadoop.mapreduce.Partitioner

hadoop中的分区,是在mapper结束后的reducer中,所以下面的代码是在reducer时运行的,我们对不同的省份进行规则划分,比如说江西就是对应的0分区

具体代码如下:

 

package com.zxj.hadoop.demo.mapreduce.staffspend.groupuser;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author 朱小杰
 * 时间 2017-07-29 .11:14
 * 说明
 * key ,value是mapper中输出的类型,因为分区是在mapper完成之后进行的
 */
public class ProvincePartitioner extends Partitioner<Text, GroupUser.SpendBean> {
    private static Map<String,Integer> provinces = new HashMap<>();
    static {
        //这里给每一个省份编制一个分区
        provinces.put("江西",0);
        provinces.put("广东",1);
        provinces.put("北京",2);
        provinces.put("湖南",3);
        provinces.put("上海",4);
        provinces.put("西藏",5);
    }

    /**
     * 给指定的数据一个分区
     * @param text
     * @param spendBean
     * @param numPartitions
     * @return
     */
    @Override
    public int getPartition(Text text, GroupUser.SpendBean spendBean, int numPartitions) {
        Integer province = provinces.get(spendBean.getProvince().toString());
        province = province == null ? 6 : province;  //如果在省份列表中找不到,则指定一个默认的分区
        return province;
    }
}

 

很简单的代码,我们划分了6个分区,如果有的省份在这6个分区中找不到,那余下的就会进入第7个分区中。

 

 

4.2:设置分区代码

分区的代码既然写完了,那么就需要在运行的时候,指定这分区的规则是我们刚才写的代码,位置在运

以上是关于hadoop系列三:mapreduce的使用的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop系列Hadoop三大核心之MapReduce-程序编写

2021年大数据Hadoop(二十三):MapReduce的运行机制详解

2021年大数据Hadoop(二十):MapReduce的排序和序列化

Hadoop MapReduce编程 API入门系列之join(二十五)(未完)

Hadoop MapReduce编程 API入门系列之二次排序

Hadoop MapReduce编程 API入门系列之网页流量版本1(二十二)