Implementing the Swing Algorithm in Spark (with Scala Code)
Posted by BJUT赵亮
This post records the code I use at work to implement i2i (item-to-item) recall with the Swing algorithm. If you work on similar problems, feel free to email me at liangz1996@hotmail.com.
For itemCF background, see my earlier post spark实现itemcf -附scala代码.
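For reference, the Swing similarity that the code below builds toward is

$$\mathrm{sim}(i,j)=\sum_{u\in U_i\cap U_j}\sum_{v\in U_i\cap U_j}\frac{1}{\alpha+\lvert I_u\cap I_v\rvert}$$

where $U_i$ is the set of users who acted on item $i$, $I_u$ is the set of items user $u$ acted on, and $\alpha$ is a smoothing term (the `alpha` parameter below). Intuitively, an item pair scores high when many *pairs* of users share it, and each user pair is down-weighted by how many items the two users share overall, which penalizes heavy users.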
package *
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import scala.math.{pow, sqrt}
import scala.util.Random
object CollaborativeFilteringUtils extends Serializable {
  /**
   * Given pairwise item similarities, keep the top-N most similar items for each item.
   *
   * @param orgDf       item1, item2, similarity
   * @param sign        sort direction, 1 by default: cosine-style scores where larger means
   *                    more similar; use -1 for distance-style scores (e.g. Euclidean) where
   *                    smaller means more similar
   * @param topN        number of most-similar items to keep per item
   * @param minSimScore minimum similarity score; by default nothing is filtered out
   * @param whiteSet    items allowed on the right-hand side of i2i; if null, everything is allowed
   * @param blackSet    items forbidden on the right-hand side of i2i; if null, nothing is blocked
   * @return each id together with its topN most similar items and their similarities
   */
  def getI2IMap(orgDf: DataFrame,
                sign: Int = 1,
                topN: Int = 100,
                minSimScore: Double = -10000D,
                whiteSet: Broadcast[Set[String]] = null,
                blackSet: Broadcast[Set[String]] = null): RDD[(String, Map[String, Double])] = {
    orgDf
      .rdd
      .flatMap(r => Array(
        (r.getString(0), (r.getString(1), sign * r.getDouble(2))), // both AB and BA need a similarity entry
        (r.getString(1), (r.getString(0), sign * r.getDouble(2)))
      ))
      .filter(row => row._2._2 >= minSimScore)
      // if a whitelist is given, keep only whitelisted targets
      .filter(row => whiteSet == null || whiteSet.value.contains(row._2._1))
      // if a blacklist is given, drop blacklisted targets
      .filter(row => blackSet == null || !blackSet.value.contains(row._2._1))
      .distinct()
      .groupByKey()
      .map { row =>
        val id = row._1
        // sort by similarity descending and keep the topN entries
        val map = row._2.toArray.sortBy(-_._2).take(topN).toMap
        (id, map)
      }
  }
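A quick usage sketch. The toy data and a SparkSession named spark are assumptions for illustration:

import org.apache.spark.rdd.RDD

val simDf = spark.createDataFrame(Seq(
  ("a", "b", 0.9), ("a", "c", 0.4), ("b", "c", 0.7)
)).toDF("item1", "item2", "sim")

// keep each item's 2 most similar neighbors
val i2i: RDD[(String, Map[String, Double])] =
  CollaborativeFilteringUtils.getI2IMap(simDf, sign = 1, topN = 2)
i2i.collect().foreach { case (id, nbrs) => println(s"$id -> $nbrs") }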
  /**
   * Evaluate an i2i similarity result.
   *
   * @param input   the raw log that i2i was computed from; for convenience, rename its
   *                columns to user, item, score first
   * @param output  the i2i result as item1, item2, sim; since each AB pair was computed
   *                only once, both directions are expanded here before counting
   * @param i2iType the name of the i2i method being evaluated
   */
  def metrics(sparkSession: SparkSession,
              input: DataFrame,
              output: DataFrame,
              i2iType: String): Unit = {
    println(s"Current i2i method: $i2iType")
    import sparkSession.implicits._
    println(s"The log contains ${input.selectExpr("user").distinct().count()} distinct users")
    println(s"The log contains ${input.selectExpr("item").distinct().count()} distinct items")
    val temp1 = output
      .rdd
      .flatMap(row => Array(
        (row.getString(0), row.getString(1), row.getDouble(2)),
        (row.getString(1), row.getString(0), row.getDouble(2))
      ))
      .toDF("item1", "item2", "sim")
    println(s"${temp1.selectExpr("item1").distinct().count()} items can be recalled via i2i")
    val temp2 = temp1.groupBy("item1").agg(countDistinct("item2").as("count"), min("sim").as("minSim"), max("sim").as("maxSim"))
    val maxMinItem2: Array[(Long, Long)] = temp2.agg(max("count"), min("count"))
      .collect()
      .map(row => (row.getLong(0), row.getLong(1)))
    println(s"An item has at most ${maxMinItem2(0)._1} recallable neighbors and at least ${maxMinItem2(0)._2}")
    val maxMinSim: Array[(Double, Double)] = temp2.agg(max("maxSim"), min("minSim"))
      .collect()
      .map(row => (row.getDouble(0), row.getDouble(1)))
    println(s"The largest similarity between two items is ${maxMinSim(0)._1}, the smallest is ${maxMinSim(0)._2}\n")
  }
  /**
   * Compute item-item similarity with itemCF; u2u works analogously.
   *
   * @param orgDf   the user-item-score matrix
   * @param version which variant to compute; defaults to V1, the original itemCF
   * @return item1, item2, sim
   */
  def itemCF(spark: SparkSession,
             orgDf: DataFrame,
             version: String = "V1"): DataFrame = {
    import spark.implicits._
    val multiply2ColUDF: UserDefinedFunction = udf((a: Double, b: Double) => a * b)
    val divide2ColUDF: UserDefinedFunction = udf((a: Double, b: Double, c: Double) => a / (b * c))
    val divide1ColUDF: UserDefinedFunction = udf((a: Double, b: Double, c: Double) => a * b / math.sqrt(c))
    val column = orgDf.columns
    val userItemScore = orgDf
      .selectExpr(s"cast(${column(0)} as string) as user", s"cast(${column(1)} as string) as item", s"cast(${column(2)} as double) as score")
      .cache()
    // the L2 norm of each item's user-score vector
    val itemLength = userItemScore
      .rdd
      .map(row => (row.getString(1), row.getDouble(2)))
      .filter(row => row._2 != 0)
      .groupByKey()
      .mapValues(x => sqrt(x.toArray.map(ii => pow(ii, 2)).sum))
      .toDF("item", "length")
    var resultDf: DataFrame = null
    version match {
      case "V1" =>
        // numerator of the cosine similarity
        val itemPairNumerator = userItemScore.selectExpr("user", "item as item1", "score as score1")
          .join(userItemScore.selectExpr("user", "item as item2", "score as score2"), Seq("user"))
          // position v of the dot product is non-zero only when user v acted on both items
          .where("item1 < item2")
          .withColumn("temp", multiply2ColUDF(col("score1"), col("score2")))
          .groupBy("item1", "item2").agg(sum("temp").as("numerator"))
        resultDf = itemPairNumerator
          .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
          .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item2"))
          // the similarity between two items is the cosine distance between their user vectors,
          // see https://blog.csdn.net/m0_37192554/article/details/107359291
          .withColumn("sim", divide2ColUDF(col("numerator"), col("length1"), col("length2")))
          .select("item1", "item2", "sim")
      case "V2" =>
        // borrow from Swing: penalize heavy users when computing pairwise similarity
        val userItemSet = userItemScore
          .rdd
          .map(row => (row.getAs[String]("user"), row.getAs[String]("item")))
          .groupByKey()
          .map(row => (row._1, row._2.toSet.size))
          .toDF("user", "setSize") // user, size of the user's item set
          .where("setSize > 0")
          .repartition(1000)
        val itemPairNumerator = userItemScore.selectExpr("user", "item as item1", "score as score1")
          .join(userItemScore.selectExpr("user", "item as item2", "score as score2"), Seq("user"))
          .join(userItemSet, Seq("user"))
          // position v of the dot product is non-zero only when user v acted on both items
          .where("item1 < item2")
          // score1 * score2 / sqrt(setSize): the more items a user touched, the less the pair counts
          .withColumn("temp", divide1ColUDF(col("score1"), col("score2"), col("setSize")))
          .groupBy("item1", "item2").agg(sum("temp").as("numerator"))
        resultDf = itemPairNumerator
          .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
          .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item2"))
          // cosine normalization, as in V1
          .withColumn("sim", divide2ColUDF(col("numerator"), col("length1"), col("length2")))
          .select("item1", "item2", "sim")
    }
    resultDf
  }
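End to end, itemCF feeds getI2IMap. A sketch, assuming a behavior log logDf with user/item/score columns and a SparkSession spark:

val cfSim = CollaborativeFilteringUtils.itemCF(spark, logDf, version = "V2")
// cosine-style scores: larger is more similar, so sign = 1; keep 50 neighbors per item
val cfI2I = CollaborativeFilteringUtils.getI2IMap(cfSim, sign = 1, topN = 50)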
  /**
   * Compute item-item similarity with Swing.
   * For how Swing works see https://blog.csdn.net/qq_33534428/article/details/124839278
   *
   * @param orgDf       user-item-score: the user's score for this item
   * @param alpha       smoothing term that keeps the denominator from reaching 0
   * @param minCoUser   minimum number of users that must have co-acted on both items of a
   *                    pair; when -1, the threshold is derived from the counts of the top
   *                    topItemPair pairs
   * @param topItemPair when minCoUser is not given, only this many item pairs (ranked by
   *                    co-acting user count) are used to derive the threshold
   * @param isFilter    whether to filter item pairs by the ratio of the minimum co-user
   *                    count to the pair's own co-user count
   * @param version     which similarity variant to use
   * @return item1, item2, sim
   */
  def swing(sparkSession: SparkSession,
            orgDf: DataFrame,
            alpha: Int = 1,
            minCoUser: Long = -1L,
            topItemPair: Int = 100000,
            isFilter: Boolean = false,
            version: String = "V1"): DataFrame = {
    import sparkSession.implicits._
    val joinStringUDF: UserDefinedFunction = udf((a: String, b: String) => a + "_" + b)
    val joinDoubleUDF: UserDefinedFunction = udf((a: Double, b: Double) => a.toString + "_" + b.toString)
    def splitDouble(s: String): Array[Double] = s.split("_").map(_.toDouble)
    val column = orgDf.columns
    val userItemScore = orgDf
      .selectExpr(s"cast(${column(0)} as string) as user", s"cast(${column(1)} as string) as item", s"cast(${column(2)} as double) as score") // user, item, score
      .repartition(1000)
      .cache()
    val itemLength = userItemScore // the L2 norm of each item's behavior-score vector
      .rdd
      .map(row => (row.getAs[String]("item"), row.getAs[Double]("score")))
      .groupByKey()
      .mapValues(row => sqrt(row.toArray.map(ii => pow(ii, 2)).sum))
      .toDF("item", "length")
    val userItemPair = userItemScore.selectExpr("item as item1", "user", "score as score1")
      .join(userItemScore.selectExpr("item as item2", "user", "score as score2"), Seq("user"))
      .where("item1 < item2") // keep each AB pair once, never both AB and BA
      .join(itemLength.selectExpr("item as item1", "length as length1"), Seq("item1"))
      .join(itemLength.selectExpr("item as item2", "length as length2"), Seq("item2"))
      .withColumn("itemPair", joinStringUDF(col("item1"), col("item2")))
      .withColumn("scorePair", joinDoubleUDF(col("score1"), col("score2")))
      .withColumn("lengthPair", joinDoubleUDF(col("length1"), col("length2")))
      .selectExpr("user", "itemPair", "scorePair", "lengthPair") // the user acted on both items; scorePair carries the two scores
      .distinct()
    val itemPairCount = userItemPair
      .groupBy("itemPair").count()
    val minCount = // the minimum number of users that must have acted on both items of a pair
      if (minCoUser != -1L)
        minCoUser
      else itemPairCount
        .sort(desc("count"))
        .selectExpr("count")
        .limit(topItemPair)
        .rdd
        .map(_.getLong(0))
        .min() // threshold = the smallest co-user count among the topItemPair pairs
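The source post is cut off at this point, so the remainder of swing() (the per-user-pair aggregation and the isFilter/minCount filtering) is missing. For reference, here is a minimal self-contained sketch of the core Swing aggregation; it is not the author's original continuation, and SwingSketch and its toy data are illustrative assumptions:

import org.apache.spark.sql.SparkSession

object SwingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("swing-sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    val alpha = 1.0 // smoothing term, the `alpha` parameter above

    // toy (user, item) interactions
    val userItem = sc.parallelize(Seq(
      ("u1", "a"), ("u1", "b"),
      ("u2", "a"), ("u2", "b"), ("u2", "c"),
      ("u3", "a"), ("u3", "c")
    ))

    // each user's item set I_u
    val userItems = userItem.groupByKey().mapValues(_.toSet).cache()

    // all user pairs (u, v) together with their common items I_u ∩ I_v;
    // a cartesian join is fine for a toy set, real data needs the item->users inversion
    val userPairs = userItems.cartesian(userItems)
      .filter { case ((u, _), (v, _)) => u < v }
      .map { case ((_, iu), (_, iv)) => iu.intersect(iv) }
      .filter(_.size >= 2)

    // every item pair (i, j) shared by a user pair contributes 1 / (alpha + |I_u ∩ I_v|)
    val sims = userPairs.flatMap { common =>
      val w = 1.0 / (alpha + common.size)
      for (i <- common; j <- common if i < j) yield ((i, j), w)
    }.reduceByKey(_ + _)

    sims.collect().foreach { case ((i, j), s) => println(s"sim($i, $j) = $s") }
    spark.stop()
  }
}

Note that this sketch sums over unordered user pairs (u < v); formulations that sum over ordered pairs differ only by a constant factor of 2.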