使用 Spark GraphX 实现 PageRank 算法

Posted Shockang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 Spark GraphX 实现 PageRank 算法相关的知识,希望对你有一定的参考价值。

前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系


关联

Spark RDD 论文详解(三)Spark 编程接口

正文

简介

GraphX 提供了静态和动态 PageRank 的实现方法,这些方法在 PageRank 对象中。

静态的 PageRank 运行固定次数的迭代,而动态的 PageRank 一直运行直到收敛为止。

数据

GraphX 源码中提供了一个运用 PageRank 算法分析社交网络中各用户重要性的案例。

社交网络中的用户数据在 data/graphx/users.txt 中,用户之间关系数据在 data/graphx/followers.txt 中。

users . txt 内容如下:

1,BarackObama,Barack Obama
2,ladygaga,Goddess of Love
3,jeresig,John Resig
4,justinbieber,Justin Bieber
6,matei_zaharia,Matei Zaharia
7,odersky,Martin Odersky
8,anonsys

followers.txt 内容如下

2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7

首先以 users.txt 中的用户作为顶点、 followers.txt 中的关系作为边集创建图

然后通过图直接调用 PageRank 算法计算出每个顶点的 PR 值,即用户的重要性

最后结合用户的属性信息对结果输出展示。

代码

package com.shockang.study.spark.graphx


import org.apache.log4j.Level, Logger
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.sql.SparkSession

/**
 * 基于社交网络数据集的 PageRank 示例
 */
object PageRankExample 
  val FOLLOWERS_PATH = "/Users/shockang/code/spark-examples/data/simple/graphx/followers.txt"
  val USERS_PATH = "/Users/shockang/code/spark-examples/data/simple/graphx/users.txt"

  def main(args: Array[String]): Unit = 
    // 关闭 Spark 内部的日志打印,只关注结果日志
    Logger.getLogger("org").setLevel(Level.OFF)
    // 创建 SparkSession
    val spark = SparkSession
      .builder
      .appName("PageRankExample")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // 加载边作为图
    val graph = GraphLoader.edgeListFile(sc, FOLLOWERS_PATH)
    // 运行 PageRank
    val ranks = graph.pageRank(0.0001).vertices
    // Join ranks with the usernames
    val users = sc.textFile(USERS_PATH).map  line =>
      val fields = line.split(",")
      (fields(0).toLong, fields(1))
    
    val ranksByUsername = users.join(ranks).map 
      case (id, (username, rank)) => (username, rank)
    
    // 打印结果
    println(ranksByUsername.collect().mkString("\\n"))
    spark.stop()
  

输出

(justinbieber,0.15007622780470478)
(matei_zaharia,0.7017164142469724)
(ladygaga,1.3907556008752426)
(BarackObama,1.4596227918476916)
(jeresig,0.9998520559494657)
(odersky,1.2979769092759237)

以上是关于使用 Spark GraphX 实现 PageRank 算法的主要内容,如果未能解决你的问题,请参考以下文章

Spark图处理GraphX学习笔记!

Spark GraphX图计算代码实现,源码分析

Spark中文手册8:spark GraphX编程指南

GraphX 实现K-Core

在 Spark GraphX 中实现拓扑排序

Spark GraphX图计算核心算子实战AggreagteMessage