hadoop MapReduce自定义分区Partition输出各运营商的手机号码

Posted shadowfiend

tags:

篇首语:本文由小常识网(cha138.com)小编整理,主要介绍如何使用 Hadoop MapReduce 的自定义分区(Partition)按运营商输出手机号码,希望对你有一定的参考价值。

MapReduce和自定义Partition

MobileDriver主类
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class MobileDriver 
    public static void main(String[] args) 
        String[] paths = "F:\\mobile.txt", "F:\\output";

        JobUtils.commit(paths, true, 3, MobileDriver.class,
                MobileMapper.class, Text.class, NullWritable.class, MobilePartition.class,
                MobileReduce.class, Text.class, NullWritable.class);

    
JobUtils工具类
package Partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

public class JobUtils 
    private static Configuration conf;

    static 
        conf = new Configuration();
    

    /**
     * 提交job
     *
     * @param paths        输入输出路径数组
     * @param isPartition  是否包含自定义分区类
     * @param reduceNumber reduce数量(若自定义分区为true,则此项必须>=自定义分区数)
     * @param params       可变参数
     */
    public static void commit(String[] paths, boolean isPartition, int reduceNumber, Class... params) 
        try 
            Job job = Job.getInstance(conf);
            job.setJarByClass(params[0]);

            job.setMapperClass(params[1]);
            job.setOutputKeyClass(params[2]);
            job.setOutputValueClass(params[3]);

            if (isPartition) 
                job.setPartitionerClass(params[4]);//设置自定义分区;
            

            if (reduceNumber > 0) 
                job.setNumReduceTasks(reduceNumber);
                job.setReducerClass(params[5]);
                job.setOutputKeyClass(params[6]);
                job.setOutputValueClass(params[7]);
             else 
                job.setNumReduceTasks(0);
            
            deleteDirectory(paths[1]);
            FileInputFormat.setInputPaths(job, new Path(paths[0]));
            FileOutputFormat.setOutputPath(job, new Path(paths[1]));
            job.waitForCompletion(true);
         catch (InterruptedException | ClassNotFoundException | IOException e) 
            e.printStackTrace();
        
    

    //输出目录存在则删除
    public static void deleteDirectory(String path) 
        File pFile = new File(path);
        if (!pFile.exists()) 
            return;
        
        if ((pFile.isDirectory() && pFile.listFiles().length == 0) || pFile.isFile()) 
            pFile.delete();
         else 
            for (File file : pFile.listFiles()) 
                if (file.isDirectory()) 
                    deleteDirectory(file.getAbsolutePath());
                 else 
                    file.delete();
                
            
        
        pFile.delete();
    
Map自定义类
package Partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MobileMapper extends Mapper<LongWritable, Text, Text, NullWritable> 
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        String line = value.toString();
        String[] mobiles = line.split("\t");
        for (String mobile : mobiles) 
            //不满足11位手机号进行过滤
            if (mobile.length() == 11) 
                context.write(new Text(mobile), NullWritable.get());
            
        
    
Reduce自定义类
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MobileReduce extends Reducer<Text, NullWritable, Text, NullWritable> 
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException 
        context.write(key, NullWritable.get());
    
Partition自定义分区类
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Arrays;

public class MobilePartition extends Partitioner<Text, NullWritable> 
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) 
        String line = text.toString();
        String flag = line.substring(0, 3);
        if (Arrays.asList(Mobile.CHINA_MOBILE).contains(flag)) 
            return 0;//移动
         else if (Arrays.asList(Mobile.CHINA_UNICOM).contains(flag)) 
            return 1;//联通
         else 
            return 2;//电信
        
    

以上是关于hadoop MapReduce自定义分区Partition输出各运营商的手机号码的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop3 - MapReduce 分区介绍及自定义分区

Hadoop3 - MapReduce 分区介绍及自定义分区

hadoop MapReduce自定义分区Partition输出各运营商的手机号码

Hadoop中的MapReduce框架原理:自定义Partitioner步骤、在Job驱动中设置自定义Partitioner、Partition 分区案例

Hadoop自定义分区Partitioner

未调用 hadoop mapreduce 分区程序