推荐系统-01-电影推荐与结果评估

Posted 心飞翔<

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了推荐系统-01-电影推荐与结果评估相关的知识,希望对你有一定的参考价值。

import spark.sql
import org.apache.spark.sql.types._
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// 数据预处理
case class Movie(movieId:Int, title:String, genres:Seq[String])
case class User(userId:Int, gender:String, age:Int, occupation:Int, zip:String)

def parseMovie(str:String):Movie={
    val fields = str.split("::")
    assert(fields.size == 3)
    Movie(fields(0).toInt, fields(1).toString, Seq(fields(2)))
}

def parseUser(str:String):User={
    val fields = str.split("::")
    assert(fields.size == 5)
    User(fields(0).toInt, fields(1).toString, fields(2).toInt, fields(3).toInt, fields(4).toString) 
}

def parseRating(str:String):Rating={
    val fields = str.split("::")
    assert(fields.size == 4)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toInt)   
}
// 加载评分文件到RDD, 这个也可以是HADOOP源
val ratingText = sc.textFile("file:/home/hadoop/ml-1m/ratings.dat");
ratingText.first()

// 对原始RDD数据, 进行转换处理,并缓存
val ratingRDD = ratingText.map(parseRating).cache()
// 下面是打印查看一下相关信息
println("Total number of ratings : " + ratingRDD.count())
println("Total number of movies rated : " + ratingRDD.map(_.product).distinct().count())
println("Total number of users who rated moives:" + ratingRDD.map(_.user).distinct().count())

// 将RDD转换成为DataFrame
val ratingDF = ratingRDD.toDF();
// 同理,加载电影信息
val movieDF=sc.textFile("file:/home/hadoop/ml-1m/movies.dat").map(parseMovie).toDF();
// 同理,加载用户信息
val userDF=sc.textFile("file:/home/hadoop/ml-1m/users.dat").map(parseUser).toDF();
ratingDF.printSchema()
movieDF.printSchema()
userDF.printSchema()

// 将DataFrame数据注册临时表, 就可以临时表进行SQL操作
ratingDF.registerTempTable("ratings")
movieDF.registerTempTable("movies")
userDF.registerTempTable("users")

// SQL操作DataFrame数据后,返回DataFrame数据
val result = sql("""select title, rmax, rmin, ucnt from 
(select product, max(rating) as rmax, min(rating) as rmin, count(distinct user) as ucnt from ratings group by product) ratingsCNT
join movies on product=movieId
order by ucnt desc""")
result.show()

// SQL操作DataFrame数据后,返回DataFrame数据
val mostActiveUser=sql("""select user, count(*) as cnt 
from ratings group by user order by cnt desc limit 10 """)
mostActiveUser.show()
// SQL操作DataFrame数据后,返回DataFrame数据
var result = sql("""select title from ratings join movies on movieId=product
where user=4169 and rating>4""")
result.show()

// ALS(交替最小二乘法)算法处理
// 将评分RDD数据化分成训练集与测试集
val split=ratingRDD.randomSplit(Array(0.8,0.2), 0L)
val trainingSet=split(0).cache()
val testSet=split(1).cache()
trainingSet.count()
testSet.count()

// 这里的RANK是UV间的feature秩, 训练得出模型
val model = (new ALS().setRank(20).setIterations(10).run(trainingSet))

// Array[Rating], 这里注意DF,没有直接的map操作
// 利用模型进行电影推荐
val recomForTopUser=model.recommendProducts(4169,5)
val movieTitle = movieDF.rdd.map(array=>(array(0),array(1))).collectAsMap();
val recomResult=recomForTopUser.map(rating=>(movieTitle(rating.product), rating.rating)).foreach(println)

// 这里MAP运算, 类匹配
val testUserProduct=testSet.map{
    case Rating(user,product,rating) => (user,product)
}
// 对测试集进行预测
val testUserProductPredict=model.predict(testUserProduct)
testUserProductPredict.take(10).mkString("\n")
    
val testSetPair=testSet.map{
    case Rating(user,product,rating) => ((user,product), rating)
}

val predictionPair=testUserProductPredict.map{
    case Rating(user,product,rating) => ((user,product), rating)
}
// 将测试集的预测评分与测试集给定的评分相减, 统计得出平均错误mae
val joinTestPredict=testSetPair.join(predictionPair)
val mae=joinTestPredict.map{
    case ((user,product),(ratingT,ratingP)) =>
    val err=ratingT-ratingP
    Math.abs(err)
}.mean()

//FP, 过滤一下低分和高分
val fp = joinTestPredict.filter{
    case ((user,product),(ratingT,ratingP)) =>
    (ratingT <= 1 & ratingP >=4)
}
fp.count()

import org.apache.spark.mllib.evaluation._
val ratingTP=joinTestPredict.map{
    case ((user,product),(ratingT,ratingP))=>
    (ratingP,ratingT)
}
// 现测试一下平均绝对误差
val evaluator = new RegressionMetrics(ratingTP)
evaluator.meanAbsoluteError

以上是关于推荐系统-01-电影推荐与结果评估的主要内容,如果未能解决你的问题,请参考以下文章

如何评估推荐系统的健康状况?

如何评估基于内容的推荐系统

推荐系统-评估

推荐系统-评估

推荐系统(recommender systems):预测电影评分--构造推荐系统的一种方法:基于内容的推荐

大数据技术之_24_电影推荐系统项目_04_推荐系统算法详解

(c)2006-2024 SYSTEM All Rights Reserved IT常识