MapReduce Execution Flow
Posted 月疯
WordCount example:
package com.hadoop.reduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCount {

    // Temporarily configure HADOOP_HOME (via the hadoop.home.dir system property)
    static {
        System.setProperty("hadoop.home.dir", "H:\\yingjian\\hadoop\\hadoop-2.6.0-cdh5.9.3\\hadoop-2.6.0-cdh5.9.3");
    }
    /**
     * By default MapReduce splits the input with TextInputFormat and hands each record to the Mapper.
     * TextInputFormat: key = the offset of the current line's first character within the file, value = the content of the current line.
     * Mapper type parameters: input key type Long, input value type String, output key type String, output value type Long.
     * To keep the serialized data small and the network transfer fast, MapReduce wraps the basic types
     * in its own serializable Writable classes (LongWritable, Text, ...).
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        LongWritable one = new LongWritable(1);

        // Split each line of data, then emit the pieces
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String words = value.toString();
            // Split the line into individual words
            String[] wordArr = words.split(" ");
            // Iterate over the words
            for (String word : wordArr) {
                // Output format: <word, 1>
                context.write(new Text(word), one);
            }
        }
    }
    /**
     * Global aggregation.
     * Reducer type parameters: input key type String, input value type Long, output key type String, output value type Long.
     */
    public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        // Aggregate the map output globally
        // key: the word, values: its counts, e.g. [1, 1, 1]
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create a job, i.e. a runtime environment
        Configuration conf = new Configuration();
        // Remote invocation
        conf.set("fs.defaultFS", "hdfs://hadoopxxxx:8082");
        Job job = Job.getInstance(conf, "word-count");
        // Program entry point
        job.setJarByClass(WordCount.class);
        // Input path
        FileInputFormat.addInputPath(job, new Path("D:\\words"));
        // Mapper logic
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // The shuffle phase runs between map and reduce
        // Reducer logic
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Output path
        FileOutputFormat.setOutputPath(job, new Path("D:\\out"));
        // Run the job; on a cluster it runs on YARN
        boolean result = job.waitForCompletion(true);
        System.out.print(result ? 1 : 0);
    }
}
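As a quick sanity check of the logic (a hypothetical run, not output from this article): if D:\words contains a file with the single line "hello world hello", the job writes a part-r-00000 file under D:\out containing "hello 2" and "world 1", with key and value separated by a tab (the default TextOutputFormat). Note that the output directory must not exist before the job starts, otherwise submission fails with an output-directory-already-exists error.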
Maven dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>groupId</groupId>
<artifactId>hadoop</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.9.0</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
1. Running locally on a single machine
a. Add the two Windows support files, winutils.exe and hadoop.dll (put them in the Hadoop bin directory).
b. Configure the HADOOP_HOME environment variable (requires restarting the machine).
   To set it temporarily in code: System.setProperty("hadoop.home.dir","xxx")
c. Modify the NativeIO class so that the access0 call is replaced directly with true (see the sketch below).
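The usual form of that NativeIO change is sketched here; it assumes NativeIO.java from the matching Hadoop version is copied into the project under its original package org.apache.hadoop.io.nativeio, and that only the access method of the nested Windows class is edited (the signature shown is the Hadoop 2.x one, so verify it against your version):

public static boolean access(String path, AccessRight desiredAccess)
        throws IOException {
    // Originally: return access0(path, desiredAccess.accessRight());
    // Returning true bypasses the native permission check that fails on Windows.
    return true;
}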
2. Remote submission
Code on a Windows machine connects directly to the Hadoop environment on a Linux machine and runs there; the results can be saved locally or on the HDFS server.
conf.set("fs.defaultFS","hdfs://hadoopxxxx:8082")
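A minimal sketch of the driver configuration such a remote submission from Windows typically needs is shown below; it reuses the imports from the WordCount example, and the host name, port, and jar path are placeholders rather than values from this article:

Configuration conf = new Configuration();
// NameNode address (placeholder host and port)
conf.set("fs.defaultFS", "hdfs://hadoopxxxx:8082");
// Submit to YARN instead of using the local job runner
conf.set("mapreduce.framework.name", "yarn");
conf.set("yarn.resourcemanager.hostname", "hadoopxxxx");
// Required when submitting from Windows to a Linux cluster
conf.set("mapreduce.app-submission.cross-platform", "true");
Job job = Job.getInstance(conf, "word-count");
// Point the job at a pre-built jar so the cluster can load the Mapper/Reducer classes
job.setJar("D:\\hadoop-test.jar");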
3. Package a jar and run it on the Hadoop cluster
Build the jar with Maven, then submit it with yarn, for example:
bin/yarn jar hadoop-test.jar file:/opt/module/hadoop-2.6.3/LICENSE.txt file:/opt/out
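The command above passes the input and output paths as arguments, while the driver in this article hard-codes D:\words and D:\out; for the jar to honor the command-line paths, the driver would read them from args instead, for example:

// In main(String[] args), instead of the hard-coded Windows paths:
FileInputFormat.addInputPath(job, new Path(args[0]));    // e.g. file:/opt/module/hadoop-2.6.3/LICENSE.txt
FileOutputFormat.setOutputPath(job, new Path(args[1]));  // e.g. file:/opt/out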