大数据学习之手写MR框架(WordCount程序开发)08

Posted hidamowang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了大数据学习之手写MR框架(WordCount程序开发)08相关的知识,希望对你有一定的参考价值。

简介:这里先手写一个MR程序,大致实现一个单词计数程序。帮助后面学习MapReduce组件。

1:先自定义一个Mapper接口

package it.dawn.HDFSPra.HandWritingMR;
/**
 * @author Dawn
 * @date 2019年4月30日23:28:00
 * @version 1.0
 * 
 * 思路?
 * 接口设计
 */
public interface Mapper {
	//通用方法
	public void map(String line,Context context);
}

2:定义一个Context类:

 

该类主要实现数据的传输,和数据的封装(这里用的一个HashMap进行封装的)

 

 

package it.dawn.HDFSPra.HandWritingMR;
/**
 * @author Dawn
 * @date 2019年4月30日23:18:13
 * @version 1.0
 * 
 * 思路?
 * 数据传输的类
 * 封装数据
 * 集合
 * <单词,1>
 */

import java.util.HashMap;

public class Context {
	//数据封装
	private HashMap<Object, Object> contextMap=new HashMap<>();
	
	//写数据
	public void write(Object key,Object value) {
		//放数据到map中
		contextMap.put(key, value);
	}
	
	//定义根据key拿到值方法
	public Object get(Object key) {
		return contextMap.get(key);
	}
	
	//拿到map当中的数据内容
	public HashMap<Object, Object> getContextMap(){
		return contextMap;
	}
}

 

3:实现Mapper类(其实这里就是简化的MapReduce阶段)

 

package it.dawn.HDFSPra.HandWritingMR;
/**
 * @author Dawn
 * @date 2019年4月30日23:22:35
 * @version 1.0
 * 
 * 思路:
 * 	添加一个map方法  单词切分 相同key的value ++
 */
public class WordCountMapper implements Mapper{

	@Override
	public void map(String line, Context context) {
		//1.拿到这行数据 切分
		String[] words=line.split(" ");
		
		//2.拿到单词 相同的key value++  hello 1 itstar 1
		for(String word:words) {
			Object value=context.get(word);
			//相对于插入数据到HashMap中
			if(null==value) {
				context.write(word, 1);
			}else {
				//HashMap不为空 
				int v=(int) value;
				context.write(word, v+1);
			}
		}
	}
	
}

 

4:写一个总程序将这几个串起来(相当于是一个MR中的那个Driver程序,指定MapReduce的类。总程序入口)

 

package it.dawn.HDFSPra.HandWritingMR;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/**
 * @author Dawn
 * @date 2019年4月30日23:07:18
 * @version 1.0
 * 
 * 需求:文件(hello itstar hello hunter hello hunter henshuai ) 统计每个单词出现的次数?
 * 	   数据存储在hdfs、统计出来的结果存储到hdfs
 * 
 * 2004google:dfs/bigtable/mapreduce
 * 
 * 大数据解决的问题?
 * 	1.海量数据的存储
 * 		hdfs
 *  2.海量数据的计算
 *  	mapreduce
 *  
 * 思路?
 * 	 hello 1
 *   itstar 1
 *   hello 1
 *   ...
 *   
 * 基于用户体验:
 * 	用户输入数据(hdfs)
 *  用户处理的方式
 *  用户指定结果数据存储位置
 * 
 */
public class HdfsWordCount {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException, URISyntaxException {
		//反射
		Properties pro=new Properties();
		//加载配置文件
		pro.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("job.properties"));
		
		Path inpath=new Path(pro.getProperty("IN_PATH"));
		Path outpath=new Path(pro.getProperty("OUT_PATH"));
		
		Class<?> mapper_class=Class.forName(pro.getProperty("MAPPER_CLASS"));
		//实例化
		Mapper mapper=(Mapper) mapper_class.newInstance();
		
		Context context=new Context();
		
		//1.构建hdfs客户端对象
		Configuration conf=new Configuration();
		FileSystem fs=FileSystem.get(new URI("hdfs://bigdata11:9000"), conf, "root");
		
		//2.读取用户输入的文件
		//读取到的是改文件下的所有的txt文件
		RemoteIterator<LocatedFileStatus> iter=fs.listFiles(inpath, false);
		
		while(iter.hasNext()) {
			LocatedFileStatus file=iter.next();
			//打开路径 获取输入流
			FSDataInputStream in=fs.open(file.getPath());
			BufferedReader br=new BufferedReader(new InputStreamReader(in, "utf-8"));
			
			String line = null;
			
			while((line=br.readLine()) != null) {
				//调用map方法执行业务逻辑
				mapper.map(line, context);
			}
			
			br.close();
			in.close();
		}
		
		//如果用户输入的结果路径不存在 则创建一个
		Path out = new Path("/wc/out/");
		if(!fs.exists(out))
			fs.mkdirs(out);
		
		//将缓存的结果放入hdfs中存储 
		HashMap<Object, Object> contextMap=context.getContextMap();
		FSDataOutputStream out1=fs.create(outpath);
		
		//遍历HashMap
		Set<Entry<Object, Object>> entrySet = contextMap.entrySet();
		for(Entry<Object, Object> entry:entrySet) {
			//写数据
			out1.write((entry.getKey().toString()+"\t"+entry.getValue()+"\n").getBytes());
		}
		
		out1.close();
		fs.close();
		
		System.out.println("数据统计结果完成....");
	}

}

 

  

配置文件(job.properties)如下:

IN_PATH=/dawn
OUT_PATH=/wc/out/rs.txt
MAPPER_CLASS=it.dawn.HDFSPra.HandWritingMR.WordCountMapper

 

 

 

  

以上是关于大数据学习之手写MR框架(WordCount程序开发)08的主要内容,如果未能解决你的问题,请参考以下文章

大数据学习之提交job流程,分区和合并11

大数据学习之MapReduce基础与Yarn集群安装09

大数据学习之sqoop框架 25

机器学习之手写数字识别-小数据集

大数据学习之Storm实时统计网站访问量案例35

odoo12学习之javascript