TopN Case Study

Posted by jztx123


Prepare three data files; each line holds a name and a value, separated by a single space.

t1 2067
t2 2055
t3 2055
t4 1200
t5 2367
t6 255
t7 2555
t8 12100
t9 20647
t10 245
t11 205
t12 100
t111 1067
t112 2155
t113 2065
t114 1290
t115 237
t116 25
t117 15
t118 1
t119 10647
t110 2995
t111 2057
t112 10044
t211 67
t212 55
t213 65
t214 90
t215 37
t216 425
t217 155
t218 189
t219 1047
t210 295
t211 27
t212 144

Define the Mapper class

package com.hadoop.TopN;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.TreeMap;

public class TopMapper extends Mapper<Object, Text, NullWritable, Text> {
    private TreeMap<Integer, Text> map = new TreeMap<>();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(" ");
        String number = words[1];
        // Must copy with new Text(value): Hadoop reuses the value object across map() calls,
        // so storing the raw reference would corrupt earlier entries. A classic pitfall!
        map.put(Integer.parseInt(number), new Text(value));
        if (map.size() > 10) {
            map.remove(map.firstKey()); // firstKey() is the smallest, so the 10 largest survive
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit this mapper's local top 10 once it has consumed all of its input.
        for (Text text : map.values()) {
            context.write(NullWritable.get(), text);
        }
    }
}
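
The heart of this Mapper is the bounded TreeMap: keys stay sorted, firstKey() is always the current minimum, and evicting it whenever the size passes 10 leaves exactly the 10 largest records seen so far. A minimal standalone sketch of the trick (class name and sample values are illustrative only; it keeps a top 3 for brevity):

import java.util.TreeMap;

public class TreeMapTopNDemo {
    public static void main(String[] args) {
        TreeMap<Integer, String> top = new TreeMap<>();
        int[] values = {2067, 255, 12100, 2055, 10647};
        for (int v : values) {
            top.put(v, "record " + v);       // note: a duplicate key overwrites the earlier entry
            if (top.size() > 3) {            // keep only the 3 largest, for brevity
                top.remove(top.firstKey());  // firstKey() is the current minimum
            }
        }
        top.values().forEach(System.out::println); // prints: record 2067, record 10647, record 12100
    }
}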

Define the Reducer class

package com.hadoop.TopN;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.TreeMap;

public class TopReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
    private TreeMap<Integer, Text> map = new TreeMap<>();

    @Override
    protected void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Every mapper emitted its local top 10 under the same NullWritable key,
        // so this single reduce() call sees all candidates and merges them.
        for (Text value : values) {
            String[] strs = value.toString().split(" ");
            map.put(Integer.parseInt(strs[1]), new Text(value));
            if (map.size() > 10) {
                map.remove(map.firstKey());
            }
        }
        // Write the global top 10; TreeMap.values() iterates in ascending key order.
        for (Text text : map.values()) {
            context.write(NullWritable.get(), text);
        }
    }
}
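
One caveat applies to both the Mapper and the Reducer: the TreeMap is keyed by the numeric value itself, so two records that share a value (t2 and t3 both carry 2055 in the sample data) overwrite each other and only one survives. If ties must be kept, a size-bounded min-heap is a common alternative; a sketch under that assumption (class name and sample lines are illustrative):

import java.util.Comparator;
import java.util.PriorityQueue;

public class HeapTopNDemo {
    public static void main(String[] args) {
        // Min-heap ordered by the numeric field; equal values stay as separate entries.
        PriorityQueue<String> heap = new PriorityQueue<>(
                Comparator.comparingInt((String line) -> Integer.parseInt(line.split(" ")[1])));
        String[] lines = {"t1 2067", "t2 2055", "t3 2055", "t8 12100"};
        for (String line : lines) {
            heap.offer(line);
            if (heap.size() > 3) {
                heap.poll(); // evict the current smallest
            }
        }
        while (!heap.isEmpty()) {
            System.out.println(heap.poll()); // ascending order of the value
        }
    }
}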

Write the Driver class

package com.hadoop.TopN;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopDriver {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TopDriver.class);
        job.setMapperClass(TopMapper.class);
        job.setReducerClass(TopReducer.class);
        job.setNumReduceTasks(1);   // key point: one reducer, so the global top 10 is merged in a single place
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output/topn")); // must not already exist
        job.waitForCompletion(true);
    }
}
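
Two practical notes. The relative paths mean the job can be run straight from the IDE in Hadoop's local mode; on a cluster you would pass HDFS paths instead. Also, FileOutputFormat refuses to run if the output directory already exists, so a rerun fails until output/topn is deleted. A common convenience (my addition, not part of the original Driver) is to clear it in main() before setting the output path; this needs an extra import of org.apache.hadoop.fs.FileSystem:

        // Optional: delete a leftover output directory so reruns do not fail.
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("output/topn");
        if (fs.exists(out)) {
            fs.delete(out, true); // true = recursive
        }
        FileOutputFormat.setOutputPath(job, out);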

Output in part-r-00000 (ascending order, because TreeMap.values() iterates from the smallest key to the largest):

t113 2065
t1 2067
t112 2155
t5 2367
t7 2555
t110 2995
t112 10044
t119 10647
t8 12100
t9 20647
