MapReduce数据筛选

Posted 总被人想的静静

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce数据筛选相关的知识,希望对你有一定的参考价值。

需求:

编写MapReduce程序算出高峰时间段(如9-10点)哪张表被访问的最频繁的表,以及这段时间访问这张表最多的用户,以及这个用户访问这张表的总时间开销。  

测试数据:


TableName(表名),Time(时间),User(用户),TimeSpan(时间开销)

t003 6:00 u002 180
t003 7:00 u002 180
t003 7:08 u002 180
t003 7:25 u002 180
t002 8:00 u002 180
t001 8:00 u001 240

t001 9:00 u002 300
t001 9:11 u001 240
t003 9:26 u001 180
t001 9:39 u001 300
*t001 10:00 u001 200


代码

方法一:

package com.table.main;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableUsed {

    public static class MRMapper extends Mapper<LongWritable, Text, Text, Text> {
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().substring(1).split("\\s+");
            Long time = Long.parseLong(split[1].charAt(0) + "");
            // 筛选9-10点使用过的表
            if (time == 9 || time == 10) {
                context.write(new Text(split[0]), new Text(split[2] + ":" + split[3]));
            }
        }
    }

    public static class MRReducer extends Reducer<Text, Text, Text, Text> {
        // 存放使用量最大的表的表名及用户
        public static HashMap<String, HashMap<String, Integer>> map = new HashMap<String, HashMap<String, Integer>>();
        // 最大用使用量
        public static int max_used_num = 0;
        // 使用量最大的表
        public static String table = "";

        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            HashMap<String, Integer> user_map = new HashMap<String, Integer>();

            int table_used_num = 0;
            for (Text t : values) {
                table_used_num++;
                String[] split = t.toString().split(":");

                // 如map中已经存在的用户则把使用时间叠加 不存在则添加该用户
                if (user_map.get(split[0]) == null) {
                    user_map.put(split[0], Integer.parseInt(split[1]));
                } else {
                    Integer use_time = user_map.get(split[0]);
                    use_time += Integer.parseInt(split[1]);
                    user_map.put(split[0], use_time);
                }
            }
            if (table_used_num > max_used_num) {
                map.put(key.toString(), user_map);
                table = key.toString();
                max_used_num = table_used_num;
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            
            // 循环map,查出使用时间最长的用户信息
            HashMap<String, Integer> map2 = map.get(table);

            int max = 0;
            String max_used_user = "";
            for (HashMap.Entry<String, Integer> m : map2.entrySet()) {
                if (m.getValue() > max) {
                    max = m.getValue();
                    max_used_user = m.getKey();
                }
            }
            context.write(new Text(table), new Text("\t" + max_used_user + "\t" + map2.get(max_used_user)));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(TableUsed.class);

        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop5:9000/input/table_time.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop5:9000/output/put2"));
        System.out.println(job.waitForCompletion(true) ? 1 : 0);
    }
}
缺点:只算出使用时间最长的用户,没有判断该用户是否是使用次数最多的

方法二:

package com.table.main;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TableUsed {

    public static class MRMapper extends Mapper<LongWritable, Text, Text, Text> {
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] split = value.toString().substring(1).split("\\s+");
            Long time = Long.parseLong(split[1].charAt(0) + "");
            // 筛选9-10点使用过的表
            if (time == 9 || time == 10) {
                context.write(new Text(split[0]), new Text(split[2] + ":" + split[3]));
            }
        }
    }

    public static class MRReducer extends Reducer<Text, Text, Text, Text> {
        //                  表的最大使用次数        使用该表最多的用户
        public static int max_used_num = 0, max_user_used = 0;
        //                      使用量最大的表     使用该表最多的用户名
        public static String max_used_table = "", user_name = "";
        //                  使用次数最多的用户的 使用时间
        public static Integer user_used_time = 0;

        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            HashMap<String, Integer> user_map = new HashMap<String, Integer>();
            HashMap<String, Integer> user_used_map = new HashMap<String, Integer>();

            int table_used_num = 0;// 表的使用次数
            Integer use_num = 0;// 用户使用次数
            Integer use_time = 0;//使用时间
            String username = "";//用户名
            
            for (Text t : values) {
                table_used_num++;
                String[] split = t.toString().split(":");

                // 如map中已经存在的用户则把使用时间叠加 不存在则添加该用户
                if (user_map.get(split[0]) == null) {

                    user_map.put(split[0], Integer.parseInt(split[1]));
                    user_used_map.put(split[0], 1);
                } else {
                    use_time = user_map.get(split[0]);
                    use_time += Integer.parseInt(split[1]);
                    user_map.put(split[0], use_time);

                    use_num = user_used_map.get(split[0]);
                    use_num ++;
                    user_used_map.put(split[0], use_num);
                }

                /**
                 * 判断该用户是否为此表使用次数最多的,
                 * 是则存进user_map和user_used_map,否则不存;
                 * 由于只需要求使用量最多的用户,因此使用量不是最多用户没有必要存在于map中
                 */
                if (use_num > max_user_used) {
                    username = split[0];
                    max_user_used = use_num;
                    user_used_time = use_time;
                    //此处也可以不remove()
                    user_used_map.remove(split[0]);
                    user_map.remove(split[0]);
                }
            }

            if (table_used_num > max_used_num) {
                max_used_table = key.toString();
                max_used_num = table_used_num;
                user_name = username;
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            
            context.write(new Text(max_used_table), new Text(max_user_used + "\t" + user_name + "\t" + user_used_time));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);
        job.setJarByClass(TableUsed.class);

        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://hadoop5:9000/input/table_time.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop5:9000/output/put6"));
        System.out.println(job.waitForCompletion(true) ? 1 : 0);
    }
}

以上是关于MapReduce数据筛选的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce执行过程

mapreduce实现数据去重

MapReduce-线性回归

SQLite的LIKE语句实现字符片段筛选的功能

SQLite的LIKE语句实现字符片段筛选的功能

SQLite的LIKE语句实现字符片段筛选的功能