从MongoDB中 读取数据,并按照需求进行写入数据
Posted youngxuebo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从MongoDB中 读取数据,并按照需求进行写入数据相关的知识,希望对你有一定的参考价值。
需求,从MongoDB中读取数据,完成下列需求
(1)、统计评分最多的电影
(2)、每部电影平均分
(3)、按评分类别统计平均分最高的 Top10
封装样本类:Model.scala
package Statistics
/**
* @author youngx
* @date 2021/4/19 14:34
*/
/**
* Movie【 电 影 数 据 表 】
* @param mid 电影的 ID
* @param name 电影的名称
* @param descri 电影的描述
* @param timelong 电影的时长
* @param issue 电影发布时间
* @param shoot 电影拍摄时间
* @param language 电影语言
* @param genres 电影所属类别
* @param actors 电影的演员
* @param director 电影的导演
*/
case class Movies(val mid:Int,
val name:String,
val descri:String,
val timelong:String,
val issue:String,
val shoot:String,
val language:String,
val genres:String,
val actors:String,
val director:String
)
/**
* Rating【 用 户 评 分 表 】
* @param uid 用户的 ID
* @param mid 电影的 ID
* @param score 电影的分值
* @param timestamp 评分的时间
*/
case class Ratings(val uid:Int,
val mid:Int,
val score:Double,
val timestamp:Long)
/**
* Tags【 电 影 标 签 表
* @param uid 用户的 ID
* @param mid 电影的 ID
* @param tag 电影的标签内容
* @param timestamp 评分的时间
*/
case class Tags(val uid:Int,
val mid:Int,
val tag:String,
val timestamp:Long
)
/**
* 封装 MongoConfig配置
* @param uri MongoDB uri
* @param db 数据库名
*/
case class MongoConfig(val uri:String,val db:String)
/**
* ElasticSearch 配置对象
* @param httpHosts ES通过http连上去,主机名+端口
* @param transportHost ES集群内部通信端口
* @param index ES库名
* @param clusterName 集群名字
*/
case class ESConfig(val httpHosts:String,
val transportHost:String,
val index:String,
val clusterName:String)
case class Recommendation(mid: Int, r: Double)
case class GenresRecommendation(genres: String, recs: Seq[Recommendation])
主方法调用:StatisticsApp.scala
package Statistics
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**
* @author youngx
* @date 2021/4/19 15:48
*
* 从MongoDB中 读取数据,并按照需求进行写入数据
*/
object StatisticsApp extends App
val params = scala.collection.mutable.Map[String,Any]()
params += "spark.cores" -> "local[2]"
params += "mongo.uri" -> "mongodb://ip/recom"
params += "mongo.db" -> "recom"
val conf = new SparkConf().setAppName("Statistics Recommender").
setMaster(params("spark.cores").asInstanceOf[String])
val spark = SparkSession.builder().config(conf).getOrCreate()
//操作mongodb
implicit val mongoConf = new MongoConfig(params("mongo.uri").asInstanceOf[String], params("mongo.db").asInstanceOf[String])
import spark.implicits._
val ratings = spark.read.option("uri", mongoConf.uri)
.option("collection", "Ratings")
.format("com.mongodb.spark.sql")
.load().as[Ratings].cache()
val movies = spark.read.option("uri", mongoConf.uri)
.option("collection", "Movies")
.format("com.mongodb.spark.sql")
.load().as[Movies].cache()
//创建临时视图,方便 staticsRecommender类方法拿数据
movies.createOrReplaceTempView("Ratings")
/**
*统计评分最多的电影
*/
staticsRecommender.rateMost(spark)
/**
* 按评分类别统计平均分最高的 Top10
*/
staticsRecommender.genresAvgTop10(spark)(movies)
ratings.unpersist()
movies.unpersist()
spark.close()
算法需求:StatisticsAlgorithm.scala
package Statistics
import Statistics.StatisticsApp.mongoConf, spark
import org.apache.spark.sql.DataFrame, Dataset, SparkSession
/**
* @author youngx
* @date 2021/6/20 10:21
*/
object staticsRecommender
/**
*统计评分最多的电影
*/
def rateMost(spak:SparkSession)(implicit mongoConfig: MongoConfig): Unit =
val ratingPopularDF: DataFrame = spark.sql("select mid,count(1) from Ratings group by mid order by 2 desc")
ratingPopularDF.show()
//重新写入到mongodb中
ratingPopularDF.write
.option("uri",mongoConf.uri)
.option("collection","ratingPopular")
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
//按评分类别统计平均分最高的 Top10
def genresAvgTop10(spark: SparkSession)(movies:Dataset[Movies])(implicit mongoConfig: MongoConfig) =
//定义所有电影类别
val genres = List("Action","Adventure","Animation","Comedy","Ccrime","Documentary","Drama","Family","Fantasy","Foreign","History","Horror","Music","Mystery"
,"Romance","Science","Tv","Thriller","War","Western")
val averageMovieScoreDF = spark.sql("select mid,avg(score) as avg from ratings group by mid").cache()
val moviesWithAvgScoreDF = movies.join(averageMovieScoreDF,Seq("mid","mid")).select("mid","avg","genres")
val genresRDD = spark.sparkContext.makeRDD(genres)
import spark.implicits._
//笛卡尔积
val genresAvgTop10DF= genresRDD.cartesian(moviesWithAvgScoreDF.rdd).filter
case(genres,row) =>
row.getAs[String]("genres").contains(genres.toLowerCase())
.map
case(genres,row) =>
(genres,(row.getAs[Int]("mid"),row.getAs[Double]("avg")))
.groupByKey()
.map
case(genres,item)=>
GenresRecommendation(genres,item.toList.sortWith(_._2 > _._2).take(10).map(x => Recommendation(x._1,x._2)))
.toDF()
//每部电影平均分
averageMovieScoreDF
.write
.option("uri", mongoConfig.uri)
.option("collection", "AverageMoviesScore")
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
//按评分类别统计平均分最高的 Top10
genresAvgTop10DF
.write
.option("uri", mongoConfig.uri)
.option("collection", "genresTop10Movies")
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
以上是关于从MongoDB中 读取数据,并按照需求进行写入数据的主要内容,如果未能解决你的问题,请参考以下文章
MongoDB学习笔记-使用 MongoDB 进行 CRUD 操作(上)
MongoDB学习笔记-使用 MongoDB 进行 CRUD 操作(下)