Implementing the swing algorithm in Spark (with Scala code)

Posted by BJUT赵亮



This post records the code I used at work to implement i2i with the swing algorithm. If you work on something similar, feel free to email me at liangz1996@hotmail.com
For the itemCF side of this, see my earlier post spark实现itemcf-附scala代码 (implementing itemCF in Spark, with Scala code).

package *

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._

import scala.math.{pow, sqrt}
import scala.util.Random


object CollaborativeFilteringUtils extends Serializable {

    /**
     * Given item-to-item similarities, keep the top-N most similar items for each item.
     *
     * @param orgDf       DataFrame with columns item1, item2, similarity
     * @param sign        sort direction; defaults to 1 for cosine-style scores where larger means more similar;
     *                    set to -1 for distances (e.g. Euclidean) where smaller means more similar
     * @param topN        number of most similar items to keep per item
     * @param minSimScore minimum similarity score; the default applies no filtering
     * @param whiteSet    items allowed on the right-hand side of i2i; if null, every item is recommendable
     * @param blackSet    items forbidden on the right-hand side of i2i; if null, every item is recommendable
     * @return each item id with its topN most similar items and their similarities
     */
    def getI2IMap(orgDf: DataFrame,
                  sign: Int = 1,
                  topN: Int = 100,
                  minSimScore: Double = -10000D,
                  whiteSet: Broadcast[Set[String]] = null,
                  blackSet: Broadcast[Set[String]] = null): RDD[(String, Map[String, Double])] = {
        orgDf
            .rdd
            .flatMap(r => Array(
                (r.getString(0), (r.getString(1), sign * r.getDouble(2))), // both AB and BA need a similarity entry
                (r.getString(1), (r.getString(0), sign * r.getDouble(2)))
            ))
            .filter(row => row._2._2 >= minSimScore)
            .filter(row => {
                var flag = true
                if (whiteSet != null)
                    flag = whiteSet.value.contains(row._2._1)
                flag
            })
            .filter(row => {
                var flag = true
                if (blackSet != null)
                    flag = !blackSet.value.contains(row._2._1)
                flag
            })
            .distinct()
            .groupByKey()
            .map(row => {
                val id = row._1
                val map = row._2.toArray.sortBy(_._2).reverse.slice(0, topN).toMap
                (id, map)
            })
    }
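
    // Usage sketch (variable names here are hypothetical, not from the original):
    // given simDf with columns (item1: String, item2: String, sim: Double),
    //   val i2i = getI2IMap(simDf, sign = 1, topN = 50, minSimScore = 0.1)
    // yields each item's top-50 most similar items with their scores.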


    /**
     * Evaluate the current i2i similarity output.
     *
     * @param input   the raw logs used to compute i2i; for convenience, the columns must be renamed to user, item, score
     * @param output  the i2i result with columns item1, item2, sim; because each AB pair may have been stored only once, it is expanded into both AB and BA here
     * @param i2iType the i2i method being evaluated
     */
    def metrics(sparkSession: SparkSession,
                input: DataFrame,
                output: DataFrame,
                i2iType: String): Unit = {
        println(s"The current i2i method is $i2iType")
        import sparkSession.implicits._
        println(s"当前日志中共有$input.selectExpr("user").distinct().count()个user")
        println(s"当前日志中共有$input.selectExpr("item").distinct().count()个item")
        val temp1 = output
            .rdd
            .flatMap(row => Array(
                (row.getString(0), row.getString(1), row.getDouble(2)),
                (row.getString(1), row.getString(0), row.getDouble(2))
            ))
            .toDF("item1", "item2", "sim")
        println(s"有$temp1.selectExpr("item1").distinct().count()个item可以进行i2i召回")
        val temp2 = temp1.groupBy("item1").agg(countDistinct("item2").as("count"), min("sim").as("minSim"), max("sim").as("maxSim"))
        val maxMinItem2: Array[(Long, Long)] = temp2.agg(max("count"), min("count"))
            .collect()
            .map(row => (row.getLong(0), row.getLong(1)))
        println(s"一个item最多有$maxMinItem2(0)._1个可召回结果,最少有$maxMinItem2(0)._2个结果")
        val maxMinSim: Array[(Double, Double)] = temp2.agg(max("maxSim"), min("minSim"))
            .collect()
            .map(row => (row.getDouble(0), row.getDouble(1)))
        println(s"两个item之间最大的相似度是$maxMinSim(0)._1,最小的是$maxMinSim(0)._2\\n")
    
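
    // Example call (hypothetical names): metrics(spark, userItemScore, swingDf, "swing")
    // would print the user/item coverage and similarity-range statistics above.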


    /**
     * Compute item-to-item similarity with itemCF; u2u is analogous.
     *
     * @param orgDf   the user-item-score matrix
     * @param version computation variant; defaults to V1, the original itemCF formula
     * @return
     */
    def itemCF(spark: SparkSession,
               orgDf: DataFrame,
               version: String = "V1"): DataFrame = 
        import spark.implicits._
        val multiply2ColUDF: UserDefinedFunction = udf((a: Double, b: Double) => a * b)
        val divide2ColUDF: UserDefinedFunction = udf((a: Double, b: Double, c: Double) => a / (b * c))
        val divide1ColUDF: UserDefinedFunction = udf((a: Double, b: Double, c: Double) => a * b / math.sqrt(c))
        val column = orgDf.columns
        val userItemScore = orgDf
            .selectExpr(s"cast($column(0) as string) as user", s"cast($column(1) as string) as item", s"cast($column(2) as double) as score")
            .cache()

        // L2 norm of each item's score vector
        val itemLength = userItemScore
            .rdd
            .map(row => (row.getString(1), row.getDouble(2)))
            .filter(row => row._2 != 0)
            .groupByKey()
            .mapValues(x => sqrt(x.toArray.map(ii => pow(ii, 2)).sum))
            .toDF("item", "length")

        var resultDf: DataFrame = null
        version match {
            case "V1" => 
                // numerator of the cosine similarity
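                // numerator(i, j) = Σ_u score(u, i) · score(u, j); dividing by the two
                // vector norms below gives cos(i, j) = numerator / (||i|| · ||j||)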
                val itemPairNumerator = userItemScore.selectExpr("user", "item as item1", "score as score1")
                    .join(userItemScore.selectExpr("user", "item as item2", "score as score2"), Seq("user"))
                    // only when a user V has acted on both items does dimension V contribute a non-zero term to the cosine
                    .where("item1 < item2")
                    .withColumn("temp", multiply2ColUDF(col("score1"), col("score2")))
                    .groupBy("item1", "item2").agg(sum("temp").as("numerator"))
                resultDf = itemPairNumerator
                    .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
                    .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item2"))
                    // the similarity of two items is the cosine of their user-indexed score vectors; see https://blog.csdn.net/m0_37192554/article/details/107359291
                    .withColumn("sim", divide2ColUDF(col("numerator"), col("length1"), col("length2")))
                    .select("item1", "item2", "sim")
            
            case "V2" => 
                // borrowing from swing: penalize heavy users when computing the similarity between two items
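                // here the numerator term becomes score(u, i) · score(u, j) / sqrt(|I_u|)
                // (divide1ColUDF above), where |I_u| is the number of distinct items user u
                // has acted on, so heavy users contribute less to every pair they touch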
                val userItemSet = userItemScore
                    .rdd
                    .map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
                    .groupByKey()
                    .map(row => (row._1, row._2.toSet.size))
                    .toDF("user", "setSize") // user,userSetSize
                    .where("setSize > 0")
                    .repartition(1000)
                val itemPairNumerator = userItemScore.selectExpr("user", "item as item1", "score as score1")
                    .join(userItemScore.selectExpr("user", "item as item2", "score as score2"), Seq("user"))
                    .join(userItemSet, Seq("user"))
                    // only when a user V has acted on both items does dimension V contribute a non-zero term to the cosine
                    .where("item1 < item2")
                    .withColumn("temp", divide1ColUDF(col("score1"), col("score2"), col("setSize")))
                    .groupBy(s"item1", s"item2").agg(sum("temp").as("numerator"))
                resultDf = itemPairNumerator
                    .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
                    .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item2"))
                    // the similarity of two items is the cosine of their user-indexed score vectors; see https://blog.csdn.net/m0_37192554/article/details/107359291
                    .withColumn("sim", divide2ColUDF(col("numerator"), col("length1"), col("length2")))
                    .select("item1", "item2", "sim")
            
        
        }
        resultDf
    }


    /**
     * Compute item-to-item similarity with swing.
     * For a derivation of swing see https://blog.csdn.net/qq_33534428/article/details/124839278
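     * In its standard form, sim(i, j) = Σ_{u,v ∈ U_i ∩ U_j} 1 / (alpha + |I_u ∩ I_v|),
     * where U_i is the set of users who acted on item i and I_u is the set of items user u acted on.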
     *
     * @param orgDf       user-item-score, the user's score for the item
     * @param alpha       smoothing term that keeps the denominator from reaching 0
     * @param minCoUser   minimum number of users that must have co-acted on both items of a pair; if -1, it is derived from the counts of the top item pairs
     * @param topItemPair when minCoUser is not given, only this many item pairs, ranked by co-acting user count, are used to derive the threshold
     * @param isFilter    whether to filter item pairs by the ratio of the minimum co-user count to the pair's co-user count
     * @param version     similarity variant
     * @return item1,item2,sim
     */
    def swing(sparkSession: SparkSession,
              orgDf: DataFrame,
              alpha: Int = 1,
              minCoUser: Long = -1L,
              topItemPair: Int = 100000,
              isFilter: Boolean = false,
              version: String = "V1"): DataFrame = 
        import sparkSession.implicits._
        val joinStringUDF: UserDefinedFunction = udf((a: String, b: String) => a + "_" + b)
        val joinDoubleUDF: UserDefinedFunction = udf((a: Double, b: Double) => a.toString + "_" + b.toString)

        def splitDouble(s: String): Array[Double] = s.split("_").map(_.toDouble)

        val column = orgDf.columns
        val userItemScore = orgDf
            .selectExpr(s"cast($column(0) as string) as user", s"cast($column(1) as string) as item", s"cast($column(2) as double) as score") // user,item,score
            .repartition(1000)
            .cache()
        val itemLength = userItemScore // L2 norm of each item's score vector
            .rdd
            .map(row => (row.getAs[String]("item"), row.getAs[Double]("score")))
            .groupByKey()
            .mapValues(row => sqrt(row.toArray.map(ii => pow(ii, 2)).sum))
            .toDF(s"item", "length")
        val userItemPair = userItemScore.selectExpr("item as item1", "user", "score as score1")
            .join(userItemScore.selectExpr("item as item2", "user", "score as score2"), Seq("user"))
            .where("item1 < item2") // AB BA 只要一次
            .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
            .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item2"))
            .withColumn("itemPair", joinStringUDF(col("item1"), col("item2")))
            .withColumn("scorePair", joinDoubleUDF(col("score1"), col("score2")))
            .withColumn("lengthPair", joinDoubleUDF(col("length1"), col("length2")))
            .selectExpr("user", "itemPair", "scorePair", "lengthPair") // user对两个item都有行为,其中scorePair是两个行为的分数
            .distinct()

        val itemPairCount = userItemPair
            .groupBy("itemPair").count()
        val minCount = // minimum number of users who must have co-acted on both items of a pair
            if (minCoUser != -1L) {
                minCoUser
            } else {
                itemPairCount
                    .sort(desc("count"))
                    .selectExpr("count")
                    .limit(topItemPair)
                    .rdd
                    .map(_.getLong(0))
                    .min() // smallest co-user count among the top topItemPair pairs
            }
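
        // ------------------------------------------------------------------
        // The original post breaks off at this point. Everything below is a
        // sketch of how the computation could plausibly continue, assuming the
        // standard swing formula given in the doc comment; it is not the
        // author's code. The isFilter and version branches (and the intended
        // use of scorePair / lengthPair / splitDouble above) cannot be
        // recovered, so only the basic unweighted path is shown. It further
        // assumes item ids contain no '_' and that the user -> item-set map
        // fits in driver memory for broadcasting.
        // ------------------------------------------------------------------

        // keep only item pairs with at least minCount co-acting users
        val qualifiedPairs = userItemPair
            .join(itemPairCount.where(s"count >= $minCount"), Seq("itemPair"))

        // broadcast every user's item set so |I_u ∩ I_v| can be computed locally
        val userItemsBc = sparkSession.sparkContext.broadcast(
            userItemScore.rdd
                .map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
                .groupByKey()
                .mapValues(_.toSet)
                .collectAsMap()
        )

        val resultDf = qualifiedPairs
            .rdd
            .map(row => (row.getAs[String]("itemPair"), row.getAs[String]("user")))
            .groupByKey()
            .map { case (itemPair, users) =>
                val userArr = users.toArray
                var sim = 0.0
                // every unordered pair of co-acting users contributes 1 / (alpha + overlap)
                for (i <- userArr.indices; j <- i + 1 until userArr.length) {
                    val overlap = userItemsBc.value(userArr(i))
                        .intersect(userItemsBc.value(userArr(j)))
                        .size
                    sim += 1.0 / (alpha + overlap)
                }
                val Array(item1, item2) = itemPair.split("_", 2) // assumes item1 has no '_'
                (item1, item2, sim)
            }
            .toDF("item1", "item2", "sim")
        resultDf
    }
}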
