Implementing WordCount with MapReduce
I. The Core Programming Model of MapReduce
The computation is divided mainly into a map phase and a reduce phase, as shown in the figure below.
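For example, given a single input line "hello world hello", the map phase emits the pairs (hello,1), (world,1) and (hello,1); the shuffle then groups the values by key into (hello,[1,1]) and (world,[1]); finally the reduce phase sums each group and outputs (hello,2) and (world,1).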
II. MapReduce Processes
(1) MrAppMaster: responsible for scheduling the whole job and coordinating its state.
(2) MapTask: responsible for the entire data-processing flow of the Map phase.
(3) ReduceTask: responsible for the entire data-processing flow of the Reduce phase.
III. MapReduce Programming Conventions
A user-written program is divided into three parts: Mapper, Reducer, and Driver.
1. Mapper phase
(1) The user-defined class must extend the Mapper class.
(2) The Mapper's input data comes as key-value (KV) pairs (the KV types can be customized).
(3) The Mapper's business logic is written in the map() method.
(4) The Mapper's output data is also KV pairs (the KV types can be customized).
(5) The map() method (run by the MapTask process) is called once for each input <K,V> pair.
2. Reducer phase
(1) The user-defined class must extend the Reducer class.
(2) The Reducer's input data types correspond to the Mapper's output data types, and are also KV pairs.
(3) The Reducer's business logic is written in the reduce() method.
(4) The ReduceTask process calls reduce() once for each group of <k,v> pairs that share the same key k.
3. Driver phase
The Driver acts as the client of the YARN cluster: it submits the whole program to YARN as a Job object that encapsulates the MapReduce job's runtime parameters.
IV. WordCount Test
1. Local test
First configure the HADOOP_HOME variable and the Windows runtime dependencies:
Configure the HADOOP_HOME environment variable.
Configure the Path environment variable.
Prepare the input data (inputword.txt).
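As a hypothetical example (the actual input file from the original post is not reproduced here), inputword.txt could contain a few space-separated lines such as:
hello world
hello hadoop
hello mapreduce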
Set up the code environment
(1) Add the following dependencies to the pom.xml file:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>
(2) In the project's src/main/resources directory, create a new file named "log4j.properties" and fill it with the following:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
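Note that only the stdout appender is attached to log4j.rootLogger above; the logfile appender is defined but will not actually write target/spring.log unless it is also added to the rootLogger line (e.g. log4j.rootLogger=INFO, stdout, logfile).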
(3) Write the Mapper class
package com.ouyangl.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author oyl
 * @create 2021-06-03 22:06
 * @Description Type parameters:
 * KEYIN    type of the map-phase input key: LongWritable (byte offset of the line)
 * VALUEIN  type of the map-phase input value: Text (one line of text)
 * KEYOUT   type of the map-phase output key: Text (a word)
 * VALUEOUT type of the map-phase output value: IntWritable (the count 1)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text outK = new Text();
    private IntWritable outv = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get one line of input
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Emit (word, 1) for each word
        for (String word : words) {
            outK.set(word);
            context.write(outK, outv);
        }
    }
}
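A small design note: outK and outv are created once as instance fields and reused for every record instead of being allocated inside map(). Since map() is called once per input line, this avoids creating a large number of short-lived Text and IntWritable objects.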
(4) Write the Reducer class
package com.ouyangl.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author oyl
 * @create 2021-06-03 22:07
 * @Description Type parameters:
 * KEYIN    type of the reduce-phase input key: Text (matches the Mapper's output key)
 * VALUEIN  type of the reduce-phase input value: IntWritable (matches the Mapper's output value)
 * KEYOUT   type of the reduce-phase output key: Text
 * VALUEOUT type of the reduce-phase output value: IntWritable
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private int sum;
    private IntWritable outv = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        sum = 0;
        // Sum all counts for this key
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Wrap the sum in the output value
        outv.set(sum);
        // Emit (word, total count)
        context.write(key, outv);
    }
}
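Note that Hadoop reuses the objects it hands to reduce(): the values iterable can only be traversed once, and the framework typically refills the same IntWritable instance on each step of the iteration, so the counts are consumed inside the loop as shown here rather than stored for later use.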
(5) Write the Driver class
package com.ouyangl.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author oyl
 * @create 2021-06-03 22:07
 * @Description Driver: configures and submits the WordCount job
 */
public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar by the driver class
        job.setJarByClass(WordCountDriver.class);
        // 3. Associate the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. Set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("C:\\Users\\oyl\\Desktop\\HADOOP\\inputword.txt"));
        FileOutputFormat.setOutputPath(job, new Path("C:\\Users\\oyl\\Desktop\\HADOOP\\output1"));
        // 7. Submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
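An optional extension that is not part of the driver above: because the reduce logic is a simple sum, the same WordCountReducer class can also be registered as a Combiner, letting each MapTask pre-aggregate its own output before the shuffle. A minimal sketch, added anywhere before job.waitForCompletion(true):
// Optional (not in the original driver): pre-aggregate map output locally before the shuffle
job.setCombinerClass(WordCountReducer.class);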
Note: be careful to import the correct packages, and the output path must not already exist; otherwise the job will fail with an error.
Run result:
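For the hypothetical sample input shown earlier, the output1 directory would contain a part-r-00000 file with tab-separated counts sorted by key, along the lines of:
hadoop	1
hello	3
mapreduce	1
world	1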
2. Submitting to the cluster for testing
Add the packaging plugins to pom.xml so the program can be packaged into a jar:
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
Modify the input and output paths in the Driver class so that they are passed in as command-line arguments:
// 6. Set the input and output paths
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
Package the project.
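If packaging from the command line rather than from the IDE, the usual Maven invocation is:
mvn clean package
With the assembly plugin bound to the package phase as configured above, this should produce both bigdata-hadoop-1.0-SNAPSHOT.jar and bigdata-hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar under target/ (assuming the artifactId is bigdata-hadoop).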
Copy the jar without bundled dependencies (bigdata-hadoop-1.0-SNAPSHOT.jar) to the Hadoop cluster and run it:
hadoop jar bigdata-hadoop-1.0-SNAPSHOT.jar com.ouyangl.mapreduce.wordcount2.WordCountDriver /word.txt /output
Both /word.txt and /output are paths on HDFS.
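If word.txt is not yet on HDFS, it can be uploaded first, for example:
hadoop fs -put word.txt /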
Run log:
Computed result:
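The counts can then be viewed with, for example:
hadoop fs -cat /output/part-r-00000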