MapReduce 之PageRank 算法概述设计思路和源码分析

Posted 行路南

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce 之PageRank 算法概述设计思路和源码分析相关的知识,希望对你有一定的参考价值。

早就对PageRank 算法感兴趣,但一直都是轮廓性的概念,没有具体深入学习。最近要学习和总结MapReduce 的实例,就又把PageRank 算法重新学习了一遍,并基于MapReduce 进行了实现。

1. PageRank是什么

PageRank,网页排名,右脚网页级别。是以Google 公司创始人Larry Page 之姓来命名。PageRank 计算每一个网页的PageRank值,并根据PageRank值的大小对网页的重要性进行排序。PageRank的基本思想是:对于一个网页A来说,链接到A的页面越多,且链接到A的页面的PageRank值越大,网页A的PageRank值越大。

2. 简单的PageRank计算

首先我们对整个Web进行一个简单化处理:1、将每一个页面看做一个节点;2、如果页面A 链接页面B,则存在一条从A到B的有向边。整个Web就被抽象成一个有向图。
现在假设只有四张网页,它的结构如图所示。

这里写图片描述

显然该图是强联通的(从任一节点出发都可以达到另一任节点)。对于页面A来说,它链接到页面B,C,D,即A有3个出链,则它跳转到每个出链B,C,D的概率均为1/3.。如果A有k个出链,跳转到每个出链的概率为1/k。同理B到A,C,D的概率为1/2,0,1/2。C到A,B,D的概率为1,0,0。D到A,B,C的概率为0,1/2,1/2。

通常我们使用一种合适的数据结构来表示页面间的链接关系。设一共有N个页面,则要生成一个N维矩阵,其中第i行表示的是其他页面对 第i个页面链接的概率,第j列表示的是第j个页面对其他页面链接的概率。这样的矩阵叫做转移矩阵。对应到上图,转移矩阵为:

这里写图片描述

在上图中,第一列为页面A对各个页面转移的概率,第一行为各个页面对页面A转移的概率。初始时,每一个页面的PageRank值都是均等的,为1/N,这里也即是1/4。然后对于页面A来说,根据每一个页面的PageRank值和每个页面对页面A的转移概率,可以算出新一轮页面A的PageRank值。这里,只有页面B和页面C转移了自己的1/2给A。所以新一轮A的PageRank值为1/4*1/2+1/4*1/2=9/24。

为了计算方便,我们设置各页面初始的PageRank值为一个列向量V0。然后再基于转移矩阵,我们可以直接求出新一轮各个页面的PageRank值。即 V1 = MV0

这里写图片描述

现在得到了各页面新的PageRank值V1, 继续用M 去乘以V1 ,就会得到更新的PageRank值。一直迭代这个过程,可以证明出V最终会收敛。此时停止迭代。这时的V就是各个页面的PageRank值。在上图中,一直迭代的中间V如下:

这里写图片描述

3. 处理Dead Ends(终止点)

上面的PageRank计算方法要求整个Web是强联通的。而实际上真实的Web并不是强联通的,有一类页面,它们不存在任何外链,称之为Dead Ends(终止点)。比如下面的图:

这里写图片描述

这里节点C即是一个终止点。而上面的算法之所以能够成功收敛,很大因素上基于转移矩阵每一列加和为1(每一个页面都至少有一个出链)。当节点C没有出链时,转移矩阵M如下所示

这里写图片描述

基于这个转移矩阵和初始的PageRank列向量,每一次迭代过的PageRank列向量如下:

这里写图片描述

解决该问题的一种方法是:迭代拿掉图中的Dead Ends 点以及相关的边,之所以是迭代拿掉,是因为当拿掉最初的Dead Ends之后,又可能产生新的Dead Ends点。直到图中没有Dead Ends点为止。然后对剩余的所有节点,计算它们的PageRank ,然后以拿掉Dead Ends的逆序反推各个Dead Ends的PageRank值。

比如在上图中,首先拿掉节点C,然后发现没有产生新的Dead Ends .然后对A,B,D 计算他们的PageRank ,他们初始PageRank值均为1/3,且A有两个出链,B有两个出链,D有一个出链,那么由上面的方法可以算出各页面最终的PageRank值。假设算出A的PageRank 为x, B的PageRank 为y, D的PageRank 为z, 那么C的PageRank值为1/3*x + 1/2*z 。

4. 处理Spider Traps(蜘蛛陷阱)

通过以上可以想象到,真实的Web链接关系若是转换成转移矩阵,那必将是一个稀疏的矩阵。而稀疏的矩阵迭代相乘会使得中间产生的PageRank向量变得不平滑(一小部分值很大,大部分值很小或接近于0)。而一种Spider Traps 节点 会加剧这个不平滑的效果,也即是蜘蛛陷阱。它是指某一些节点虽然有外链,但是它只链向自己。如下图所示:

这里写图片描述

如果对这个图按照上面的方法进行迭代计算PageRank , 计算后会发现所有节点的PageRank值都会逐步转移到节点C上来,而其他节点都趋近于零。

这里写图片描述

为了解决这个问题,我们需要对PageRank 计算方法进行一个平滑处理–加入teleporting(跳转因子)。也就是说,用户在访问Web页面时,除了按照Web页面的链接关系进行选择以外,他也可能直接在地址栏上输入一个地址进行访问。这样就避免了用户只在一个页面只能进行自身访问,或者进入一个页面无法出来的情况。

加入跳转因子之后,PageRank向量的计算公式修正为:

这里写图片描述

其中, β 通常设置为一个很小的数(0.2或者0.15),e为单位向量,N是所有页面的个数,乘以1/N是因为随机跳转到一个页面的概率是1/N。这样,每次计算PageRank值,既依赖于转移矩阵,同时依赖于小概率的随机跳转。

以上图为例,改进后的PageRank值计算如下:

这里写图片描述

按照这个计算公式迭代下去,会发现spider traps 效应被抑制了,使得各个页面得到一个合理的PageRank值。

5. 基于MapReduce 的PageRank 设计思路

基于上面的例子,我们的输入样本是

这里写图片描述

其中每一行中第一列为网页i,第二列为该网页的pagerank 值,之后的列均为网页i 链接的网页。
因为我们要迭代的计算PageRank值,那么每次MapReduce 的输出要和输入的格式是一样的,这样才能使得Mapreduce 的输出用来作为下一轮MapReduce 的输入。

那么我们每次得到的输出,应该是这样的:

这里写图片描述

5.1 Map过程的设计

  • 对每一行文本进行解析,获得当前网页、当前网页的PageRank值、当前网页要链接到的其他网页
  • 计算出要链接到的其他网友的个数,然后求出当前网页对其他网页的贡献值。
  • 输出设计时,要输出两种:
    • 第一种输出的< key ,value>中的key 表示其他网页,value 表示当前网页对其他网页的贡献值。
    • 第二种输出的< key ,value>中的key 表示当前网页,value 表示所有其他网页。
    • 并且,为了区别这两种输出,第一种输出的value里加入“@”,第二种输出的value里加入“&”

经过Map过程之后,会得到如下结果:

这里写图片描述

Map结果输出之后,经过Shuffle过程排序并合并(系统自动实现),结果 如下:

这里写图片描述

shuffle结果可知,shuffle 过程的输出key 表示一个网页,输出value表示一个列表,里面有两类:一类是从其他网页获得的贡献值,一类是该网页的所有出链网页

5.2 Reduce过程的设计

  • shuffule的输出也即是reduce的输入。
  • reduce输入的key 直接作为输出的key
  • 对reduce输入的value 进行解析,它是一个列表,
    • 若列表里的值里包含“@”,就把该值“@”后面的字符串转化成float型加起来
    • 若列表里的值里包含“&”,就把该值“&”后面的字符串提取出来
    • 把所有贡献值的加总,和提取的字符串进行连接,作为reduce的输出value

6.源码分析

下面看一下PankRank 实现的代码


package org.apache.hadoop.examples;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PageRank_fourth {

    /*map过程*/
    public static class lxnmapper extends Mapper<Object,Text,Text,Text>{        
        private String id;
        private float pr;       
        private int count;
        private float average_pr;       
        public void map(Object key,Text value,Context context)
            throws IOException,InterruptedException{            
            StringTokenizer str = new StringTokenizer(value.toString());//对value进行解析
            id =str.nextToken();//id为解析的第一个词,代表当前网页
            pr = Float.parseFloat(str.nextToken());//pr为解析的第二个词,转换为float类型,代表PageRank值
            count = str.countTokens();//count为剩余词的个数,代表当前网页的出链网页个数
            average_pr = pr/count;//求出当前网页对出链网页的贡献值
            String linkids ="&";//下面是输出的两类,分别有'@'和'&'区分
            while(str.hasMoreTokens()){
                String linkid = str.nextToken();
                context.write(new Text(linkid),new Text("@"+average_pr));//输出的是<出链网页,获得的贡献值>
                linkids +=" "+ linkid;
            }       
            context.write(new Text(id), new Text(linkids));//输出的是<当前网页,所有出链网页>
        }       
    }

    /*reduce过程*/
    public static class lxnreduce extends Reducer<Text,Text,Text,Text>{     
        public void reduce(Text key,Iterable<Text> values,Context context)
            throws IOException,InterruptedException{            
            String lianjie = "";
            float pr = 0;
            /*对values中的每一个val进行分析,通过其第一个字符是'@'还是'&'进行判断
            通过这个循环,可以 求出当前网页获得的贡献值之和,也即是新的PageRank值;同时求出当前
            网页的所有出链网页 */
            for(Text val:values){
                if(val.toString().substring(0,1).equals("@")){
                    pr += Float.parseFloat(val.toString().substring(1));
                }
                else if(val.toString().substring(0,1).equals("&")){
                    lianjie += val.toString().substring(1);
                }
            }

            pr = 0.8f*pr + 0.2f*0.25f;//加入跳转因子,进行平滑处理           
            String result = pr+lianjie;
            context.write(key, new Text(result));           
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "172.16.10.15:9001");
        String pathIn1 = "hdfs://172.16.10.15:9000/user/hadoop/pagerank_fourthinput";
        String pathOut="hdfs://172.16.10.15:9000/user/hadoop/pagerank_fourthoutput";

        Job job = new Job(conf,"page rank");
        job.setJarByClass(PageRank_fourth.class);
        job.setMapperClass(lxnmapper.class);
        job.setReducerClass(lxnreduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(pathIn1));
        FileOutputFormat.setOutputPath(job, new Path(pathOut));
        System.exit(job.waitForCompletion(true)?0:1);
    }
}

现在这样,能够完成一次迭代,但PageRank值通常需要迭代30-40次才能达到收敛,所以需要对程序进行一点改变,使之能够进行自动的迭代操作。
只需要对主函数main 进行改变即可。下面贴出改变后的main函数

public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        conf.set("mapred.job.tracker", "172.16.10.15:9001");
        String pathIn1 = "hdfs://172.16.10.15:9000/user/hadoop/pagerank_fourthinput";
        String pathOut="hdfs://172.16.10.15:9000/user/hadoop/pagerank_fourthoutput0";
        for(int i=1;i<41;i++){      //加入for循环
            Job job = new Job(conf,"page rank");
            job.setJarByClass(PageRank_fourth.class);
            job.setMapperClass(lxnmapper.class);
            job.setReducerClass(lxnreduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(pathIn1));
            FileOutputFormat.setOutputPath(job, new Path(pathOut));
            pathIn1 = pathOut;//把输出的地址改成下一次迭代的输入地址
            pathOut = pathOut+i;//把下一次的输出设置成一个新地址。
            job.waitForCompletion(true);//把System.exit()去掉
        }
    }

下面是迭代40次后的输出结果:

这里写图片描述

参考文献

  1. 张洋:浅析PageRank算法 http://blog.jobbole.com/23286/
  2. PageRank算法 http://blog.csdn.net/magicnumber/article/details/43853547
  3. hadoop上的pageRank算法 http://www.cnblogs.com/dandingyy/archive/2013/03/08/2950740.html
  4. PageRank算法简介及Map-Reduce实现 http://www.cnblogs.com/fengfenggirl/p/pagerank-introduction.html

以上是关于MapReduce 之PageRank 算法概述设计思路和源码分析的主要内容,如果未能解决你的问题,请参考以下文章

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

MapReduce实现PageRank算法(邻接矩阵法)

大数据-Hadoop2.7实现PageRank算法-MapReduce&HDFS

大数据-Hadoop2.7实现PageRank算法-MapReduce&HDFS