spark实现ALS算法-附scala代码

Posted BJUT赵亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark实现ALS算法-附scala代码相关的知识,希望对你有一定的参考价值。

本文记录了使用scala语言在spark上实现ALS算法的相关内容,如有问题可以邮件(zhaoliang19960421@outlook.com)或微信(BestCoder_BestLife)与我联系

有关协同过滤的相关内容详见 spark实现协同过滤-附scala代码

在itemCF中以user-item-score矩阵为输入,将每个用户对每个商品的行为作为评分,将所有用户的评分作为一个商品的向量,利用该向量计算商品两两之间的相似度,来推荐该商品最相似的前N的其他商品。采用这样的方式依赖的是当两个item有相同的用户有行为时才会有相似度,否则就是0。

使用这样的方式会存在以下问题

  1. 推荐结果会出现马太效应,因为热门item会和所有的item都存在有共同行为的用户,对于所有的item而言都可以和热门计算相似度,热门item都有机会被推荐出来,热门item就会被越推越多
  2. 如果当user和item数量变多、用户显示/隐式行为很少时,user-item-score矩阵会变得很稀疏,计算两两相似度时只能靠很少的几个位置的数据来获得,可信度降低
  3. 当两个item没有共同使用的用户时,无法计算相似度
  4. 存储代价大,itemCF得到的是II/UU相似度,当user-item-score矩阵大小是m*n 时,需要保存的相似度矩阵是m*m、n*n

所以之后发展到了矩阵分解模型,即将m*n的user-item-score原始矩阵,通过分解的得到两个小的m*k、n*k 的小矩阵,通过拟合 m ∗ n = m ∗ k ⋅ n ∗ k m*n = m*k · n*k mn=mknk,这种思想相当于对原始的行为矩阵对每个user、item进行了向量化,每个向量的长度是k.
由于向量化矩阵是从全局的UI矩阵分解得到的,即使两个item在itemCF中无法计算相似度,向量化之后也可以被计算。类似的,由于user、item被表征到了同样长度的向量空间中,那么就可以直接计算ui相似度进行推荐。

1. itemCF是计算II相似度,通过用户有正向行为的商品推荐类似商品,即U2I2I
2. MF是将UI通过向量化到同一个空间中,直接计算User和item之间的相关新,即U2I

矩阵分解的具体方法在spark中采用的是ALS通过梯度下降的方式来训练降低两个小矩阵的乘积与原始矩阵的误差。

使用scala语言在spark中实现的ALS模型具体代码,实现了原始的ALS的u2i召回,i2u推荐,将item向量、user向量保存下来实现i2i召回,在线ANN的u2i召回

具体实现见代码

package *
import java.nio.charset.Charset

import com.google.common.hash.Hashing.murmur3_32
import *.Utils._
import *.SparkContextUtils._
import org.apache.spark.ml.recommendation._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

/**
 *
 *
 * 本代码已经做了脱敏处理,与原公司及业务无关,特此说明
 *
 *
 * @author zhaoliang19960421@outlook.com on 20220221
 *         基于org.apache.spark.ml.recommendation实现ALS算法
 *         从原始日志中计算得到user-item-score矩阵,通过ALS实现矩阵分解,得到als模型
 *         利用分解后的两个子矩阵,得到user向量、item向量
 *         利用als模型,实现u2i召回,i2u推荐
 *         利用item向量,实现i2i召回
 *         利用user向量,实现u2u计算相似度
 *         利用user向量、item向量,线上进行ANN实现U2I召回
 */
object ALS 
    val userColumn = "gaid" // user列名
    val userHasColumn = "user_hash" // user被hash之后的列名
    val itemColumn = "package_name" // item列名
    val itemHashColumn = "item_hash" // item被hash后的列名
    val userVecColumn = "userVec" // user向量
    val scoreColumn = "score" // user-item 相关度列名
    val itemVecColumn = "itemVec" // item向量
    val recommendNum = 100 // 相似个数
    val filterItemMinNum = 2 // 在计算得到user-item-score矩阵时,对脏数据进行过滤,需要每个用户最少对几个item有过行为
    val filterItemMaxNum = 10 // 同上,每个用户最多对几个item有过行为

    def main(args: Array[String]): Unit = 
        val sparkSession: SparkSession = createSparkSession(this.getClass.getSimpleName)
        val Array(startDate: String, endDate: String) = args
        val userItemScoreDf = getUserItemScoreDf(sparkSession, startDate, endDate).cache()
        val alsModel = getAlsModel(userItemScoreDf)
        val (userFactorDf, itemFactorDf) = getUserItemFactor(userItemScoreDf, alsModel)
        userFactorDf.write.mode("overwrite").save(s"hdfs://**/userFactor/date=$endDate")
        itemFactorDf.write.mode("overwrite").save(s"hdfs://**/itemFactor/date=$endDate")
        //        val i2iDf = getI2IRecall(sparkSession, itemFactorDf)
        /**
         * 采用模型自带的u2i和i2u函数时间的时间复杂度是 userNum * itemNum
         * 如果用户个数或者item个数是百万级级别,离线计算时间复杂很高无法实际使用
         * 不建议采用这种方案,替换方案是在线ANN(如Faiss)进行u2i召回
         * val u2i = getU2IRecall(userItemScoreDf, alsModel)
         * val i2u = getI2URecommend(userItemScoreDf, alsModel)
         */

    

    /**
     * 从日志数据中得到user-item-score矩阵
     */
    def getUserItemScoreDf(sparkSession: SparkSession, startDate: String, endDate: String, isFilter: Boolean = true): DataFrame = 
        /**
         * 由于als模型中user、item列是数字,所以需要对字符串做hash
         * 将字符串在给定的key下做hash
         */
        def hashStringUDF(hashNum: Int): UserDefinedFunction = 
            udf((col: String) =>
                (Math.abs(murmur3_32().hashString(col, Charset.defaultCharset()).asInt()) % hashNum).toDouble
            )
        

        /**
         * 对得到的user-item-score矩阵做过滤
         * 每个用户有行为的item个数不能小于和大于给定值
         */
        def filter(orgDf: DataFrame): DataFrame = 
            val userList: DataFrame = orgDf
                .groupBy(col(userColumn)).agg(countDistinct(col(itemColumn)).as("count"))
                .where(s"count > $filterItemMinNum and count < $filterItemMaxNum")
                .select(col(userColumn))
            orgDf.join(userList, Seq(userColumn))
        

        /**
         * 基于不同的行为,对u-i相似度赋予不同的打分
         */
        val scoreMap = Map("item_click" -> 1, "download_install" -> 2, "activate" -> 3)

        def action2ScoreUDF: UserDefinedFunction = udf((action: String) => scoreMap.getOrElse(action, 0))

        var resultDf = sparkSession.read.parquet("hdfs://***/part.parquet")
            /**
            * 具体业务逻辑生成u-i-s矩阵,
            * 具体而言就是共有3列,共有userNum*itemNum行
            */
            .select(userColumn, itemColumn, scoreColumn)
        if (isFilter) resultDf = filter(resultDf)
        val userNum = resultDf.select(userColumn).distinct().count().toInt
        val itemNum = resultDf.select(itemColumn).distinct().count().toInt
        resultDf = resultDf
            .withColumn(userHasColumn, hashStringUDF(userNum + 1)(col(userColumn)))
            .withColumn(itemHashColumn, hashStringUDF(itemNum + 1)(col(itemColumn)))
        resultDf
    

    /**
     * 在u-i-score矩阵下基于als模型进行矩阵分解得到最终的模型
     *
     * @param userItemScoreDf user-item-score矩阵
     * @param rank            每个元素被分解之后的向量长度,默认是10
     */
    def getAlsModel(userItemScoreDf: DataFrame, rank: Int = 10): ALSModel = 
        val model: ALSModel = new ALS()
            .setUserCol(userHasColumn)
            .setItemCol(itemHashColumn)
            .setRatingCol(scoreColumn)
            .setRank(rank)
            .fit(userItemScoreDf)
        model
    

    /**
     * 基于als模型得到user、item向量
     */
    def getUserItemFactor(userItemScoreDf: DataFrame, model: ALSModel): (DataFrame, DataFrame) = 
        val userFactorDf = model.userFactors
            .withColumnRenamed("id", userHasColumn)
            .join(userItemScoreDf, Seq(userHasColumn))
            .selectExpr(userColumn, s"features as $userVecColumn")
            .distinct()
        val itemFactorDf = model.itemFactors
            .withColumnRenamed("id", itemHashColumn)
            .join(userItemScoreDf, Seq(itemHashColumn))
            .selectExpr(itemColumn, s"features as $itemVecColumn")
            .distinct()
        (userFactorDf, itemFactorDf)
    

    /**
     * 实现U2I召回,对于每个用户找到最相似的前recommendNum个item进行召回
     */
    def getU2IRecall(userItemScoreDf: DataFrame, model: ALSModel): DataFrame = 
        model.recommendForAllUsers(recommendNum)
            .selectExpr(userHasColumn, "explode(recommendations) as temp")
            .withColumn(itemHashColumn, col("temp")(itemHashColumn))
            .withColumn("u2iSim", col("temp")("rating"))
            .join(userItemScoreDf.select(userHasColumn, userColumn), Seq(userHasColumn))
            .join(userItemScoreDf.select(itemHashColumn, itemColumn), Seq(itemHashColumn))
            .selectExpr(userColumn, itemColumn, "u2iSim")
    

    /**
     * 实现I2U推荐,即对于每个商品应该被推荐给recommendNum个用户
     */
    def getI2URecommend(userItemScoreDf: DataFrame, model: ALSModel): DataFrame = 
        model.recommendForAllUsers(recommendNum)
            .selectExpr(itemHashColumn, "explode(recommendations) as temp")
            .withColumn(itemHashColumn, col("temp")(userHasColumn))
            .withColumn("i2uSim", col("temp")("rating"))
            .join(userItemScoreDf.select(userHasColumn, userColumn), Seq(userHasColumn))
            .join(userItemScoreDf.select(itemHashColumn, itemColumn), Seq(itemHashColumn))
            .selectExpr(userColumn, itemColumn, "i2uSim")

    

    /**
     * 基于用户向量计算item两两之间的相似度,实现i2i召回,u2u类似且实际意义较低不在实现
     */
    def getI2IRecall(sparkSession: SparkSession, itemFactorDf: DataFrame): DataFrame = 
        import sparkSession.implicits._
        def cosDistanceUDF: UserDefinedFunction = 
            udf((vec1: collection.mutable.WrappedArray[Float], vec2: collection.mutable.WrappedArray[Float]) => 
                var fenzi = 0.0
                var vec1Length = 0.0
                var vec2Length = 0.0
                for (i <- vec1.indices) 
                    fenzi += vec1(i) * vec2(i)
                    vec1Length += Math.pow(vec1(i), 2)
                    vec2Length += Math.pow(vec2(i), 2)
                
                fenzi / (Math.sqrt(vec1Length) * Math.sqrt(vec2Length))
            )
        

        val itemDf1 = itemFactorDf.selectExpr(s"$itemColumn as id1", s"$itemVecColumn as vec1")
        val itemDf2 = itemFactorDf.selectExpr(s"$itemColumn as id2", s"$itemVecColumn as vec2")
        val i2iRecallDf = itemDf1.crossJoin(itemDf2)
            .withColumn("sim", cosDistanceUDF(col("vec1"), col("vec2")))
            .select("id1", "id2", "sim")
            .rdd
            .map(r => (r.getString(0), (r.getString(1), r.getDouble(2))))
            .groupByKey()
            .map(row => 
                val id = row._1
                val map = row._2.toArray.sortBy(_._2).reverse.toMap
                (id, map)
            )
            .toDF("id", "map")
        i2iRecallDf
    

以上是关于spark实现ALS算法-附scala代码的主要内容,如果未能解决你的问题,请参考以下文章

spark实现ALS矩阵分解-附scala代码

spark实现ALS矩阵分解-附scala代码

spark实现swing算法 -附Scala代码

spark实现swing算法 -附Scala代码

spark实现swing算法 -附Scala代码

spark实现item2Vec算法-附scala代码