spark数据清洗练习
Posted CarveStone
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark数据清洗练习相关的知识,希望对你有一定的参考价值。
文章目录
通过编写Spark程序清洗酒店数据里的缺失数据、非法数据、重复数据
准备工作
- 搭建 hadoop 伪分布或 hadoop 完全分布
- 上传 hotal_data.csv 文件到 hadoop
- idea 配置好 scala 环境
删除缺失值 >= 3 的数据
- 读取 /hotel_data.csv
- 删除缺失值 >= 3 的数据, 打印剔除的数量
- 将清洗后的数据保存为/hotelsparktask1
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
object Demo01
def main(args: Array[String]): Unit =
// System.setProperty("HADOOP_USER_NAME", "root")//解决保存文件权限不够的问题
val config: SparkConf = new SparkConf().setMaster("local[1]").setAppName("1")
val sc = new SparkContext(config)
val hdfsUrl ="hdfs://192.168.226.129:9000"
val filePath: String = hdfsUrl+"/file3_1/hotel_data.csv"
val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache()
val total: Long = data.count()
val dataDrop: RDD[Array[String]] = data.filter(_.count(_.equals("NULL")) <= 3)
println("删除的数据条目有: " + (total - dataDrop.count()))
dataDrop.map(_.mkString(",")).saveAsTextFile(hdfsUrl+ "/hotelsparktask1")
sc.stop()
删除星级、评论数、评分中任意字段为空的数据
- 读取 /hotel_data.csv
- 将字段星级、评论数、评分中任意字段为空的数据删除, 打印剔除的数量
- 保存 /hotelsparktask2
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
object Demo02
def main(args: Array[String]): Unit =
System.setProperty("HADOOP_USER_NAME", "root")
val config: SparkConf = new SparkConf().setMaster("local[1]").setAppName("2")
val sc = new SparkContext(config)
val hdfsUrl ="hdfs://192.168.226.129:9000"
val filePath: String = hdfsUrl+"/file3_1/hotel_data.csv"
val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache()
val total: Long = data.count()
val dataDrop: RDD[Array[String]] = data.filter
arr: Array[String] =>
!(arr(6).equals("NULL") || arr(10).equals("NULL") || arr(11).equals("NULL"))
println("删除的数据条目有: " + (total - dataDrop.count()))
dataDrop.map(_.mkString(",")).saveAsTextFile(hdfsUrl+ "/hotelsparktask2")
sc.stop()
删除非法数据
- 读取第一题的 /hotelsparktask1
- 剔除数据集中评分和星级字段的非法数据,合法数据是评分[0,5]的实数,星级是指星级字段内容中包含 NULL、二星、三星、四星、五星的数据
- 剔除数据集中的重复数据
- 分别打印 删除含有非法评分、星级以及重复的数据条目数
- 保存 /hotelsparktask3
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkConf, SparkContext
object Demo03
def main(args: Array[String]): Unit =
System.setProperty("HADOOP_USER_NAME", "root")//解决权限问题
val config: SparkConf = new SparkConf().setMaster( "local[1]").setAppName("3")
val sc = new SparkContext(config)
val hdfsUrl ="hdfs://192.168.226.129:9000"
val filePath: String = hdfsUrl+"/hotelsparktask1"
val lines: RDD[String] = sc.textFile(filePath).cache()
val data: RDD[Array[String]] = lines.map(_.split(","))
val total: Long = data.count()
val dataDrop: RDD[Array[String]] = data.filter
arr: Array[String] =>
try
(arr(10).toDouble >= 0) && (arr(10).toDouble <= 5)
catch
case _: Exception => false
val lab = Array("NULL", "一星", "二星", "三星", "四星", "五星")
val dataDrop1: RDD[Array[String]] = data.filter arr: Array[String] =>
var flag = false
for (elem <- lab)
if (arr(6).contains(elem))
flag = true
flag
val dataDrop2: RDD[String] = lines.distinct
println("删除的非法评分数据条目有: " + (total - dataDrop.count()))
println("删除的非法星级数据条目有: " + (total - dataDrop1.count()))
println("删除重复数据条目有: " + (total - dataDrop2.count()))
val wordsRdd: RDD[Array[String]] = lines.distinct.map(_.split(",")).filter
arr: Array[String] =>
try
(arr(10).toDouble >= 0) && (arr(10).toDouble <= 5)
catch
case _: Exception => false
.filter arr: Array[String] =>
var flag = false
for (elem <- lab)
if (arr(6).contains(elem))
flag = true
flag
wordsRdd.map(_.mkString(","))
.saveAsTextFile(hdfsUrl + "/hotelsparktask3")
sc.stop()
hotel_data.csv
下载数据:https://download.csdn.net/download/weixin_44018458/87437211
以上是关于spark数据清洗练习的主要内容,如果未能解决你的问题,请参考以下文章