Sort data Hadoop Mapreduce
Posted: 2015-12-16 06:58:07

Question: I have the following algorithm to sort data alphabetically:
public void setup(Context context) throws IOException, InterruptedException {
    conf = context.getConfiguration();
    caseSensitive = conf.getBoolean("amasort.case.sensitive", true);
}

@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
    String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
    word.set(line + "_" + key.toString());
    context.write(word, one);
    System.out.println("key:" + key.toString() + ";value:" + value.toString());
}

public static class ForwardReducer
        extends Reducer<Text, NullWritable, Text, NullWritable> {

    private NullWritable result = NullWritable.get();

    public void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        String originalWord = key.toString();
        originalWord = originalWord.substring(0, originalWord.lastIndexOf("_"));
        key.set(originalWord);
        context.write(key, result);
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
    String[] remainingArgs = optionParser.getRemainingArgs();
    Job job = Job.getInstance(conf, "word sort");
    job.setJarByClass(AmaSort.class);
    job.setMapperClass(LineMapper.class);
    // job.setCombinerClass(ForwardReducer.class);
    job.setReducerClass(ForwardReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
I tried to use this algorithm to sort my dataset, which contains lines like (@xxxxxxx, 0,tcp,xx,1,1,1,2,4,5,....), but in the output all lines starting with @ are removed and the structure of the data lines (0,tcp,x1x1,1,114,....) is modified. I just want to sort my dataset by this specific character (@): all lines starting with @ first in the file, and the rest keeping the same structure. Can anyone help me modify this algorithm?
Answer 1: You can use the following modified code for sorting:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AmaSort {

    static Configuration conf = null;
    private static boolean caseSensitive;
    private static Text word = new Text();

    public static class LineMapper extends Mapper<Object, Text, Text, NullWritable> {

        public void setup(Context context) throws IOException, InterruptedException {
            conf = context.getConfiguration();
            caseSensitive = conf.getBoolean("amasort.case.sensitive", true);
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Emit the whole line as the key; the shuffle phase sorts the keys.
            String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
            word.set(line);
            context.write(word, NullWritable.get());
        }
    }

    public static class ForwardReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        private NullWritable result = NullWritable.get();

        public void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive sorted; write each distinct line back out once.
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();
        // Job job = Job.getInstance(conf, "word sort");
        Job job = new Job(conf, "word sort");
        job.setJarByClass(AmaSort.class);
        job.setMapperClass(LineMapper.class);
        // job.setCombinerClass(ForwardReducer.class);
        job.setReducerClass(ForwardReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
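The key change is that the mapper now emits the whole line as the key with a NullWritable value, so the shuffle phase sorts the lines and the reducer simply writes each key back out. A sketch of how the job might be launched (the jar name and HDFS paths here are placeholders; GenericOptionsParser forwards the -D flag into the Configuration):

hadoop jar amasort.jar AmaSort -D amasort.case.sensitive=true /user/hduser/input/mydata /user/hduser/output/sorted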
Comments:
Thanks @prashant, it works, but a lot of data has been removed (my original dataset is 36 MB; the sorted data is only 3.6 MB). I get the data rows (0,tcp,xxxxx,0,1,111) first in the file, then the '@attribute' lines, with the '@data' line last. I want this order: the '@attribute' lines first, then the single '@data' line, and finally the remaining data rows. Can you help me fix this? Or send me your email so we can get in touch.

When we sort by key, by default the keys are sorted with the default RawComparator, but if you want to sort them with a custom ordering you can implement your own SortComparator. Please have a look at this link: ***.com/questions/16184745/… By the way, you can contact me at prashant.n.khunt@gmail.com
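For illustration, here is a minimal sketch of such a custom sort comparator (the class name and ordering are hypothetical, not from the original thread): it sorts lines starting with '@' before all other lines, and alphabetically within each group. It would be registered on the job with job.setSortComparatorClass(AtFirstComparator.class).

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical comparator: lines beginning with '@' sort before all others.
public class AtFirstComparator extends WritableComparator {

    protected AtFirstComparator() {
        super(Text.class, true); // create Text instances so compare() receives deserialized keys
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String left = a.toString();
        String right = b.toString();
        boolean leftAt = left.startsWith("@");
        boolean rightAt = right.startsWith("@");
        if (leftAt != rightAt) {
            return leftAt ? -1 : 1; // '@' lines come first
        }
        return left.compareTo(right); // alphabetical within each group
    }
}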
Could you also give some examples of the lines that were removed from the original data? The size may also have decreased because duplicates were removed from the data.
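If the duplicates need to be preserved, one option (a sketch, not part of the original answer) is to have the reducer write the key once per occurrence instead of once per distinct key:

public void reduce(Text key, Iterable<NullWritable> values, Context context)
        throws IOException, InterruptedException {
    // Emit the line once for every occurrence so duplicate input lines
    // survive the sort instead of being collapsed into one.
    for (NullWritable ignored : values) {
        context.write(key, NullWritable.get());
    }
}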
Thank you brother, I'm sending you an email now; I hope to get your reply soon.