Spark之RDD的使用(pyspark版)

Posted 柳小葱

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark之RDD的使用(pyspark版)相关的知识,希望对你有一定的参考价值。

💫上次写完rdd的介绍,有同学强烈介意用一些代码来展示一下rdd,好今天我们就如你所愿,我们今天就来以代码的方式给大家讲解一下rdd吧,对以往内容感兴趣的同学可以查看下面👇:

💐今天主要讲解一下rdd的大致情况,以及目前的使用场景,然后就是掩饰怎么使用python操作rdd的几种方式。

1. 低级API——RDD

其实对于初学者,我不会很建议你从这部分开始,因为这部分理解起来很困难,还不如从高级的api开始学,之所以说rdd低级,是因为我们在几乎所有的场景都能应该使用结构化的api,只有当我们遇到一些很不常见的功能你才会使用rdd(当然还有其他的场景),低级的api有两种:

  • rdd 弹性分布式数据集
  • 广播变量和累加器 分发和处理分布式的共享变量。

rdd在spark的1.X版本是主要的api,在2.X版本仍然可以使用,但不常用,我们目前使用的是3.X版本,几乎不用原始的rdd了。但rdd的概念是一直存在的,无论是我们的dataframe和dataset(scala),运行所有的spark代码都将编译成rdd。
rdd虽然有很强大的可塑性,可是在完成一些操作时的优化远远没有结构化的api好,所以说结构化的api更高效。

2. 创建RDD

低级api使用spark.sparkContext来调用即可。但我们这里讲解几种创建方式。

2.1 使用高级api转换

这里的高级api主要是指利用DataFrame和Dataset来创建rdd。

  • 使用dataframe创建rdd
#使用dataframe创建rdd
a=spark.range(10).rdd
a.collect()

结果如下:

  • 使用rdd创建dataframe
#使用rdd创建dataframe
b=spark.range(10).rdd.toDF()
b.show()

结果如下:

2.2 从本地创建rdd

从集合中创建rdd需要使用sparkContext中的parallelize方法,该方法会将位于单个节点的数据集合转换成一个并行集合。在创建该并行集合时,还可以显示指定并行集合的分片集合。下面我以2个分片作为例子:

#创建单词rdd
c='hello spark , hello hadoop'.split(" ")
words=spark.sparkContext.parallelize(c,2)
words.collect()

结果如下:

2.3 从数据源创建

从数据源或者文本文件中都能创建rdd,但最好还是采用结构化的方式来读取数据。

#从文本中读取数据
d=spark.sparkContext.textFile("/FileStore/tables/2010_12_01.csv")
d.collect()

结果如下:

3. 操作RDD

转换操作

3.01 dinstinct

在rdd上调用distinct方法用于删除rdd中的重复项

c='hello spark , hello hadoop'.split(" ")
words=spark.sparkContext.parallelize(c,2)
words.distinct().count()

结果如下:

3.02 filter

过滤器filter类似于sql中的where子句。

#构建一个过滤条件h开头
def startwith(ones):
   return ones.startswith("h")
#filter过滤
words.filter(lambda x:startwith(x)).collect()

结果如下:

3.03 map

map操作,指定一个函数,将给定的数据一条一条地输入该函数处理以得到你期望的结果,一对一是map最大的特点。

#将单个字符变成一个三元组
words.map(lambda word : (word,word[0],word.startswith('h'))).collect()

结果如下:

3.04 flatmap

flatmap函数是对map函数的拓展,当前行会映射为多行。该函数最大的特点就是一行映射多行。

#将一个一单词拆成一个个字符
words.flatMap(lambda a:list(a)).take(10)

结果如下:

3.05 sortBy

排序操作

#将单词按照字符长度从短到长排列
words.sortBy(lambda a:len(a)).take(4)

结果如下:

动作操作

3.06 reduce

该函数指定一个函数将rdd中的任何类型的值“规约”成一个值

#求1-20数字之和(这里实现的方式,是通过22相加最后求的总和)
spark.sparkContext.parallelize(range(1,21)).reduce(lambda x,y:x+y)

结果如下:

3.07 count

用与统计元素个数

#统计单词个数
words.count()

结果如下:

3.08 countByValue

根据value值进行计数

words.countByValue()

结果如下:

3.09 first

返回数据集中第一个值

#返回数据集中第一个值
words.first()

结果如下:

3.10 max 和 mix

max和min方法分别返回最大值和最小值

#最大值
spark.sparkContext.parallelize(range(1,11)).max()

结果如下:

#最小值
spark.sparkContext.parallelize(range(1,11)).min()

结果如下:

3.11 take

take 和它的派生方法是从rdd中获取一定数据的值

words.take(3) 

4.参考文献

《spark权威指南》
《pysaprk教程》
《pyspark实战》

以上是关于Spark之RDD的使用(pyspark版)的主要内容,如果未能解决你的问题,请参考以下文章

PySpark之RDD操作

Spark 定制版:008~Spark Streaming源码解读之RDD生成全生命周期彻底研究和思考

在 python (pyspark) 中使用 combinebykey spark rdd 计算组上的聚合

将 RDD 的值作为变量传递给另一个 RDD - Spark #Pyspark [重复]

Spark(PySpark)如何同步多个worker节点更新RDD

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问