Spark Data Analysis Practice
Posted by CarveStone
- The new dataset must contain the city name, the total number of orders per city, the average hotel user rating per city, and the total hotel review count per city (the core aggregation pattern is sketched right after this list).
- Compute the maximum and minimum of each of the three numeric fields (total orders per city, average hotel user rating per city, total hotel review count per city) and print them.
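At its core, the job is one per-city reduceByKey aggregation followed by min-max scaling of the aggregated fields. As a minimal sketch of the aggregation pattern, assuming (as in the full program below) that the parsed rows sit in an RDD[Array[String]] called rows, with the city name in column 4, the user rating in column 10, and the review count in column 11:

// sketch only: accumulate (order count, rating sum, review-count sum) per city
rows.map(arr => (arr(4), (1L, arr(10).toDouble, arr(11).toDouble)))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))

The full program follows.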
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object Demo04 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("4")
    val sc = new SparkContext(conf)
    val hdfsUrl = "hdfs://192.168.226.129:9000"
    val filePath: String = hdfsUrl + "/file3_1/hotel_data.csv"
    // split each CSV line into fields; cache because the RDD is reused below
    val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache()
    val total: Long = data.count() // total number of raw rows
    // keep city (column 4), rating (column 10), review count (column 11),
    // dropping rows whose rating or review count is not numeric
    val dataDrop: RDD[(String, Long, Double, Double)] = data
      .map(arr => Array(arr(4), arr(10), arr(11)))
      .filter { arr =>
        try {
          arr(1).toDouble
          arr(2).toDouble
          true
        } catch {
          case _: Exception => false
        }
      }
      // (city, (order count: 1 per row, rating sum, review-count sum))
      .map(arr => (arr(0), (1L, arr(1).toDouble, arr(2).toDouble)))
      .reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2, t1._3 + t2._3))
      .map { case (city, (orders, ratingSum, reviewSum)) => (city, orders, ratingSum, reviewSum) }
    val dpCount: Long = dataDrop.count() // number of distinct cities
    // (city, total orders, average rating, total reviews)
    val data1: RDD[(String, Double, Double, Double)] = dataDrop.map {
      case (city, orders, ratingSum, reviewSum) =>
        (city, orders.toDouble, ratingSum / orders, reviewSum)
    }
    // preview the aggregates for a few cities (filter returns a new RDD;
    // data1 itself is unchanged)
    val city = Array("广州", "北京", "上海", "阿拉善盟")
    data1.filter(t => city.contains(t._1)).collect().foreach(println)
    // save as "city,totalOrders,avgRating,totalReviews"
    data1.map { tup =>
      val bf: ArrayBuffer[String] = ArrayBuffer[String]()
      tup.productIterator.foreach(e => bf += e.toString)
      bf.toArray
    }.map(_.mkString(","))
      .saveAsTextFile(hdfsUrl + "/hotelsparktask4_1")
    // (max, min) for each of the three numeric fields
    val arrMM = Array(
      Array(data1.map(_._2).max(), data1.map(_._2).min()),
      Array(data1.map(_._3).max(), data1.map(_._3).min()),
      Array(data1.map(_._4).max(), data1.map(_._4).min())
    )
    println("Total city orders (max, min): " + arrMM(0).mkString(","))
    println("Average city hotel user rating (max, min): " + arrMM(1).mkString(","))
    println("Total city hotel reviews (max, min): " + arrMM(2).mkString(","))
    sc.parallelize(arrMM).map(_.mkString(","))
      .saveAsTextFile(hdfsUrl + "/hotelsparktask4_2")
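    // The next step rescales every field with min-max normalization:
    //   x' = (x - min) / (max - min)
    // so each value lands in [0, 1] (the smallest city maps to 0, the largest
    // to 1). A zero range, meaning all cities share the same value, is
    // replaced by 1 so the division is safe and the result is simply 0.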
    // re-read "city,totalOrders,avgRating,totalReviews" and normalize
    val rdd1 = sc.textFile(hdfsUrl + "/hotelsparktask4_1")
      .map(_.split(","))
      .map(arr => arr(0) -> Array(arr(1), arr(2), arr(3)).map(_.toDouble))
      .map { tup =>
        // per-field range (max - min); replace 0 with 1 to avoid dividing by zero
        val diff = arrMM.map(_.reduce(_ - _)).map(e => if (e == 0) 1.0 else e)
        val dd = (tup._2(0) - arrMM(0)(1)) / diff(0) // normalized total orders
        val pf = (tup._2(1) - arrMM(1)(1)) / diff(1) // normalized average rating
        val pl = (tup._2(2) - arrMM(2)(1)) / diff(2) // normalized total reviews
        Array(tup._1, dd.toString, pf.toString, pl.toString)
      }
    // each city occurs exactly once after the aggregation, so a plain
    // (city -> csv line) map is enough for the lookups below
    val reMap = rdd1.map(arr => arr(0) -> arr.mkString(",")).collectAsMap()
    println(reMap.getOrElse("广州", "NaN"))
    println(reMap.getOrElse("上海", "NaN"))
    println(reMap.getOrElse("北京", "NaN"))
    rdd1.map(_.mkString(","))
      .saveAsTextFile(hdfsUrl + "/hotelsparktask4_3")
    sc.stop()
  }
}
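To make the scaling step concrete, here is a tiny self-contained sketch of the same min-max formula on plain numbers (the values are invented purely for illustration):

// standalone illustration of the min-max scaling used above
object MinMaxDemo {
  def main(args: Array[String]): Unit = {
    val orders = Array(1200.0, 350.0, 80.0) // hypothetical per-city order totals
    val (mx, mn) = (orders.max, orders.min)
    val range = if (mx - mn == 0) 1.0 else mx - mn // same zero guard as the job
    val scaled = orders.map(x => (x - mn) / range)
    println(scaled.mkString(",")) // prints roughly 1.0, 0.241, 0.0
  }
}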
hotel_data.csv
Download the data: https://download.csdn.net/download/weixin_44018458/87437211
Spark data cleaning practice: https://blog.csdn.net/weixin_44018458/article/details/128980802