大数据第三课-WordCount 本地运行和集群运行

Posted 吃一天鱼六千多

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据第三课-WordCount 本地运行和集群运行相关的知识,希望对你有一定的参考价值。

一、MapReduce编程思想

mapReduce编程模型的总结:

MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤

Map阶段2个步骤

第一步:设置inputFormat类,将我们的数据切分成keyvalue对,输入到第二步

第二步:自定义map逻辑,处理我们第一步的输入数据,然后转换成新的keyvalue对进行输出

shuffle阶段4个步骤

第三步:对输出的keyvalue对进行分区。相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合

第四步:对不同分区的数据按照相同的key进行排序

第五步:对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)

第六步:对排序后的额数据进行分组,分组的过程中,将相同keyvalue放到一个集合当中

reduce阶段2个步骤

第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的keyvalue对进行处理,转换成新的keyvalue对进行输出

第八步:设置outputformat将输出的keyvalue对数据进行保存到文件中

二、Hadoop中的基本数据类型

 

hadoop没有沿用java当中基本的数据类型,而是自己进行封装了一套数据类型,其自己封装的类型与java的类型对应如下

Java类型

Hadoop Writable类型

Boolean

BooleanWritable

Byte

ByteWritable

Int

IntWritable

Float

FloatWritable

Long

LongWritable

Double

DoubleWritable

String

Text

Map

MapWritable

Array

ArrayWritable

byte[]

BytesWritable

 

三、单词统计的本地运行实现(要在windows下配置了hadoop环境)

1、添加需要的依赖包(pom.xml文件):添加以下内容

 

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>7.0.0</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!--   <verbal>true</verbal>-->
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

 

2、创建以下三个文件

 

 3、编写MyMapper类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * 自定义mapper类需要继承Mapper,有四个泛型,
 * keyin: k1   行偏移量 Long
 * valuein: v1   一行文本内容   String
 * keyout: k2   每一个单词   String
 * valueout : v2   1         int
 * 在hadoop当中没有沿用Java的一些基本类型,使用自己封装了一套基本类型
 * long ==>LongWritable
 * String ==> Text
 * int ==> IntWritable
 *
 */
public class MyMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    /**
     * 继承mapper之后,覆写map方法,每次读取一行数据,都会来调用一下map方法
     * @param key:对应k1
     * @param value:对应v1
     * @param context 上下文对象。承上启下,承接上面步骤发过来的数据,通过context将数据发送到下面的步骤里面去
     * @throws IOException
     * @throws InterruptedException
     * k1   v1
     * 0;hello,world
     *
     * k2 v2
     * hello 1
     * world   1
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取我们的一行数据
        String line = value.toString();
        String[] split = line.split(",");
        Text text = new Text();
        IntWritable intWritable = new IntWritable(1);
        for (String word : split) {
            //将每个单词出现都记做1次
            //key2 Text类型
            //v2 IntWritable类型
            text.set(word);
            //将我们的key2 v2写出去到下游
            context.write(text,intWritable);
        }
    }
}

 4、编写MyReduce类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    //第三步:分区   相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合
    /**
     * 继承Reducer类之后,覆写reduce方法
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int result = 0;
        for (IntWritable value : values) {
            //将我们的结果进行累加
            result += value.get();
        }
        //继续输出我们的数据
        IntWritable intWritable = new IntWritable(result);
        //将我们的数据输出
        context.write(key,intWritable);
    }
}

 5、编写WordCount类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/*
这个类作为mr程序的入口类,这里面写main方法
 */
public class WordCount extends Configured implements Tool {
    /**
     * 实现Tool接口之后,需要实现一个run方法,
     * 这个run方法用于组装我们的程序的逻辑,其实就是组装八个步骤
     *
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        //获取Job对象,组装我们的八个步骤,每一个步骤都是一个class类
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "mrdemo1");

        //实际工作当中,程序运行完成之后一般都是打包到集群上面去运行,打成一个jar包
        //如果要打包到集群上面去运行,必须添加以下设置
        job.setJarByClass(WordCount.class);

        //第一步:读取文件,解析成key,value对,k1:行偏移量 v1:一行文本内容
        job.setInputFormatClass(TextInputFormat.class);
        //指定我们去哪一个路径读取文件
        TextInputFormat.addInputPath(job, new Path("file:///E:\\\\BigDataCode\\\\Java\\\\data\\\\wc\\\\input"));
        //第二步:自定义map逻辑,接受k1   v1 转换成为新的k2   v2输出
        job.setMapperClass(MyMapper.class);
        //设置map阶段输出的key,value的类型,其实就是k2 v2的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //第三步到六步:分区,排序,规约,分组都省略
        //第七步:自定义reduce逻辑
        job.setReducerClass(MyReducer.class);
        //设置key3 value3的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //第八步:输出k3 v3 进行保存
        job.setOutputFormatClass(TextOutputFormat.class);
        //一定要注意,输出路径是需要不存在的,如果存在就报错
        TextOutputFormat.setOutputPath(job, new Path("file:///E:\\\\BigDataCode\\\\Java\\\\data\\\\wc\\\\out_result"));
        //提交job任务
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
        /***
         * 第一步:读取文件,解析成key,value对,k1   v1
         * 第二步:自定义map逻辑,接受k1   v1 转换成为新的k2   v2输出
         * 第三步:分区。相同key的数据发送到同一个reduce里面去,key合并,value形成一个集合
         * 第四步:排序   对key2进行排序。字典顺序排序
         * 第五步:规约 combiner过程 调优步骤 可选
         * 第六步:分组
         * 第七步:自定义reduce逻辑接受k2   v2 转换成为新的k3   v3输出
         * 第八步:输出k3 v3 进行保存
         *
         *
         */
    }

    /*
    作为程序的入口类
      */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hello", "world");
        //提交run方法之后,得到一个程序的退出状态码
        int run = ToolRunner.run(configuration, new WordCount(), args);
        //根据我们 程序的退出状态码,退出整个进程
        System.exit(run);
    }

}

四、单词统计的集群运行实现(打jar包)

1、将WordCount类中的路径更改成集群上的路径

 

 2、打  jar 包

 

 

 

3、拷贝jar包到集群上

 

 

 

 4、运行jar包,执行以下语句(后面那个com.yyy.wordcount.WordCount路径根据自己的设置:可以在自己IDEA中的文件上右键->Copy Reference)

 yarn jar yarn-1.0-SNAPSHOT.jar com.yyy.wordcount.WordCount

5、查看运行  http://node01:8088/cluster

 

6、查看运行结果

 

可以下载下来看一下对不对。

然后结束。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

 

以上是关于大数据第三课-WordCount 本地运行和集群运行的主要内容,如果未能解决你的问题,请参考以下文章

weekend110(Hadoop)的 第三天笔记

第三课:java开发hdfs

参考大数据厦门大学林子雨编著的《大数据技术原理与应用(第3版)》中第三课《HDFS编程实践(Hadoop3.1.3)》遇到的bug

Python学习,第三课 - 数据类型

Hadoop运行模式本地运行模式(官方WordCount)完全分布式运行模式(开发重点)scp安全拷贝rsync 远程同步工具xsync集群分发脚本同步环境变量配置(root所有着)

Flink学习 批流版本的wordcount JAVA版本