spark实现ALS矩阵分解-附scala代码

Posted BJUT赵亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark实现ALS矩阵分解-附scala代码相关的知识,希望对你有一定的参考价值。

本文记录了使用scala语言在spark上实现ALS模型的相关内容,如有问题可以邮件(zhaoliang19960421@outlook.com)或微信(BestCoder_BestLife)与我联系

有关协同过滤的相关内容详见 spark协同过滤算法-附scala代码

在itemCF中以user-item-score矩阵为输入,通过计算在原始矩阵中的item向量(每个用户对该向量的行为为评分,全部用户的评分向量)之间的相似度来得到推荐相似的item。采用这样的方式依赖的是当两个item有相同的用户有行为时才会有相似度,否则就是0。

使用这样的方式会存在以下问题

  1. 推荐结果会出现马太效应,因为热门item会和所有的item都存在有共同行为的用户,那么对于热门而言会越推越多
  2. 如果当user和item数量变多、用户显示/隐式行为很少时,user-item-score矩阵会变得很稀疏,计算两两相似度时只能靠很少的几个位置的数据来获得,可信度降低
  3. 当两个item没有共同使用的用户时,无法计算相似度
  4. 存储代价大,itemCF得到的是II/UU 相似度,当user-item-score矩阵大小是m*n 时,需要保存的相似度矩阵是m*m、n*n

所以之后发展到了矩阵分解模型,即将m*n的user-item-score原始矩阵,通过分解的得到两个小的m*k、n*k 的小矩阵,通过拟合 m ∗ n = m ∗ k ⋅ n ∗ k m*n = m*k · n*k mn=mknk,这种思想相当于对原始的行为矩阵对每个user、item进行了向量化,每个向量的长度是k.
由于向量化矩阵是从全局的UI矩阵分解得到的,即使两个item在itemCF中无法计算相似度,向量化之后也可以被计算。类似的,由于user、item被表征到了同样长度的向量空间中,那么就可以直接计算ui相似度进行推荐。

itemCF是计算II 相似度,通过用户有正向行为的商品推荐类似商品,即U2I2I
MF是将UI通过向量化到同一个空间中,直接计算User和item之间的相关新,即U2I

矩阵分解的具体分解放在在spark中采用的是ALS通过梯度下降的方式来训练降低两个小矩阵的乘积与原始矩阵的误差。
使用scala语言在spark中实现的ALS模型具体代码如下

package com.xiaomi.sdalg.theme

import java.nio.charset.Charset
import com.google.common.hash.Hashing.murmur3_32
import com.xiaomi.sdalg.commons.SparkContextUtils.createSparkSession
import com.xiaomi.sdalg.theme.ItemCF.getUserItemScoreDf  // 见前文itemCF代码
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import scala.collection.mutable.ArrayBuffer

object ALS {
    def main(args: Array[String]): Unit = {
        val sparkSession: SparkSession = createSparkSession(this.getClass.getSimpleName)
        val Array(startDate: String, endDate: String, savePath: String) = args
        val userItemScoreDf = getUserItemScoreDf(sparkSession, startDate, endDate)
        val resultDf = calculate(sparkSession, userItemScoreDf, "imei", "product_id", "score")
        resultDf.show(false)
        resultDf.printSchema()
//        resultDf
//            .withColumnRenamed("imei", "id")
//            .write.mode("overwrite").save(savePath)

    }

    /**
     * 对原始的user-item-score 矩阵通过ALS算法实现矩阵分解
     * 找到每个user最相似的recommendNum个item
     *
     * @param orgDf         原始的user-item-score矩阵,共有三列
     * @param userFeature   orgDf中的user列的列名,在ALS中user需要用int表示,需要对原始的内容做hash得到int
     * @param itemFeature   orgDf中的item列的列名,作用同上
     * @param ratingFeature orgDf中的user-item 相关性得分列的列名
     * @param recommendNum  每个用户推荐的item数量
     * @param rank          分解后小矩阵维度,即user/item隐藏向量维度
     */
    def calculate(sparkSession: SparkSession,
                  orgDf: DataFrame,
                  userFeature: String,
                  itemFeature: String,
                  ratingFeature: String,
                  recommendNum: Int = 100,
                  rank: Int = 100): DataFrame = {
        import sparkSession.implicits._
        val userNum = orgDf.select(userFeature).distinct().count().toInt
        val itemNum = orgDf.select(itemFeature).distinct().count().toInt
        val userItemScoreDf = orgDf
            .withColumn("user_hash", hashStringUdf(userNum + 1)(col(userFeature)))
            .withColumn("item_hash", hashStringUdf(itemNum + 1)(col(itemFeature)))
        val model: ALSModel = new ALS()
            .setUserCol("user_hash")
            .setItemCol("item_hash")
            .setRatingCol(ratingFeature)
            .setRank(rank)
            .fit(userItemScoreDf)
        val userRecommendDf = model.recommendForAllUsers(recommendNum)
            .selectExpr("user_hash", "explode(recommendations) as temp")
            .withColumn("item_hash", col("temp")("item_hash"))
            .withColumn("rating", col("temp")("rating"))
            .join(userItemScoreDf.select("user_hash", userFeature), Seq("user_hash")).drop("user_hash")
            .join(userItemScoreDf.select("item_hash", itemFeature), Seq("item_hash")).drop("item_hash")
            .groupBy(userFeature).agg(collect_list(struct(itemFeature, "rating")).as("struct"))
            .rdd
            .map(r => {
                val user = r.getAs[String](userFeature)
                var map: Map[String, Float] = Map[String, Float]()
                val struct = r.getAs[collection.mutable.WrappedArray[GenericRowWithSchema]]("struct")
                val array = new ArrayBuffer[(String, Float)]()
                struct.foreach(f => array.append((f.getString(0), f.getFloat(1))))
                array.sortBy(_._2).reverse.foreach(f => {
                    map ++= Map(f._1 -> f._2)
                })
                (user, map)
            })
            .toDF(userFeature, "similarItemByALS")
        userRecommendDf
    }

    /**
     * 将字符串在给定的key下做hash
     *
     * @param hashNum hash的key
     * @return
     */
    def hashStringUdf(hashNum: Int): UserDefinedFunction = {
        udf((col: String) => {
            (Math.abs(murmur3_32().hashString(col, Charset.defaultCharset()).asInt()) % hashNum).toDouble
        })
    }

}

以上是关于spark实现ALS矩阵分解-附scala代码的主要内容,如果未能解决你的问题,请参考以下文章

spark实现ALS算法-附scala代码

spark实现ALS算法-附scala代码

如何在我的 Spark 管道中集成 ALS 以实现非负矩阵分解?

推荐算法协同过滤算法代码(pyspark | ALS)

深入理解Spark ML:基于ALS矩阵分解的协同过滤算法与源码分析

推荐系统中矩阵分解算法-funkSVD和ALS