PageRank算法的MapReduce实现(输入有向图,迭代收敛)

Posted As_zyh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PageRank算法的MapReduce实现(输入有向图,迭代收敛)相关的知识,希望对你有一定的参考价值。

输入为网页有向图的邻接表:



通过统计输入文件的行数,即可得之网页总数为4

每个网页的初值为1/N,即0.25

第一行输入经过map处理后,得到如下结果:

B 0.0833
C 0.0833
D 0.0833

同理,第二三四行经过map处理后,得到:

A 0.125
D 0.125

C 0.25

B 0.125
C 0.125




系统会自动对map的输出进行shuflle处理,即对key进行排序,将相同key的value合并成一个列表。

A 0.125
B 0.0833 0.125
C 0.0833 0.25 0.125
D 0.0833 0.125 0.125




此时出现一个疑问:

为什么要进行这一步,而不是直接将相同key的value进行加和呢?



是为了MapReduce编程的可扩展性,在已知PageRank任务的前提下,我们知道要对相同key的value进行加和,如果是求最大值的任务呢?

所以把对value列表的操作交给reduce,我们要怎么操作这些列表,只要对reduce进行编写即可。

为解决网页间的终止点问题和陷阱问题,需要在reduce中进行如下处理(网页没有出链或者出链只有自己,pr值迭代后只增不减)

假设:上网者通过出链访问其他网页的概率为a,通过地址栏随机访问页面的概率为(1-a)

所以,在reduce过程,某网页pr变换为:

a *(接收其他网页发送来的pr值) + (1-a) * 1/N



经过reduce处理后,网页的pr值为
A = 0.8 * 0.125 + 0.2 * 0.25 = 0.15
B = 0.8 * (0.0833 + 0.125) + 0.2 * 0.25 = 0.216
C = 0.8 * (0.0833 + 0.25) + 0.2 * 0.25 = 0.416
D = 0.8 * (0.0833 + 0.125 + 0.125) + 0.2 * 0.25 = 0.216

此时一轮迭代结束,将reduce的结果输出




那么何时停止迭代呢?

要么到达最大迭代次数,要么pr值的变化已经收敛(pr值的曲线图趋于水平)

如何判断pr值收敛:

设置一个参数epi,若 max | Pi j - P i j-1| < epi ,则说明pr值的变化已经收敛。




完整的程序如下:(支持eclipse Run on Hadoop,不支持yarn -jar运行,因为yarn -jar运行时,只能访问类中static变量的初始值,若在程序运行时对static变量的值进行更改,则map/reduce中得到的变量值还是旧值)

package test02;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank_02 

	private static int N = 1;
	private static float a = 0.8f;
	private static int maxIteration = 40;
	private static float epi = 0.000001f;
	private static HashMap<String, Float> map;
	private static HashMap<String, Float> old_map;

	public static void main(String[] args) throws Exception 
		Configuration conf = new Configuration();
		String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
		if (otherArgs.length < 2) 
			System.err.println("Usage: <in> <out> [max_iteration] [epi]");
			System.err.println(" max_iteration -- Integer, default 40");
			System.err.println("     epi       -- Float,   default 0.000001f");
			System.exit(2);
		

		String input = "";
		if (otherArgs.length > 0)
			input = otherArgs[0];

		String output = "";
		if (otherArgs.length > 1)
			output = otherArgs[1];

		if (otherArgs.length > 2)
			setMaxIteration(Integer.parseInt(otherArgs[2]));

		if (otherArgs.length > 3)
			setEpi(Float.parseFloat(otherArgs[3]));

		// 统计input文件行数,即网页个数
		FileSystem fs = FileSystem.get(conf);
		FSDataInputStream in = fs.open(new Path(input));

		BufferedReader d = new BufferedReader(new InputStreamReader(in));
		int count = 0;
		String line;
		while ((line = d.readLine()) != null) 
			count += 1;
		
		System.err.println("Numbers of pages: " + count);
		setN(count);
		d.close();
		in.close();

		for (int i = 0; i < getMaxIteration(); i++) 
			map = new HashMap<String, Float>();
			Job job = Job.getInstance(conf, "page rank");
			job.setJarByClass(PageRank_02.class);
			job.setMapperClass(PageRankMapper.class);
			job.setReducerClass(PageRankReduce.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			FileInputFormat.addInputPath(job, new Path(input));
			String new_output = output + (i + 1);// 把下一次的输出设置成一个新地址。
			// 输出路径存在,则删除
			Path new_output_path = new Path(new_output);
			if (fs.exists(new_output_path)) 
				fs.delete(new_output_path, true);
			
			FileOutputFormat.setOutputPath(job, new_output_path);
			job.waitForCompletion(true);

			float max_delta = -1.0f;
			if (i > 0) 
				for (String key : map.keySet()) 
					max_delta = Math.max(max_delta, Math.abs(map.get(key) - old_map.get(key)));
				
			
			System.err.println("iteration: " + i + " , MaxIteration: " + getMaxIteration());
			System.err.println("N: " + getN());
			System.err.println("a: " + getA());
			System.err.println("max_delta: " + max_delta);
			System.err.println("epi: " + getEpi());

			if (max_delta < epi && i > 0)
				break;

			old_map = map;
		
		System.exit(0);
	

	/* map过程 */
	public static class PageRankMapper extends Mapper<Object, Text, Text, Text> 
		private String id;
		private float pr;
		private int count;
		private float average_pr;

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
			StringTokenizer str = new StringTokenizer(value.toString());// 对value进行解析
			id = str.nextToken();// id为解析的第一个词,代表当前网页
			if(old_map!=null && old_map.containsKey(id)) 
				pr = old_map.get(id);
			 else 
				pr = 1.0f / N;
			
			
			count = str.countTokens();// count为剩余词的个数,代表当前网页的出链网页个数
			average_pr = pr / count;// 求出当前网页对出链网页的贡献值
			while (str.hasMoreTokens()) 
				String linkid = str.nextToken();
				context.write(new Text(linkid), new Text(average_pr + ""));// 输出的是<出链网页,获得的贡献值>
			
		
	

	/* reduce过程 */
	public static class PageRankReduce extends Reducer<Text, Text, Text, Text> 
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
			float pr = 0;
			for (Text val : values) 
				pr += Float.parseFloat(val.toString());
			
			pr = getA() * pr + (1 - getA()) * (1.0f / getN());// 加入跳转因子
			map.put(key.toString(), pr);
			context.write(key, new Text(pr + ""));
		
	

	public static float getEpi() 
		return epi;
	

	public static void setEpi(float epi) 
		PageRank_02.epi = epi;
	

	public static float getA() 
		return a;
	

	public static void setA(float a) 
		PageRank_02.a = a;
	

	public static int getMaxIteration() 
		return maxIteration;
	

	public static void setMaxIteration(int maxIteration) 
		PageRank_02.maxIteration = maxIteration;
	

	public static int getN() 
		return N;
	

	public static void setN(int n) 
		PageRank_02.N = n;
	




程序输入参数分别为:输入文件 输出文件 Max_iteration epi

Run Configurations设置如下
按照如图配置运行程序

在iteration: 14时,程序退出循环

pr值变化的最大值:
max_delta = 0.0000846

设置的参数epi:
epi = 0.0001

max_delta < epi
即pr值已收敛

参考文献
1.MapReduce 之PageRank算法概述、设计思路和源码分析https://blog.csdn.net/u010414589/article/details/51404971

以上是关于PageRank算法的MapReduce实现(输入有向图,迭代收敛)的主要内容,如果未能解决你的问题,请参考以下文章

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

大数据-Hadoop2.7实现PageRank算法-MapReduce&HDFS

大数据-Hadoop2.7实现PageRank算法-MapReduce&HDFS

MapReduce 之PageRank 算法概述设计思路和源码分析