HDFS 手写mapreduce单词计数框架

Posted areyouready

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HDFS 手写mapreduce单词计数框架相关的知识,希望对你有一定的参考价值。

一、数据处理类

package com.css.hdfs;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

/**
 * 需求:文件(hello world hello teacher hello john tom ) 统计每个单词出现的次数?
 * 数据存储在hdfs、统计出来的结果存储到hdfs
 * 
 * 2004google:dfs/bigtable/mapreduce
 *
 * 大数据解决的问题?
 *  1.海量数据的存储
 *         hdfs
 *  2.海量数据的计算
 *      mapreduce
 *  
 *  思路?
 *    hello 2
 *    world 1
 *    hello 1
 *    ...
 *    
 *  基于用户体验:
 *   用户输入数据
 *   用户处理的方式
 *   用户指定结果数据存储位置
 */
public class HdfsWordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, InterruptedException, URISyntaxException {
        // 反射
        Properties pro = new Properties();
        // 加载配置文件
        pro.load(HdfsWordCount.class.getClassLoader().getResourceAsStream("job.properties"));
        Path inPath = new Path(pro.getProperty("IN_PATH"));
        Path outPath = new Path(pro.getProperty("OUT_PATH"));
        Class<?> mapper_class = Class.forName(pro.getProperty("MAPPER_CLASS"));
        // 实例化
        Mapper mapper = (Mapper) mapper_class.newInstance();
        Context context = new Context();
        // 构建hdfs客户端对象
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.146.132:9000/"), conf, "root");
        // 读取用户输入的文件
        RemoteIterator<LocatedFileStatus> iter = fs.listFiles(inPath, false);
        while (iter.hasNext()) {
            LocatedFileStatus file = iter.next();
            // 打开路径 获取输入流
            FSDataInputStream in = fs.open(file.getPath());
            BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));
            String line = null;
            while ((line = br.readLine()) != null) {
                // 调用map方法执行业务逻辑
                mapper.map(line, context);
            }
            // 关闭资源
            br.close();
            in.close();
        }
        
        // 如果用户输入的结果路径不存在 则创建一个
        Path out = new Path("/wc/out/");
        if (!fs.exists(out)) {
            fs.mkdirs(out);
        }
        
        // 将缓存的结果放入hdfs中存储
        HashMap<Object, Object> contextMap = context.getContextMap();
        FSDataOutputStream out1 = fs.create(outPath);
        
        // 遍历hashmap
        Set<Entry<Object, Object>> entrySet = contextMap.entrySet();
        for (Entry<Object, Object> entry : entrySet) {
            // 写数据
            out1.write((entry.getKey().toString() + "	" + entry.getValue() + "
").getBytes());
        }
        // 关闭资源
        out1.close();
        fs.close();
        
        System.out.println("数据统计结果完成......");
    }
}

二、接口类

package com.css.hdfs;

/**
 * 思路:
 * 接口设计
 */
public interface Mapper {
    // 调用方法
    public void map(String line, Context context);
}

三、数据传输类

package com.css.hdfs;

import java.util.HashMap;

/**
 * 思路:
 * 数据传输的类
 * 封装数据
 * 集合
 * <单词,1>
 */
public class Context {
    // 数据封装
    private HashMap<Object, Object> contextMap = new HashMap<>();
    
    // 写数据
    public void write(Object key, Object value){
        // 放数据到map中
        contextMap.put(key, value);
    }
    
    // 定义根据key拿到值方法
    public Object get(Object key){
        return contextMap.get(key);
    }
    
    // 拿到map中的数据内容
    public HashMap<Object, Object> getContextMap(){
        return contextMap;
    }
}

四、单词计数类

package com.css.hdfs;

/**
 * 思路:
 * 添加一个map方法 单词切分 相同key的value ++
 */
public class WordCountMapper implements Mapper{

    @Override
    public void map(String line, Context context) {
        // 拿到这行数据 切分
        String[] words = line.split(" ");
        // 拿到单词 相同的key value++  hello 1 world 1
        for (String word : words) {
            Object value = context.get(word);
            if (null == value) {
                context.write(word, 1);
            }else {
                // 不为空
                int v = (int)value;
                context.write(word, v+1);
            }
        }
    }
}

五、配置文件job.properties

IN_PATH=/wc/in
OUT_PATH=/wc/out/rs.txt
MAPPER_CLASS=com.css.hdfs.WordCountMapper

 

以上是关于HDFS 手写mapreduce单词计数框架的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop- Wordcount程序原理及代码实现

大数据学习之手写MR框架(WordCount程序开发)08

理解MapReduce

理解MapReduce

理解MapReduce

理解Mapreduce