2021-06-09*RDD编程初级实践

Posted 金豈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021-06-09*RDD编程初级实践相关的知识,希望对你有一定的参考价值。

RDD编程初级实践

一、数据来源描述

pyspark交互式编程
科任老师提供分析数据data.txt,该数据集包含了某大学计算机系的成绩,数据格式如下所示:
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……

编写独立应用程序实现数据去重
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。本文给出门课的成绩(A.txt、B.txt)下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20200101 x
20200102 y
20200103 x
20200104 y
20200105 z
20200106 z
输入文件B的样例如下:
20200101 y
20200102 y
20200103 x
20200104 z
20200105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20200101 x
20200101 y
20200102 y
20200103 x
20200104 y
20200104 z
20200105 y
20200105 z
20200106 z

编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。本文给出门课的成绩(Algorithm.txt、Database.txt、Python.txt),下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)

二、数据上传及上传结果查看


图2.1 将所需的数据放进/usr/local/spark文件中

pyspark交互式编程

图2.2 查看data.txt中的数据

图2.3 data.txt中的数据

编写独立应用程序实现数据去重

图2.4 将所需的数据放进/usr/local/spark/big

图2.5 查看A.txt的数据

图2.6 查看B.txt的数据

编写独立应用程序实现求平均值问题

图2.7 查看相关数据

图2.8 查看Algorithm.txt的数据

图2.9 查看Database.txt的数据

图2.10 查看Python.txt的数据

三、数据处理过程描述

pyspark交互式编程
使用lines储存读取data.txt文件里的内容
lines=sc.textFile(‘file:///usr/local/spark/data.txt’)
(1) 该系总共有多少学生;
lines存储的是Row object类型
map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD
首先提取第一列的数据,第一列的数据是学生的姓名,通过累加和去重可以算出总人数

res = lines.map(lambda x:x.split(,)).map(lambda x:x[0])

使用spark的distinct进行去重

res.distinct()

用sum存储去重后的数据

sum =res.distinct()

显示sum数值,count():计数

sum.count()


(2) 该系共开设了多少门课程;
lines存储的是Row object类型
map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD
提取第二列的数据,第二列的数据是科目,通过累加和去重可以算出科目的总数

res = lines.map(lambda x:x.split(,)).map(lambda x:x[1])

用dis_res存去重后的数据

dis_res =res.distinct()

显示dis_res数值,count():计数

dis_res.count()


(3) Tom同学的总成绩平均分是多少;
lines存储的是Row object类型
map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD
filter()接收函数,返回只包含满足filter()函数的元素的新RDD
筛选出第一列的数据,对RDD中的元素进行过滤选择名为Tom的列,返回形成新的RDD。

res = lines.map(lambda x:x.split(,)).filter(lambda x:x[] == ’Tom’)

使用foreach算子显示数据

res.foreach(print)

num保存Tom同学的学科总数

num=res.count()

提取各科成绩

score = rep.map(lambda x:int(x[2]))

各科成绩相加

sum_score = score.reduce(lambda x,y:x+y)

avg存储总成绩除以科目数的数值

avg = sum_score/num

显示平均分数值

print(avg)



(4) 求每名同学的选修的课程门数;
lines存储的是Row object类型
map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD
提取出第二列的数据,并对第一列相同的数据进行累加,算出每位同学所选的选修课门数。

res = lines.map(lambda x:x.split(,)).map(lambda x:(x[0],1))

对相同key的数据进行处理,最终每个key只保留一条记录。

each_res=res.reduceByKey(lambda x,y:x+y)

使用foreach算子显示数据

each_res.foreach(print)


(5) 该系DataBase课程共有多少人选修;
lines存储的是Row object类型
map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD
filter()接收函数,返回只包含满足filter()函数的元素的新RDD
筛选出第二列的数据,是对RDD中的元素进行过滤选择名为DataBase的列,返回形成新的RDD。

res = lines.map(lambda x:x.split(,)).filter(lambda x:x[1] == ’DataBase’)

显示res数值

res.count()


(6) 各门课程的平均分是多少;
lines存储的是Row object类型
map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD
筛选出第二列和第三列的数据,并对数据进行过滤,累加,算出相关的数据之和,再进行平均分的算数。

res = lines.map(lambda x:x.split(,)).map(lambda x:(x[1],(int(x[2])),1))

对相同key的数据进行处理,最终每个key只保留一条记录,tem储存每科对应人数与每科总分

tem = res.reducByKey(ambda x,y(x[0]+y[0],x[1]+y[1]))

把数据以rdd的形式计算出每科平均分

avg = tem.map(lambda x(x[0],round(x[1][0]/x[1][1],2)))

使用foreach算子显示数据

avg.foreach(print)


(7) 使用累加器计算共有多少人选了DataBase这门课。
lines存储的是Row object类型
map()接收函数,把函数应用到RDD的每一个元素,返回新的RDD
filter()接收函数,返回只包含满足filter()函数的元素的新RDD
提取出第二列的数据,并对数据进行过滤,是对RDD中的元素进行过滤选择名为DataBase的列,返回形成新的RDD。

res = lines.map(lambda x:x.split(,)).filter(lambda x:x[1]==’DataBase’)

使用累加器

accum=sc.accumulator()

在foreach算子里使用累加器计算

res.foreach(lambda x:accum.add(1))

显示累加数据

accum.value


编写独立应用程序实现数据去重
进入cd/usr/local/spark/big查看相关文档是否存在
输入vim A.txt和vim B.txt,查看相关数据是否存在,再接着输入vim remdup.py,创建remdup.py文件,再接着对py文件进行编辑。
sc.textFile()读取本地文件,括号内输入文件的路径;
union()两个数据的并集

lines = lines1.union(lines2)

distinct()对相关数据进行去重,去重后将相关数据存到distinct_lines

distinct_lines=lines.distinct()

repartition(1)重新建立一个分区,并将生成的文件保存到C

res.repartition(1).saveAsTextFile(file:///usr/local/spark/big/C”)


编辑完成后输入:wq!保存退出后再输入python3 remdup.py运行文件,运行完成后,输入命令:ls,查看是否生成C文件夹;进入C文件夹,再输入命令:ls,查看结果是否生成。输入命令:vim part-00000,查看数据是否进行合并,并进行去重。


编写独立应用程序实现求平均值问题
输入cd /usr/local/spark进入目录,输入命令:vim avg.py,创立py文件,并对文件进行编辑。

SortBy()按照处理后的数据比较结果排序,默认为正序。
reduceByKey()把RDD中的key相同的一组数据拿出来处理,形成一个新的RDD里面放的是元组

编写完成后,输入wq!保存并退出,输入命令:ls,查看是否生成result文件夹;输入命令:cd result,进入result文件夹;输入命令:ls,查看数据是否生成;输入命令:vim part-00000,查看数据是否生成。

在Spark中,不同的RDD之间具有依赖的关系。RDD与它所依赖的RDD的依赖关系有两种类型,分别是窄依赖(narrow dependency)和宽依赖(wide dependency)。当RDD执行map、filter及union和join操作时,都会产生窄依赖。RDD做map、filter和union算子操作时,是属于窄依赖的第一类表现;而RDD做join算子操作(对输入进行协同划分)时,是属于窄依赖表现的第二类。当RDD做groupByKey和join操作时,会产生宽依赖。

以上是关于2021-06-09*RDD编程初级实践的主要内容,如果未能解决你的问题,请参考以下文章

2021-06-09*RDD编程初级实践

2021-06-09*RDD编程初级实践

RDD编程初级实践

RDD编程初级实践

RDD编程初级实践

RDD编程初级实践