Distributed and Parallel Computing Course Project (with Flow Analysis Diagram)

Posted by @阿证1024


Project title: Implementing an Inverted Index

I. Project

Implementation of an inverted index.

II. Requirements

Data sources: a.txt, b.txt, c.txt

Following the inverted-index rule, count how many times each word in the data sources appears in each file. The final result should look roughly like this:

hello a.txt-->8,b.txt-->3,c.txt-->1
tom a.txt-->5,b.txt-->2,c.txt-->1
...

Flow analysis diagram: (original figure not preserved)

Two jobs are used to meet the requirement. Job 1 counts the occurrences of each "word--filename" pair; Job 2 then aggregates Job 1's output per word, which yields the inverted index. The detailed flow is shown in the flow analysis diagram and, equivalently, in the driver sketch below.
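To make the flow concrete, here is a minimal sketch of a single driver that chains the two jobs. The class name IndexChainDriver and the three paths are illustrative placeholders of my own, not part of the original project; the Mapper/Reducer classes are the ones listed in the next section.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexChainDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Path input = new Path("/index/input");   // a.txt, b.txt, c.txt (placeholder path)
		Path temp = new Path("/index/tmp");      // Job 1 output = Job 2 input (placeholder)
		Path output = new Path("/index/output"); // final inverted index (placeholder)

		// Job 1: count occurrences of each "word--filename" pair
		Job job1 = Job.getInstance(conf, "one-index");
		job1.setJarByClass(IndexChainDriver.class);
		job1.setMapperClass(OneIndexMapper.class);
		job1.setReducerClass(OneIndexReducer.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(IntWritable.class);
		FileInputFormat.setInputPaths(job1, input);
		FileOutputFormat.setOutputPath(job1, temp);
		if (!job1.waitForCompletion(true)) System.exit(1);

		// Job 2: regroup Job 1's output by word to build the inverted index
		Job job2 = Job.getInstance(conf, "two-index");
		job2.setJarByClass(IndexChainDriver.class);
		job2.setMapperClass(TwoIndexMapper.class);
		job2.setReducerClass(TwoIndexReducer.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job2, temp);
		FileOutputFormat.setOutputPath(job2, output);
		System.exit(job2.waitForCompletion(true) ? 0 : 1);
	}
}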

III. Code and Results

Class structure:

Job1:OneIndexMapper、OneIndexReducer、OneIndexDriver
Job2:TwoIndexMapper、TwoIndexReducer、TwoIndexDriver

Job 1:

OneIndexDriver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneIndexDriver {
	public static void main(String[] args) throws Exception {

		// Hard-coded for local runs; replace with your own input/output directories
		args = new String[] { "your input directory", "your output directory" };

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(OneIndexDriver.class);

		job.setMapperClass(OneIndexMapper.class);
		job.setReducerClass(OneIndexReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}
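Note that Hadoop's FileOutputFormat fails the job if the output directory already exists. A small optional helper (not part of the original code; the class name OutputCleaner is my own) can delete it before the job is submitted, using the standard FileSystem API:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputCleaner {
	// Deletes the given output directory if it exists, so the job can be rerun
	public static void clean(Configuration conf, String dir) throws IOException {
		FileSystem fs = FileSystem.get(conf);
		Path outPath = new Path(dir);
		if (fs.exists(outPath)) {
			fs.delete(outPath, true); // true = delete recursively
		}
	}
}

Calling OutputCleaner.clean(conf, args[1]) in main() before FileOutputFormat.setOutputPath(...) avoids having to delete the directory by hand between runs.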

OneIndexMapper:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

	String name;
	Text k = new Text();
	IntWritable v = new IntWritable();

	@Override
	protected void setup(Context context) throws IOException, InterruptedException {
		// Record the name of the file this split comes from
		FileSplit split = (FileSplit) context.getInputSplit();
		name = split.getPath().getName();
	}

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// 1. Get one line of input
		String line = value.toString();

		// 2. Split it into words
		String[] fields = line.split(" ");

		for (String word : fields) {
			// 3. Build the "word--filename" key
			k.set(word + "--" + name);
			v.set(1);
			// 4. Emit
			context.write(k, v);
		}
	}
}
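To see what the mapper emits, here is a tiny standalone sketch (plain Java, no Hadoop required; the sample line and file name are made up for illustration):

public class OneIndexMapperDemo {
	public static void main(String[] args) {
		String name = "a.txt";           // in the real mapper this comes from the FileSplit in setup()
		String line = "hello tom hello"; // one line of input text
		for (String word : line.split(" ")) {
			// Same key construction as OneIndexMapper: word + "--" + filename, count 1
			System.out.println(word + "--" + name + "\t" + 1);
		}
		// Prints:
		// hello--a.txt	1
		// tom--a.txt	1
		// hello--a.txt	1
	}
}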

OneIndexReducer:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

	IntWritable v = new IntWritable();

	@Override
	protected void reduce(Text k, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		int sum = 0;
		// 1. Sum the counts for this "word--filename" key
		for (IntWritable ele : values) {
			sum += ele.get();
		}

		v.set(sum);
		// 2. Emit
		context.write(k, v);
	}
}
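Each record Job 1 writes has the form word--filename<TAB>count. Assuming the sample counts from the requirement above, the intermediate output would contain lines like:

hello--a.txt	8
hello--b.txt	3
hello--c.txt	1
tom--a.txt	5
tom--b.txt	2
tom--c.txt	1

This is exactly the format Job 2's mapper parses: it splits on "--" and on the tab that TextOutputFormat places between key and value by default.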

Job 2:

TwoIndexDriver:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class TwoIndexDriver {
	public static void main(String[] args) throws Exception {

		// Hard-coded for local runs; the input here is Job 1's output directory
		args = new String[] { "your input directory", "your output directory" };

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(TwoIndexDriver.class);

		job.setMapperClass(TwoIndexMapper.class);
		job.setReducerClass(TwoIndexReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		job.waitForCompletion(true);
	}
}

TwoIndexMapper:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {

	Text k = new Text();
	Text v = new Text();

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// 1. Get one line of Job 1's output, e.g. "hello--a.txt\t8"
		String line = value.toString();

		// 2. Split on "--" to separate the word from "filename\tcount"
		String[] fields = line.split("--");

		// Replace the tab between filename and count with "-->"
		// (note: "\t", the tab character, not the two-character string "\\t")
		String vStr = fields[1].replace("\t", "-->");

		// 3. Wrap the results
		k.set(fields[0]);
		v.set(vStr);

		// 4. Emit
		context.write(k, v);
	}
}
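The parsing step can be checked in isolation; this standalone sketch (plain Java, with one made-up Job 1 record) shows the split and the tab replacement:

public class TwoIndexMapperDemo {
	public static void main(String[] args) {
		String line = "hello--a.txt\t8";              // one line of Job 1 output (illustrative)
		String[] fields = line.split("--");           // ["hello", "a.txt\t8"]
		String vStr = fields[1].replace("\t", "-->"); // "a.txt-->8"
		System.out.println(fields[0] + "\t" + vStr);  // prints: hello	a.txt-->8
	}
}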

TwoIndexReducer:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {

	Text v = new Text();

	@Override
	protected void reduce(Text k, Iterable<Text> values, Context context)
			throws IOException, InterruptedException {
		// Concatenate all "filename-->count" entries for this word
		String vStr = "";

		for (Text s : values) {
			vStr = vStr + s.toString() + " ";
		}

		v.set(vStr);

		context.write(k, v);
	}
}
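Putting the two jobs together, and again assuming the sample counts from the requirement, the final output would look like:

hello	a.txt-->8 b.txt-->3 c.txt-->1
tom	a.txt-->5 b.txt-->2 c.txt-->1

Note that the reducer joins entries with spaces and leaves a trailing space; to match the comma-separated form shown in the requirement exactly, the loop could collect the entries in a StringBuilder and join them with "," instead.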

IV. Results

Job 1 run result: (screenshot not preserved)

Job 2 run result: (screenshot not preserved)