MapReduce实现PageRank算法(邻接矩阵法)

Posted zyb993963526

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce实现PageRank算法(邻接矩阵法)相关的知识,希望对你有一定的参考价值。

前言

之前写过稀疏图的实现方法,这次写用矩阵存储数据的算法实现,只要会矩阵相乘的话,实现这个就很简单了。如果有不懂的可以先看一下下面两篇随笔。

MapReduce实现PageRank算法(稀疏图法)

Python+MapReduce实现矩阵相乘

 

算法实现

我们需要输入两个矩阵A和B,我一开始想的是两个矩阵分别存在两个文件里然后分别读取,但是我发现好像不行,无法区分打上A、B的标签。

所以我一开始就把A、B矩阵合起来存在一个文件里,一次读取。 

mapper.py

 1 #!/usr/bin/env python3
 2 import os
 3 import sys
 4 
 5 flag = 0       # 0表示处理A矩阵,1表示处理B矩阵
 6 current_row = 1  # 记录现在处理矩阵的第几行
 7 
 8 
 9 def read_input():
10     for lines in sys.stdin:
11         yield lines
12 
13 
14 if __name__ == __main__:
15     row_a = int(os.environ.get(row_a))
16     col_a = int(os.environ.get(col_a))
17     row_b = int(os.environ.get(row_b))
18     col_b = int(os.environ.get(col_b))
19     for line in read_input():
20         if line.count(
) == len(line):    # 去空行
21             pass
22         data = line.strip().split(	)
23 
24         if flag == 0:
25             for i in range(col_b):
26                 for j in range(col_a):
27                     print("%s,%s	A:%s,%s" % (current_row, i+1, j+1, data[j]))
28             current_row += 1
29             if current_row > row_a:
30                 flag = 1
31                 current_row = 1
32 
33         elif flag == 1:
34             for i in range(row_a):
35                 for j in range(col_b):
36                     print("%s,%s	B:%s,%s" % (i+1, j+1, current_row, data[j]))
37             current_row += 1

reducer.py

 1 #!/usr/bin/env python3
 2 import os
 3 import sys
 4 from itertools import groupby
 5 from operator import itemgetter
 6 
 7 
 8 def read_input(splitstr):
 9     for line in sys.stdin:
10         line = line.strip()
11         if len(line) == 0:
12             continue
13         yield line.split(splitstr)
14 
15 
16 if __name__ == __main__:
17     alpha = float(os.environ.get(alpha))
18     row_b = int(os.environ.get(row_b))
19 
20     data = read_input(	)
21     lstg = (groupby(data, itemgetter(0)))
22     try:
23         for flag, group in lstg:
24             matrix_a, matrix_b = {}, {}
25             total = 0.0
26             for element, g in group:
27                 matrix = g.split(:)[0]
28                 pos = g.split(:)[1].split(,)[0]
29                 value = g.split(,)[1]
30                 if matrix == A:
31                     matrix_a[pos] = value
32                 else:
33                     matrix_b[pos] = value
34             for key in matrix_a:
35                 total += float(matrix_a[key]) * float(matrix_b[key])
36             page_rank = alpha * total + (1.0 - alpha) / row_b
37             print("%s" % page_rank)
38     except Exception:
39         pass

 

算法运行

由于每次迭代会产生新的值,又因为我无法分两个文件读取,所以每次迭代后我要将网络矩阵和新的pageRank值合起来再上传至HDFS。

下面的Python代码的功能就是合并和记录迭代值。

 1 #!/usr/bin/env python3
 2 import sys
 3 
 4 
 5 number = sys.stdin.readline().strip()
 6 
 7 f_src = open("tmp.txt","r")
 8 f_dst1 = open("result.txt", "a")
 9 f_dst2 = open("B.txt", "a")
10 
11 
12 mat = "{:^30}	"
13 f_dst1.write(
 + number)
14 
15 lines = f_src.readlines()
16 for line in lines:
17     if line.count(
) == len(line):
18         continue
19     line = line.strip()
20     f_dst1.write(mat.format(line))
21     f_dst2.write(line)
22     f_dst2.write(
)

 再贴一下运行脚本run.sh

 1 #!/bin/bash
 2 
 3 pos="/usr/local/hadoop"
 4 max=10
 5 
 6 for i in `seq 1 $max`
 7 do
 8 
 9     $pos/bin/hadoop jar $pos/hadoop-streaming-2.9.2.jar 10     -mapper $pos/mapper.py 11     -file $pos/mapper.py 12     -reducer $pos/reducer.py 13     -file $pos/reducer.py 14     -input B.txt 15     -output out 16     -cmdenv "row_a=4" 17     -cmdenv "col_a=4" 18     -cmdenv "row_b=4" 19     -cmdenv "col_b=1" 20     -cmdenv "alpha=0.8" 21 
22     
23     rm -r ~/Desktop/B.txt
24     cp ~/Desktop/A.txt ~/Desktop/B.txt
25     rm -r ~/Desktop/tmp.txt
26     $pos/bin/hadoop fs -get out/part-00000 ~/Desktop/tmp.txt
27     echo $i | ~/Desktop/slove.py
28
29     $pos/bin/hadoop fs -rm B.txt
30     $pos/bin/hadoop fs -rm -r -f out
31     $pos/bin/hadoop fs -put ~/Desktop/B.txt B.txt
32 done

 我这里就随便迭代了10次:

技术图片

我感觉mapreduce用来处理迭代计算实在是太麻烦了,所以才会有twister、haloop、spark这些开源框架的出现吧。

以上是关于MapReduce实现PageRank算法(邻接矩阵法)的主要内容,如果未能解决你的问题,请参考以下文章

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

PageRank算法的MapReduce实现(输入有向图,迭代收敛)

大数据-Hadoop2.7实现PageRank算法-MapReduce&HDFS

大数据-Hadoop2.7实现PageRank算法-MapReduce&HDFS

MapReduce 之PageRank 算法概述设计思路和源码分析