从MongoDB中 读取数据,并按照需求进行写入数据

Posted youngxuebo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从MongoDB中 读取数据,并按照需求进行写入数据相关的知识,希望对你有一定的参考价值。

需求,从MongoDB中读取数据,完成下列需求
(1)、统计评分最多的电影
(2)、每部电影平均分
(3)、按评分类别统计平均分最高的 Top10


封装样本类:Model.scala

package Statistics

/**
  * @author youngx
  * @date 2021/4/19 14:34
  */

/**
  * Movie【 电 影 数 据 表 】
  * @param mid  电影的 ID
  * @param name 电影的名称
  * @param descri 电影的描述
  * @param timelong 电影的时长
  * @param issue 电影发布时间
  * @param shoot 电影拍摄时间
  * @param language 电影语言
  * @param genres 电影所属类别
  * @param actors 电影的演员
  * @param director 电影的导演
  */
case class Movies(val mid:Int,
                  val name:String,
                  val descri:String,
                  val timelong:String,
                  val issue:String,
                  val shoot:String,
                  val language:String,
                  val genres:String,
                  val actors:String,
                  val director:String
                 )

/**
  * Rating【 用 户 评 分 表 】
  * @param uid 用户的 ID
  * @param mid 电影的 ID
  * @param score 电影的分值
  * @param timestamp 评分的时间
  */
case class Ratings(val uid:Int,
                   val mid:Int,
                   val score:Double,
                   val timestamp:Long)

/**
  * Tags【 电 影 标 签 表
  * @param uid 用户的 ID
  * @param mid 电影的 ID
  * @param tag 电影的标签内容
  * @param timestamp 评分的时间
  */
case class Tags(val uid:Int,
                val mid:Int,
                val tag:String,
                val timestamp:Long
               )

/**
  * 封装 MongoConfig配置
  * @param uri  MongoDB uri
  * @param db  数据库名
  */
case class MongoConfig(val uri:String,val db:String)

/**
  * ElasticSearch 配置对象
  * @param httpHosts  ES通过http连上去,主机名+端口
  * @param transportHost ES集群内部通信端口
  * @param index ES库名
  * @param clusterName 集群名字
  */
case class ESConfig(val httpHosts:String,
                    val transportHost:String,
                    val index:String,
                    val clusterName:String)


case class Recommendation(mid: Int, r: Double)

case class GenresRecommendation(genres: String, recs: Seq[Recommendation])

主方法调用:StatisticsApp.scala

package Statistics

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
  * @author youngx
  * @date 2021/4/19 15:48
  *
  *       从MongoDB中 读取数据,并按照需求进行写入数据
  */
object StatisticsApp extends App {

  val params = scala.collection.mutable.Map[String,Any]()
  params += "spark.cores" -> "local[2]"
  params += "mongo.uri" -> "mongodb://ip/recom"
  params += "mongo.db" -> "recom"

  val conf = new SparkConf().setAppName("Statistics Recommender").
    setMaster(params("spark.cores").asInstanceOf[String])

  val spark = SparkSession.builder().config(conf).getOrCreate()

  //操作mongodb
  implicit val mongoConf = new MongoConfig(params("mongo.uri").asInstanceOf[String], params("mongo.db").asInstanceOf[String])


  import spark.implicits._

  val ratings = spark.read.option("uri", mongoConf.uri)
    .option("collection", "Ratings")
    .format("com.mongodb.spark.sql")
    .load().as[Ratings].cache()


  val movies = spark.read.option("uri", mongoConf.uri)
    .option("collection", "Movies")
    .format("com.mongodb.spark.sql")
    .load().as[Movies].cache()

  //创建临时视图,方便 staticsRecommender类方法拿数据
  movies.createOrReplaceTempView("Ratings")

  /**
    *统计评分最多的电影
    */
  staticsRecommender.rateMost(spark)


  /**
    * 按评分类别统计平均分最高的 Top10
    */
  staticsRecommender.genresAvgTop10(spark)(movies)


  ratings.unpersist()
  movies.unpersist()

  spark.close()
}

算法需求:StatisticsAlgorithm.scala

package Statistics

import Statistics.StatisticsApp.{mongoConf, spark}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * @author youngx
  * @date 2021/6/20 10:21
  */
object staticsRecommender {
  
  /**
    *统计评分最多的电影
    */
  def rateMost(spak:SparkSession)(implicit mongoConfig: MongoConfig): Unit ={

    val ratingPopularDF: DataFrame = spark.sql("select mid,count(1) from Ratings group by mid order by 2 desc")

    ratingPopularDF.show()

    //重新写入到mongodb中
    ratingPopularDF.write
      .option("uri",mongoConf.uri)
      .option("collection","ratingPopular")
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

  }

  //按评分类别统计平均分最高的 Top10
  def genresAvgTop10(spark: SparkSession)(movies:Dataset[Movies])(implicit mongoConfig: MongoConfig) = {

    //定义所有电影类别
    val genres = List("Action","Adventure","Animation","Comedy","Ccrime","Documentary","Drama","Family","Fantasy","Foreign","History","Horror","Music","Mystery"
      ,"Romance","Science","Tv","Thriller","War","Western")

    val averageMovieScoreDF = spark.sql("select mid,avg(score) as avg from ratings group by mid").cache()

    val moviesWithAvgScoreDF = movies.join(averageMovieScoreDF,Seq("mid","mid")).select("mid","avg","genres")

    val genresRDD = spark.sparkContext.makeRDD(genres)

    import spark.implicits._
    //笛卡尔积
    val genresAvgTop10DF= genresRDD.cartesian(moviesWithAvgScoreDF.rdd).filter{
      case(genres,row) =>{
        row.getAs[String]("genres").contains(genres.toLowerCase())
      }
    }.map{
      case(genres,row) =>{
        (genres,(row.getAs[Int]("mid"),row.getAs[Double]("avg")))
      }
    }.groupByKey()
      .map{
        case(genres,item)=>{
          GenresRecommendation(genres,item.toList.sortWith(_._2 > _._2).take(10).map(x => Recommendation(x._1,x._2)))
        }
      }.toDF()

    //每部电影平均分
    averageMovieScoreDF
      .write
      .option("uri", mongoConfig.uri)
      .option("collection", "AverageMoviesScore")
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    //按评分类别统计平均分最高的 Top10
    genresAvgTop10DF
      .write
      .option("uri", mongoConfig.uri)
      .option("collection", "genresTop10Movies")
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

  }
}

以上是关于从MongoDB中 读取数据,并按照需求进行写入数据的主要内容,如果未能解决你的问题,请参考以下文章

Mongodb复制集

MongoDB学习笔记-使用 MongoDB 进行 CRUD 操作(上)

MongoDB学习笔记-使用 MongoDB 进行 CRUD 操作(下)

MongoDB--关于数据库及选择MongoDB的原因

无法从使用 mongo spark 连接器读取的 spark DF 中显示/写入。

Kettle实现从mysql中取2张表数据关联的数据,并写入到mongodb中