hadoop MapReduce自定义分区Partition输出各运营商的手机号码
Posted shadowfiend
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop MapReduce自定义分区Partition输出各运营商的手机号码相关的知识,希望对你有一定的参考价值。
MapReduce和自定义Partition
MobileDriver主类
package Partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
public class MobileDriver
public static void main(String[] args)
String[] paths = "F:\\mobile.txt", "F:\\output";
JobUtils.commit(paths, true, 3, MobileDriver.class,
MobileMapper.class, Text.class, NullWritable.class, MobilePartition.class,
MobileReduce.class, Text.class, NullWritable.class);
JobUtils工具类
package Partition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.io.IOException;
public class JobUtils
private static Configuration conf;
static
conf = new Configuration();
/**
* 提交job
*
* @param paths 输入输出路径数组
* @param isPartition 是否包含自定义分区类
* @param reduceNumber reduce数量(若自定义分区为true,则此项必须>=自定义分区数)
* @param params 可变参数
*/
public static void commit(String[] paths, boolean isPartition, int reduceNumber, Class... params)
try
Job job = Job.getInstance(conf);
job.setJarByClass(params[0]);
job.setMapperClass(params[1]);
job.setOutputKeyClass(params[2]);
job.setOutputValueClass(params[3]);
if (isPartition)
job.setPartitionerClass(params[4]);//设置自定义分区;
if (reduceNumber > 0)
job.setNumReduceTasks(reduceNumber);
job.setReducerClass(params[5]);
job.setOutputKeyClass(params[6]);
job.setOutputValueClass(params[7]);
else
job.setNumReduceTasks(0);
deleteDirectory(paths[1]);
FileInputFormat.setInputPaths(job, new Path(paths[0]));
FileOutputFormat.setOutputPath(job, new Path(paths[1]));
job.waitForCompletion(true);
catch (InterruptedException | ClassNotFoundException | IOException e)
e.printStackTrace();
//输出目录存在则删除
public static void deleteDirectory(String path)
File pFile = new File(path);
if (!pFile.exists())
return;
if ((pFile.isDirectory() && pFile.listFiles().length == 0) || pFile.isFile())
pFile.delete();
else
for (File file : pFile.listFiles())
if (file.isDirectory())
deleteDirectory(file.getAbsolutePath());
else
file.delete();
pFile.delete();
Map自定义类
package Partition;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class MobileMapper extends Mapper<LongWritable, Text, Text, NullWritable>
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
String line = value.toString();
String[] mobiles = line.split("\t");
for (String mobile : mobiles)
//不满足11位手机号进行过滤
if (mobile.length() == 11)
context.write(new Text(mobile), NullWritable.get());
Reduce自定义类
package Partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MobileReduce extends Reducer<Text, NullWritable, Text, NullWritable>
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException
context.write(key, NullWritable.get());
Partition自定义分区类
package Partition;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.Arrays;
public class MobilePartition extends Partitioner<Text, NullWritable>
@Override
public int getPartition(Text text, NullWritable nullWritable, int i)
String line = text.toString();
String flag = line.substring(0, 3);
if (Arrays.asList(Mobile.CHINA_MOBILE).contains(flag))
return 0;//移动
else if (Arrays.asList(Mobile.CHINA_UNICOM).contains(flag))
return 1;//联通
else
return 2;//电信
以上是关于 Hadoop MapReduce 自定义分区(Partition)输出各运营商手机号码的主要内容。如果未能解决你的问题,请参考以下文章:
Hadoop3 - MapReduce 分区介绍及自定义分区
Hadoop3 - MapReduce 分区介绍及自定义分区
hadoop MapReduce自定义分区Partition输出各运营商的手机号码
Hadoop 中的 MapReduce 框架原理:自定义 Partitioner 步骤、在 Job 驱动中设置自定义 Partitioner、Partition 分区案例