spark协同过滤算法-附scala代码

Posted BJUT赵亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark协同过滤算法-附scala代码相关的知识,希望对你有一定的参考价值。

本文记录了在spark上协同过滤算法的相关内容,如果有做相关工作的同学,可以邮件与我联系 zhaoliang19960421@outlook.com

本文参考了spark协同过滤,在此表示感谢

协同过滤算法的本质是在全局范围内统计用户的行为,对每个行为进行打分记录,找到行为最相似的两人或者所有人的行为最相似的两个物品。具体的协同过滤的过程如下图所示
在这里插入图片描述
其中cos距离的计算方式如下图所示
在这里插入图片描述

具体的操作方式如下( 以用户的协同过滤为例):

  1. 在全局范围对,每个用户对每个商品进行打分(对于不同的行为可以给与不同的分值,代表不同的权重)
  2. 每一个用户用一个向量来表示,向量长度是商品个数,然后对于用户而言两两计算cos距离
    1. 分母是每个向量的模长, 每个用户的向量模长,依次计算即可
    2. 在计算cos的分子时,当且仅当两个向量在对应位置上都有值时才有结果
    3. 那么仅计算每个商品都有打分的商品即可,即对于每个用户而言,以商品为主键进行join,得到两两用户之间都有行为的商品,依次相乘求和后,即可得到cos的分子(做上三角取值,因为用户两两join会出现,AB、BA 两行数据)
    4. 在获得两两用户之间的cos分子,分别join每个用户的向量模型,做除法得到两个用户之间的cos距离
  3. 得到的两个用户之间的距离的dataframe保存在hdfs中后续使用
package analysis.theme

import breeze.numerics.{pow, sqrt}
import conf.CreateSparkSession
import conf.DateUtil.getFrontDay
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

/**
 * @author zhaoliang6@xiaomi.com on 20210506
 *         基于用户行为的协同过滤算法
 */
object UserItemCF {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = CreateSparkSession.createSparkSession(this.getClass.getSimpleName)
        var startDate = getFrontDay(2);if (args.length > 0) startDate = args(0)
        var endDate = getFrontDay(1);if (args.length > 1) endDate = args(1)
        val userItemScoreDf = getUserItemScoreDf(spark, startDate, endDate)
        val itemCFDf = calcuate(spark, userItemScoreDf, Array("imei", "product_id", "score"), "item")
        itemCFDf.write.mode(SaveMode.Overwrite).save("hdfs://****/itemCF")
		val simItemDf = exp_itemCF(spark)
    }

    /**
     * 获得用户-商品-得分 矩阵,其中不同的动作获得得分不同
     *
     * @param sparkSession
     * @param startDate 统计开始时间
     * @param endDate   统计结束时间
     * @return uid-itemid-score dataframe
     */
    def getUserItemScoreDf(sparkSession: SparkSession, startDate: String, endDate: String): DataFrame = {
        val scoreMap = Map(
            "click" -> 1, "download" -> 2, "apply" -> 3,
            "like" -> 4, "comment" -> 4, "share" -> 4, "community" -> 4, "favorate" -> 4, "buy_success" -> 5,
            "dis_like" -> -4, "dis_favorate" -> -4
        )
        // uid,itemid,action,...
        val orgDf = sparkSession.read.parquet(s"")
            .where(s"date between $startDate and $endDate")
            .where("imei is not null and product_id is not null")
            .cache()

        var resultDf: DataFrame = null
        scoreMap.keys.toArray.foreach(action => {
            val temp = orgDf.where(s"action = '$action'")
                .withColumn("score", lit(scoreMap(s"$action")))
                .selectExpr("imei", "product_id", "score")
            if (resultDf == null) {
                resultDf = temp
            } else {
                resultDf = resultDf.union(temp)
            }
        })
        resultDf.groupBy("imei", "product_id").agg(sum("score").as("score")).cache()
    }

    /*
     * 基于物品和基于用户的协同过滤计算逻辑相同,以基于物品的为例解释
     * 由原始的 uid-itemid-score 矩阵获得基于物品的协同过滤矩阵dataframe
     * 计算两个商品之间的相似度,需要分别获得两个商品A、商品B 对于所有用户的得分向量,然后对着两个向量计算cos距离
     *                  A·B                求和 A_i * B_i
     * cos_distance = ——————— =  ————————————————————————————————————      其中A_i 是A向量中的第i个元素
     *                 |A|*|B|     开平方(求和A_i)  * 开平方(求和B_i)
     * 有关cos距离详见 https://blog.csdn.net/m0_37192554/article/details/107359291
     * 对于商品A而言,所有用户的得分向量是item1ScoreList = [ 1,0,1,1,0....]
     * 对于商品B而言,所有用户的得分向量是item2ScoreList = [ 0,0,1,0,0....]
     * 从数据中可以看到,只有当一个商品同时有两个商品的行为得分时,计算cos距离时才有意义,
     * 否则在分子的计算中,对应位置在分子和分母的计算结果中都是0
     * 所以在计算两个商品的相似度时,仅计算对两个商品都有行为的用户即可
     * 由原始的uid-itemid-score 矩阵通过join uid的方式获得两个用户之间都有过行为的商品列表
     * 然后在两个用户都有行为的商品中计算cos距离的分子,
     * 每个用户分别计算各自的模长做分母,最后得到cons距离
     * 基于用户的协同过滤计算方式是一样的
     *
     * @param orgDf  原始的 uid-itemid-score 矩阵
     * @param column 原始的df中的列名,依次对应userid ,itemid,score
     * @param dfType user或item,代表基于用户或者基于商品的协同过滤
     * @return uid1 uid2 similarity 的dataframe
     */
    def calcuate(spark: SparkSession, orgDf: DataFrame, column: Array[String], dfType: String): DataFrame = {
        assert(Array("user", "item").contains(dfType))
        val type2type = Map("user" -> "item", "item" -> "user")
        import spark.implicits._
        val userItemDf = orgDf.selectExpr(s"${column(0)} as user_id", s"${column(1)} as item_id", s"${column(2)} as rating")
        val userItemDf2 = orgDf.selectExpr(s"${column(0)} as user_id2", s"${column(1)} as item_id2", s"${column(2)} as rating2")

        // 计算cos距离的分子
        val elementMulitplyDf = userItemDf
            .join(userItemDf2, userItemDf(s"${type2type(dfType)}_id") === userItemDf2(s"${type2type(dfType)}_id2"))
            .filter(s"${dfType}_id < ${dfType}_id2") // 上三角取值
            .withColumn("rating_product", multiply2ColumnUDF(col("rating"), col("rating2")))
            .groupBy(s"${dfType}_id", s"${dfType}_id2").agg(sum("rating_product"))
            .withColumnRenamed("sum(rating_product)", "rating_sum_pro")
            .cache()
        elementMulitplyDf.show()

        // 商品或用户向量的模长
        val itemVectorLengthDf = userItemDf
            .rdd
            .map(x => {
                dfType match {
                    case "user" => (x(0).toString, x(2).toString)
                    case "item" => (x(1).toString, x(2).toString)
                }
            })
            .groupByKey()
            .mapValues(x => sqrt(x.toArray.map(line => pow(line.toDouble, 2)).sum))
            .toDF(s"${dfType}_id_sum", "rating_sqrt_sum")
        itemVectorLengthDf.show()

        // 对于计算 uid1 uid2 相似度得到的分子
        // 依次join得到uid1的向量模长,join得到uid2的向量模长
        // 计算最终的cos距离
        val resultDf = elementMulitplyDf
            .join(itemVectorLengthDf, elementMulitplyDf(s"${dfType}_id") === itemVectorLengthDf(s"${dfType}_id_sum"))
            .drop(s"${dfType}_id_sum")
            .withColumnRenamed("rating_sqrt_sum", s"rating_sqrt_${dfType}_id_sum")

            .join(itemVectorLengthDf, elementMulitplyDf(s"${dfType}_id2") === itemVectorLengthDf(s"${dfType}_id_sum"))
            .drop(s"${dfType}_id_sum")
            .withColumnRenamed("rating_sqrt_sum", s"rating_sqrt_${dfType}_id2_sum")

            .withColumn("sim", divide2ColumnUDF(col("rating_sum_pro"), col(s"rating_sqrt_${dfType}_id_sum"), col(s"rating_sqrt_${dfType}_id2_sum")))
            .select(s"${dfType}_id", s"${dfType}_id2", "sim")
            .na.fill(0.0)
        resultDf.show()
        resultDf
    }

    // udf 对输入的两列进行相乘
    val multiply2ColumnUDF: UserDefinedFunction = udf((s1: Double, s2: Double) => s1 * s2)
    // udf 对输入的三列 a,b,c 进行 a/(b*c)
    val divide2ColumnUDF: UserDefinedFunction = udf((pro: Double, s1: Double, s2: Double) => pro / (s1 * s2))
    
    /**
     * 基于物品的协同过滤
     */
    def exp_itemCF(sparkSession: SparkSession): DataFrame = {
        val orgDf = sparkSession.read.parquet("hdfs://****/itemCF")
            .na.fill(0.0)
        import sparkSession.implicits._
        val selectNum = 50
        val result = orgDf
            .rdd
            .map(r => (r.getString(0), (r.getString(1), r.getDouble(2))))
            .groupByKey()
            .map(row => {
                val id = row._1
                val struct = row._2.toArray.sortBy(-_._2).slice(0, selectNum)
                val json = new JsonObject()
                struct.foreach(s => json.addProperty(s._1, s._2))
                (id, json.toString)
            })
            .toDF("id", "similaryItemByCF")
        result
    }

}


以上是关于spark协同过滤算法-附scala代码的主要内容,如果未能解决你的问题,请参考以下文章

spark实现ALS算法-附scala代码

spark实现ALS矩阵分解-附scala代码

spark实现ALS矩阵分解-附scala代码

spark实现swing算法 -附Scala代码

spark实现swing算法 -附Scala代码

spark实现swing算法 -附Scala代码