Spark ALS recommendForAll源码解析实战之Spark1.x vs Spark2.x

Posted fansy1990

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark ALS recommendForAll源码解析实战之Spark1.x vs Spark2.x相关的知识,希望对你有一定的参考价值。

文章目录

Spark ALS recommendForAll源码解析实战

1. 软件版本:

软件版本
Spark1.6.3 、 2.2.2
Hadoop2.6.5

2. 本文要解决的问题

  1. 分析Spark2.2.2中 ALS算法的recommendForAll函数的实现思路;
  1. 分析Spark1.6.3中 ALS算法的recommendForAll函数的实现思路;
  1. Spark2.2.2 和 Spark 1.6.3 关于 ALS的recommendForAll的实现的性能对比(参考Spark2.2.2. VS Spark1.6.3 之ALS 推荐性能对比);

3. 源码分析实战

源码分析实战采用源码分析+实例演示的方式来阐明源码的实现思路,下面分别给出Spark2.2.2以及Spark1.6.3的源码中关于ALS算法的recommendForAll的实战分析。

3.1 Spark2.2.2 ALS recommendForAll 实战分析

1. 首先给出其核心实现源码:

private def recommendForAll(
      rank: Int,
      srcFeatures: RDD[(Int, Array[Double])],
      dstFeatures: RDD[(Int, Array[Double])],
      num: Int): RDD[(Int, Array[(Int, Double)])] = 
    val srcBlocks = blockify(srcFeatures)
    val dstBlocks = blockify(dstFeatures)
    val ratings = srcBlocks.cartesian(dstBlocks).flatMap  case (srcIter, dstIter) =>
      val m = srcIter.size
      val n = math.min(dstIter.size, num)
      val output = new Array[(Int, (Int, Double))](m * n)
      var i = 0
      val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
      srcIter.foreach  case (srcId, srcFactor) =>
        dstIter.foreach  case (dstId, dstFactor) =>
          // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
          val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
          pq += dstId -> score
        
        pq.foreach  case (dstId, score) =>
          output(i) = (srcId, (dstId, score))
          i += 1
        
        pq.clear()
      
      output.toSeq
    
    ratings.topByKey(num)(Ordering.by(_._2))
  
private def blockify(
      features: RDD[(Int, Array[Double])],
      blockSize: Int = 4096): RDD[Seq[(Int, Array[Double])]] = 
    features.mapPartitions  iter =>
      iter.grouped(blockSize)
    
  

核心源码包含两个部分:

  • 一个是blockify子函数;
  • 一个是recommendForAll的核心实现;

ALS模型中包含的userFeatures和productFeatures作为此函数的核心输入,分别代表用户向量和物品向量(关于其解释,可以参考ALS算法原理)。

2. blockify函数

blockify函数就是把原RDD进行分块处理,怎么理解分块呢?

假设有如下RDD,uf:

scala> val uf= sc.parallelize(List((1,Array(0.3,0.4,0.6,0.3,0.7)),(2,Array(0.13,0.14,0.16,0.13,0.17)),(3,Array(0.23,0.24,0.26,0.23,0.27)),(4,Array(0.83,0.84,0.86,0.83,0.87)),(5,Array(0.31,0.41,0.61,0.31,0.71)),(6,Array(0.213,0.214,0.216,0.213,0.217)),(7,Array(0.323,0.324,0.326,0.323,0.327)),(8,Array(0.283,0.284,0.286,0.283,0.287)),(9,Array(0.31,0.42,0.63,0.34,0.75)),(10,Array(0.131,0.141,0.161,0.131,0.171)),(11,Array(0.223,0.224,0.226,0.223,0.227)),(12,Array(0.813,0.814,0.816,0.813,0.817))  ))   
uf: org.apache.spark.rdd.RDD[(Int, Array[Double])] = ParallelCollectionRDD[11] at parallelize at <console>:27

那么,使用块大小为5,来对uf进行blockify处理,如下:

scala> val blockSize = 5
blockSize: Int = 5

scala> val ufsrc = uf.mapPartitions  iter =>iter.grouped(blockSize)
ufsrc: org.apache.spark.rdd.RDD[Seq[(Int, Array[Double])]] = MapPartitionsRDD[15] at mapPartitions at <console>:31

而blockify的核心就是针对RDD中的每个分区的数据执行grouped操作,grouped操作就是针对一个列表进行分组,如下:

scala> (0 to 10).grouped(5).toList
res3: List[scala.collection.immutable.IndexedSeq[Int]] = List(Vector(0, 1, 2, 3, 4), Vector(5, 6, 7, 8, 9), Vector(10))

scala> (0 to 10).grouped(6).toList
res4: List[scala.collection.immutable.IndexedSeq[Int]] = List(Vector(0, 1, 2, 3, 4, 5), Vector(6, 7, 8, 9, 10))

所以ufsrcRDD就会在每个分区中构建多条记录,而每个记录就是一个Seq数组,上面的uf数据应用blockify,其数据流如下:

下面对此图进行验证:

1)原始uf RDD数据有2个分区,并且其数据分别为1~6 、 7~12;

scala> uf.glom().collect.foreach(x => println(x.mkString("|")))
(1,[D@673b5922)|(2,[D@5bda90af)|(3,[D@3962064d)|(4,[D@53d95e8a)|(5,[D@6e96ff9a)|(6,[D@61c6450f)
(7,[D@48bf7714)|(8,[D@518b5d87)|(9,[D@8381203)|(10,[D@5b85d036)|(11,[D@68311b85)|(12,[D@635d1461)

2)blockify后的RDD其分区数据每个元素为一个Seq数组,如下:

scala> ufsrc.glom().collect.foreach(x => println(x.mkString("|")))
List((1,[D@299c20bd), (2,[D@256d5200), (3,[D@6e81084f), (4,[D@11a6e7d4), (5,[D@59f7a495))|List((6,[D@164500f9))
List((7,[D@7060b10e), (8,[D@565e7091), (9,[D@3269a5c3), (10,[D@c1539bf), (11,[D@790801f2))|List((12,[D@5c772cba))

下面构造的pf 如下:

scala> val pf = sc.parallelize(List((101,Array(0.33,0.34,0.36,0.33,0.37)),(201,Array(0.43,0.44,0.46,0.43,0.47)),(301,Array(0.53,0.54,0.56,0.53,0.57)),(401,Array(0.303,0.304,0.306,0.303,0.307)),(501,Array(0.403,0.404,0.406,0.403,0.407)),(601,Array(0.153,0.154,0.156,0.153,0.157)),(701,Array(0.523,0.524,0.526,0.523,0.527)),(801,Array(0.553,0.554,0.556,0.553,0.557)) ) )  
pf: org.apache.spark.rdd.RDD[(Int, Array[Double])] = ParallelCollectionRDD[12] at parallelize at <console>:27

scala> val pfdst = pf.mapPartitions  iter =>iter.grouped(blockSize)
pfdst: org.apache.spark.rdd.RDD[Seq[(Int, Array[Double])]] = MapPartitionsRDD[14] at mapPartitions at <console>:31

思考: 其pfdst RDD的数据流情况。

3. cartesian flatMap的优势

代码接下来就是执行 cartesian flatMap,此处关于cartesian flatMap的实现其实有两种方式:

  • 原始数据直接cartesian;

    其代码如下:

    uf.cartesian(pf) 
    
  • 原始数据先blockify,然后cartesian;

下面,先看下这两种方式的异同:

  1. cartesian的操作就是把两个RDD的元素做一个全连接配对,举个简单例子:
scala> val a = sc.parallelize(List(1,2,3,4))
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at <console>:27

scala> val b = sc.parallelize(List(11,22,33))
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at <console>:27

scala> val c = a.cartesian(b)
c: org.apache.spark.rdd.RDD[(Int, Int)] = CartesianRDD[25] at cartesian at <console>:31
scala> c.collect.foreach(println(_))
(1,11)
(2,11)
(1,22)
(1,33)
(2,22)
(2,33)
(3,11)
(4,11)
(3,22)
(3,33)
(4,22)
(4,33)

scala> c.count
res19: Long = 12

scala> a.partitions.size
res20: Int = 2

scala> b.partitions.size
res21: Int = 2

scala> c.partitions.size
res22: Int = 4
  1. 两种方式的cartesian的数据流对比
    1. 直接cartesian:

    1. 先blockify,然后cartesian:

图 blockify-cartesian数据流

通过数据流其实也可以看出,直接cartesian的数据会多些(就单看产生的数据量)。

比如在i.中 用户1的数据(1,Arr())存储要8个;而在ii.中用户1的数据(1,Array())却只会出现2次。

对比其产生的数据大小,代码如下:

scala> uf.cartesian(pf).saveAsTextFile("/tmp/cartesian01")
                                                                         
scala> ufsrc.cartesian(pfdst).saveAsTextFile("/tmp/cartesian02")

执行后,查看其数据存储大小,分别如下图所示。

从两个图的对比也可以发现,第二种方式其数据存储会小很多。这也是为什么源码用的是第二种实现方式。

这里的BlockSize 其实对性能也有很大影响,所以源码中也会建议是否把此参数暴露出来,供用户自己设置。

4. flatMap的处理逻辑

flatMap的处理逻辑比较复杂,下面分点描述:

  1. case (srcIter, dstIter)代表什么数据

参考 图 blockify-cartesian数据流 中的输出数据,(srcIter,dstIter)代表的数据其实就是组对应的数据,例如:

srcIterdstIter
Seq( (1,Arr();…;(5,Arr()))Seq( (101,Arr();(201,Arr());(301,Arr());(401,Arr()) )
Seq( (6,Arr()) )Seq( (101,Arr();(201,Arr());(301,Arr());(401,Arr()) )
Seq((12,Arr()))Seq(501,Arr();(601,Arr());(701,Arr());(801,Arr()))
  1. 规划输出数组大小
val m = srcIter.size
val n = math.min(dstIter.size, num)
val output = new Array[(Int, (Int, Double))](m * n)

output的大小为什么是srcIter.size * math.min(dstIter.size, recNum)?

1) output只代表当前块(block)的用户的推荐内容,所以行个数就应该是当前用户的个数,而srcIter就是用户的一个数组,所以其大小就是行个数;
2) 推荐的个数,当然和用户设置的推荐个数有关,但是其算的当前的项目的块大小如果小于设置的推荐个数,那么最多也只能推荐当前块中项目的个数,所以是math.min ;
  1. 每个用户块(block)和每个项目块(block)乘积并优先队列
var i = 0
val pq = new BoundedPriorityQueue[(Int, Double)](n)(Ordering.by(_._2))
srcIter.foreach  case (srcId, srcFactor) =>
  dstIter.foreach  case (dstId, dstFactor) =>
    // We use F2jBLAS which is faster than a call to native BLAS for vector dot product
    val score = BLAS.f2jBLAS.ddot(rank, srcFactor, 1, dstFactor, 1)
    pq += dstId -> score
  
  pq.foreach  case (dstId, score) =>
    output(i) = (srcId, (dstId, score))
    i += 1
  
  pq.clear()

优先队列指的是一个用户块(比如有5个用户),那么和一个项目块(比如有4个项目),比如现在只推荐3个项目。那么,针对用户块中的每个用户,会和所有的项目进行计算,得到4个项目对应的分数,这些(项目,分数)对就会存入一个优先队列,而这个优先队列在4个(项目,分数)对存入完成后,只会有3个,并且其分数是最大的三个;

  1. 每个用户都会有多个优先队列
val ratings = srcBlocks.cartesian(dstBlocks).flatMap 
    ...
  output.toSeq


首先,每个用户块会对应多个项目块,而每个项目块会对应一个优先队列;接着,这些优先队列会通过flatMap进行合并,得到所有的(用户id,(项目id,分数))这样的数据,也就是RDD,也即是说ratings:RDD[(Int,(Int,Double))]。

思考一下,如果上面的例子中,blockify设置的个数为2,那么srcIter : Seq ((1,Array()), (2,Array()) ) 会对应多少个项目块,对应多少个优先队列?

  1. 合并取top
ratings.topByKey(num)(Ordering.by(_._2))

而这句就是针对ratings数据的每个key分组,然后按照value的第二个值(其实就是分数)排序,取其前n个键值对。

第4,5 步可以通过下图展示:

至此,分析完毕!

3.2 Spark1.6.3 ALS recommendForAll 实战分析

1. blockify函数

Spark1.6.3的blockify函数和Spark2.2.2中实现的blockify函数是不一样的,如下:

/**
   * Blockifies features to use Level-3 BLAS.
   */
  private def blockify(
      rank: Int,
      features: RDD[(Int, Array[Double])]): RDD[(Array[Int], DenseMatrix)] = 
    val blockSize = 4096 // TODO: tune the block size
    val blockStorage = rank * blockSize
    features.mapPartitions  iter =>
      iter.grouped(blockSize).map  grouped =>
        val ids = mutable.ArrayBuilder.make[Int]
        ids.sizeHint(blockSize)
        val factors = mutable.ArrayBuilder.make[Double]
        factors.sizeHint(blockStorage)
        var i = 0
        grouped.foreach  case (id, factor) =>
          ids += id
          factors ++= factor
          i += 1
        
        (ids.result(), new DenseMatrix(rank, i, factors.result()))
      
    
  

在此份代码中,可以分为如下的几个部分:

  1. ArrayBuilder使用
val ids = mutable.ArrayBuilder.make[Int]
ids.sizeHint(blockSize)
val factors = mutable.ArrayBuilder.make[Double]
factors.sizeHint(blockStorage)
ids.result()
factors.result()

这个ArrayBuilder就是一个存储数据的数组,通过sizeHint函数来预设该数组大小,而通过result函数获取整个数组的值,如下:

scala> import scala.collection.mutable
import scala.collection.mutable

scala> val factors = mutable.ArrayBuilder.make[Double]
factors: scala.collection.mutable.ArrayBuilder[Double] = ArrayBuilder.ofDouble

scala> factors ++= Array(0.2,0.3)
res34: factors.type = ArrayBuilder.ofDouble

scala> factors.result()
res35: Array[Double] = Array(0.2, 0.3)

scala> factors ++= Array(0.2,0.3)
res36: factors.type = ArrayBuilder.ofDouble

scala> factors.result()
res37: Array[Double] = Array(0.2, 0.3, 0.2, 0.3)
  1. DenseMatrix使用
    DenseMatrix的使用直接使用其源代码来解释,如下:
/**
   * Column-major dense matrix.
   * The entry values are stored in a single array of doubles with columns listed in sequence.
   * For example, the following matrix
   * 
   *   1.0 2.0
   *   3.0 4.0
   *   5.0 6.0
   * 
   * is stored as `[1.0, 3.0, 5.0, 2.0, 4.0, 6.0]`.
   *
   * @param numRows number of rows
   * @param numCols number of columns
   * @param values matrix entries in column major
   */
  @Since("1.0.0")
  def this(numRows: Int, numCols: Int, values: Array[Double]) =
    this(numRows, numCols, values, false)

这一段说的就是针对矩阵,使用一个数组来存储,同时指定其行列的个数,然后就可以针对行列的个数来对数组进行划分,进而就可以得到矩阵。

思考:factors的size为什么是 rank * blockSize,以及是否所有的factors的size都是这个?

  1. 每个partition分组、组整合成(用户array,项目矩阵)

此处和Spark2.2.2不同的地方就是针对每个partition进行分组后的操作,此处针对每个分组的数据会把每个组整合成(用户Array,项目矩阵)的二元组数据。其流程图如下所示:

2. recommendForAll函数分析

这里只分析与Spark2.2.2不同的地方:

case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
        val m = srcIds.length
        val n = dstIds.length
        val ratings = srcFactors.transpose.multiply(dstFactors)
        val output = new Array[(Int, (Int, Double))](m * n)
        var k = 0
        ratings.foreachActive  (i, j, r) =>
          output(k) = (srcIds(i), (dstIds(j), r))
          k += 1
        
        output.toSeq
  1. 首先,output是一个 当前组中用户数x当前组中项目数 个 (用户ID,(项目ID,分数))的三元组数组。
  2. ratings同样是一个DenseMatrix,其如下
srcFactor:  k * m 的矩阵
dstFactor: k * n的矩阵
srcFactor'  * dstFaxtor : (m * k) * (k * n) = m * n 的一个矩阵

这里是一个矩阵运算(不清楚的同学可以补充下线性代数的知识)。

  1. 最后两句代码,就是针对ratings中的每个值,把这个值和其对应的用户ID,项目ID的关系拼凑起来,并赋值给output。

从临时数据来看,这个数据是一个(m * n)的一个数据,远远比Spark2.2.2中 m*k (k << n)的数据量小。这也是Spark1.6.3中GC时间过长的原因,可以在后面的分析看到!

3.3 Spark2.2.2和Spark1.6.3 代码对比总结

PointSpark1.6.3Spark2.2.2
计算量要计算每个用户ID的factor和每个项目ID的factor的乘积一样
计算效率使用矩阵相乘,如果不用Native BLAS,那么速度很低使用向量相乘,效率高于 Native BLAS
临时存储数据量临时存储很大(m * n)临时存储较小(m*k)

4. Spark2.2.2和Spark1.6.3性能测试对比

Note:

关于Spark2.2.2的性能测试,如使用Native BLAS等,可参考 Spark ALS应用BLAS加速

测试代码、数据等,同样参考Spark ALS应用BLAS加速

4.1 Spark1.6.3 官网安装包测试

  1. 注意使用Spark官网提供的安装包进行集群安装测试,官网下载地址:spark-1.6.tgz
  2. 同样使用 fansy1990/als_blas 代码,进行编译打包(注意使用对应版本的pom文件)

命令如下:

spark-submit --class demo.AlsTest --deploy-mode cluster /root/als_blas-1.0-for-spark1.6.3.jar 3000
  1. 执行两次后,时间消耗:
    时间消耗如下:

  1. 是否有BLAS的使用?

查看子节点是否,看是否有BLAS的使用:

从图中可以看出,是没有使用BLAS的加速的!

  1. long GC

任务有很长的GC时间,如下(在上面已经有说明,这里只是验证):

4.2 Spark1.6.3 自编译安装包测试

使用自行编译好的Spark 安装包,再次测试一遍

编译命令如下:

  1. 测试时间:
    本次测试分为四次,前两次所有节点都没有安装open BLAS,后面两次是所有节点都安装的情况,如下:

  1. 是否使用BLAS?
    从子节点的运行日志可以看出,确实是有使用BLAS的,如下:

  1. 是否 Long GC?

不管是open BLAS安装前,还是后,都有Long GC,如下:

4.3 Spark2.2.2 vs Spark1.6.3 性能对比总结

部分参数来自Spark ALS应用BLAS加速

版本平均耗时(mins)Long GC
官网Spark21.1
自编译Spark2(BLAS)1.2
官网Spark13.5
自编译SPark1(BLAS)4.3

Spark 2: Spark 2.2.2; Spark1:Spark 1.6.3

从表的分析结果来看:

  1. 如果使用Spark中ALS算法,那么尽量使用Spark2版本;
  2. 从当前的测试实验数据来看,针对Spark1.x版本,就算有Native BLAS,但是ALS算法也并没有加速的迹象,反而更慢了。

5. 思考解答

  1. pfdst RDD的分布情况:

由于pf RDD也有两个分区,每个分区4条记录,所以blockify后,pfdst RDD也有两个分区,并且每个分区只有一条记录,这个记录是一个Seq数组,同时,这个数组中有4个元素。

scala> pfdst.glom().collect.foreach(x => println(x.mkString("|")))
List((101,[D@4cd85d52), (201,[D@5669ee53), (301,[D@7cdbc04), (401,[D@4dd08e5b))
List((501,[D@43ec687e), (601,[D@5a6e1d26), (701,[D@30a9a7f3), (801,[D@794245eb))

scala> pf.glom().collect.foreach(x => println(x.mkString("|")))
(101,[D@3400f5aa)|(201,[D@1bf0208c)|(301,[D@5daf0cb0)|(401,[D@70e1a8ab)
(501,[D@43ffaeb8)|(601,[D@5991020b)|(701,[D@7c7e4f05)|(801,[D@1a714d1)

  1. blockify设置的个数为2,对应多少优先队列?

  2. factors的size为什么是 rank * blockSize,以及是否所有的factors的size都是这个?

以上是关于Spark ALS recommendForAll源码解析实战之Spark1.x vs Spark2.x的主要内容,如果未能解决你的问题,请参考以下文章

Spark:测量 ALS 的性能

ALS推荐算法在Spark上的优化

Spark 的 ALS 中唯一项目的数量有啥限制?

如何在 Spark 中确定 ALS.transImplicit 中的偏好/置信度?

Spark ALS转换性能

spark使用之ALS版本对比