《SPARK官方教程系列》(标贝科技)

Posted DataBaker标贝科技

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《SPARK官方教程系列》(标贝科技)相关的知识,希望对你有一定的参考价值。

1-概述

Apache Spark是一个用于大规模数据处理的统一分析引擎,它在Java、Scala、Python和R中的提供了高级api,以及一个支持通用执行图[general execution graphs]的优化引擎。它还支持一组丰富的高级工具,包括用于SQL和结构化数据处理的Spark SQL、用于机器学习的MLlib、用于图形处理的GraphX以及用于增量计算和流处理的结构化流。

在Spark 2.0之前,Spark的主要编程接口是弹性分布式数据集(RDD)。在Spark 2.0之后,RDDs被Dataset/DateFrame取代,它是像RDD一样的强类型,但在内部进行了更丰富的优化。RDD接口仍然受到支持,您可以在RDD编程指南中获得更详细的参考资料。但是,强烈建议您切换到使用Dataset/DataFrame,它比RDD具有更好的性能。有关Dataset/DataFrame的更多信息,请参阅SQL编程指南。

默认情况下,Spark中的安全性是关闭的。这可能意味着您在默认情况下容易受到攻击。运行Spark前请参阅Spark安全。

组件【Components】

Spark应用【applications】在集群中作为独立的一组进程集运行,由主程序中的SparkContext对象进行协调,主程序被称为driver program,即驱动程序。

Spark应用是在集群中运行的,SparkContext可以连接几种类型的集群管理器【cluster managers】,比如spark自己的集群管理器standalone,Mesos,YARN,K8S等。集群管理器可以跨应用程序分配资源。一旦连接上集群管理器后,Spark在集群的节点上获得执行器【executors】,执行器是运行计算和存储数据的进程。下一步,Spark发送你的应用代码(在jar包或python文件中定义的,传递给SparkContext),最后SparkContext把任务【tasks】发送到执行器中运行。

关于这个架构有几点需要注意:
每个应用程序都有自己的执行器进程。执行器在整个应用程序期间保持运行,并在多个线程中运行tasks。这样做的好处是可以将应用程序彼此隔离,不管是在调度方面(每个driver调度它自己的tasks)还是执行器方面(不同应用的tasks运行在不同的JVM)。这也意味着如果不将数据写入外部存储系统,就不能在不同的Spark应用(SparkContext实例)中共享数据。
Spark与底层集群管理器无关,只要集群管理器能够获得executor,且executor进程之间相互通信,即使在支持其他应用程序(如Mesos/YARN)的集群管理器上运行也是相对容易的。
驱动程序必须在其整个生命周期中侦听并接受来自执行器的连接(参考spark.driver.port in the network config section)。因此驱动程序必须能被工作结点【worker nodes】访问到。
因为驱动程序在集群上调度任务,它应该在靠近工作节点的地方运行,最好在同一局域网络上。如果想远程发送请求到集群,最好是打开一个RPC,让驱动程序从附近提交操作,而不是在离工作节点很远的地方运行驱动程序。

集群管理器类型

Spark目前支持下面几个集群管理器:
Standalone - spark自带的一个简单的集群管理器,可以很容易的部署。
Apache Mesos - 一个通用的集群管理器,此管理器也可以运行Hadoop MapReduce和服务应用。
Hadoop YARN - Hadoop2的资源管理器
Kubernetes - 用于自动化部署、扩展和管理容器化应用程序的开源系统

提交应用程序【Submitting Applications】

使用spark-submit脚本可以把应用提交到集群,详情参考应用提交指南。

监控

每个驱动程序【driver program】都有一个Web UI,默认使用4040端口,界面会展示一些信息包括正在运行的tasks,executors,storage usage。在浏览器打开 http://:4040访问此监控界面,详情参考监控指南。

Job Scheduling

Spark提供了跨应用程序(在集群管理器级别)和应用程序内部(同一个SparkContext上发生多个计算)的资源分配的控制。详情参考Job调度指南。

术语

下表总结了集群概念的术语:

TermMeaning
Application在spark上构建的应用程序,包含driver program和在集群上的executors。
Application jar包含用户的Spark应用程序的jar, 在某些情况下,用户希望创建一个“uber jar”,其中包含他们的应用程序及其依赖项。用户的jar不应该包含Hadoop或Spark库,这些库将在运行时自动添加。
Driver program驱动程序:运行main函数和创建SparkContext的程序。
Cluster manager集群管理器:获取集群上资源的外部服务 (e.g. standalone manager, Mesos, YARN)
Deploy mode部署模式:区分驱动程序运行的位置. “cluster” 模式在集群内部启动驱动程序,"client"模式,在集群外部启动驱动程序。
Worker node工作节点:集群中可以运行应用的节点。
Executor执行器:在工作节点上为应用程序启动的进程,用于运行tasks并跨task在内存或磁盘存储中保存数据,每个应用都有自己的执行器。
Task任务:发送到执行器上的工作单元
Job由多个任务组成的并行计算,这些Job在响应Spark action时产生,比如save或collect等。
Stage每个作业被分成更小的任务集,称为阶段,Stage相互依赖(类似于MapRerduce中的Map和Reduce)

2-RDD编程指南

概述

从高层次上讲,每个Spark应用程序都由一个驱动程序【driver progarm】和若干执行器【executors】组成,驱动程序运行用户的main方法,执行器集群上并行执行各种操作。

Spark提供的主要抽象是弹性分布式数据集resilient distributed dataset。它是跨集群节点划分的元素集合,可以并行操作。RDDs通过从Hadoop文件系统(或任何其他Hadoop支持的文件系统)中的文件创建,或者通过驱动程序中的现有集合创建,并可以对其进行转换。用户还可以要求Spark将RDD持久化到内存中,从而允许在并行操作中有效地重用它。RDDs还会自动从节点故障中恢复。

Spark中的第二个抽象是可在并行操作中使用的共享变量。默认情况下,当Spark在不同节点上作为一组任务[tasks]并行运行一个函数时,它会将函数中使用的每个变量的副本发送给每个任务。有时候,一个变量需要在任务[tasks]之间共享,或者在任务[tasks]和驱动程序[driver program]之间共享。Spark支持两种类型的共享变量:
广播变量【broadcast variables】,可以在所有节点上的内存中缓存一个值。
累加器【accumulators】,变量只会被累加,比如计数器和汇总求和等。

连接到spark

Spark 3.0.0适用于Python 2.7+或Python 3.4+。它可以使用标准的CPython解释器,所以可以使用像NumPy这样的C库。它也适用于pypy2.3 +。
注意,从Spark 3.0.0开始不再支持Python 2。
Python中的Spark应用程序可以通过bin/ Spark -submit脚本运行,该脚本在运行时包含Spark,也可以把它包含在setup.py中:

install_requires=[
        'pyspark==site.SPARK_VERSION'
    ]

在没有使用pip安装pySpark的情况下,要在python中运行Spark应用,使用位于Spark目录中的bin/ Spark -submit脚本。这个脚本将加载Spark的Java/Scala库,并允许您向集群提交应用程序。还可以使用bin/pyspark启动交互式Python shell。

如果希望访问HDFS数据,则需要使用连接到你的HDFS版本的PySpark。对于常用的HDFS版本,Spark主页上也有预先构建的包。
最后,需要将一些Spark类导入到程序中。添加以下行

from pyspark import SparkContext, SparkConf

PySpark在driver和worker中都需要使用相同的Python版本。它默认使用PATH中python版本,你可以通过PYSPARK_PYTHON指定你想使用的python版本,例如:

$ PYSPARK_PYTHON=python3.4 bin/pyspark
$ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

初始化spark

Spark程序必须做的第一件事是创建SparkContext对象,该对象告诉Spark如何访问集群。要创建SparkContext,首先需要构建一个SparkConf对象,该对象包含有关应用程序的信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

appName参数是用在集群UI上显示应
用程序的名称。master是一个Spark、Mesos或YARN集群URL,或者是一个特殊“local”字符串用于在本地模式下运行。实际上,在集群上运行时,不建议在程序中硬编码master,而是使用spark-submit启动应用程序时通过参数传递。对于本地测试和单元测试,可以通过“local”在同进程内来运行Spark。

使用Shell

在PySpark shell中,已经为您创建了一个特殊的支持解释器的SparkContext,存在名为sc的变量中。不能创建自己的SparkContext。可以使用–master参数设置连接到哪一个master,通过向–py-files参数传递一个逗号分隔的列表,可以将Python .zip、.egg或.py文件添加到运行时路径中。还可以通过向–packages参数提供一个以逗号分隔的Maven coordinates列表,将依赖项(例如Spark Packages)添加到shell会话中。任何可能存在依赖关系的附加存储库(例如Sonatype)都可以传递给–repositories参数。

例如,要在四个核上运行bin/pyspark,请使用:
$ ./bin/pyspark --master local[4]

或者,也可以将code.py添加到搜索路径中(为了在代码中能够import),使用:
$ ./bin/pyspark --master local[4] --py-files code.py

要获得选项的完整列表,请运行pyspark --help。在后台,pyspark调用更通用的 spark-submit脚本。

弹性分布式数据集(RDDs)

Spark围绕着弹性分布式数据集(RDD)的概念展开,RDD是可并行操作的元素的容错集合。
创建RDDs有两种方法:在driver程序中并行化【parallelizing】现有的集合,或者引用外部存储系统中的数据集,例如一个共享的文件系统、HDFS、HBase或支持Hadoop InputFormat的任何数据源。

并行化的集合【Parallelized Collections】

在driver中,通过在现有的可迭代对象或集合上调用SparkContext的parallelize方法创建并行化集合。将集合的元素复制到可并行操作的分布式dataset。下面是如何创建一个包含数字1到5的并行集合:

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

一旦创建,就可以并行操作分布式数据集(distData)。例如,我们可以调用distData.reduce(lambda a, b: a + b)将列表中的元素相加。稍后我们将讨论rdd上的操作。

并行集合的一个重要参数是将数据集切割的分区数。Spark将为集群的每个分区运行一个任务。通常,Spark会尝试根据您的集群自动设置分区数量,也可以手动设置,传递第二个参数给parallelize方法,例如sc.parallelize(data, 10)。

外部数据集【External Datasets】

PySpark可以从Hadoop支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark支持text files, SequenceFiles和任何其他Hadoop InputFormat。

使用SparkContext的textFile方法创建文本文件的RDDs。这个方法接受文件的URI(计算机上的本地路径,或者hdfs://、s3a://等URI),并将其按行读取为一个集合。下面是一个调用示例:

distFile = sc.textFile("data.txt")

可以通过数据集的一些方法操作distFile。例如我们可以使用map和reduce统计所有行的字符长度:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)。

读取文件的一些注意事项:
如果使用本地文件系统上的路径,那么该文件也必须可以在工作节点上的同一路径上访问。将文件复制到所有worker,或者使用网络挂载的共享文件系统。
Spark的所有基于文件的输入方法,包括textFile,都支持在目录、压缩文件和通配符上运行。例如,你可以使用textFile("/my/directory"), textFile("/my/directory/.txt"),和textFile("/my/directory/.gz")。
textFile方法还接受一个可选的第二个参数,用于控制文件的分区数量。默认情况下,Spark为文件的每个块【block】创建一个分区(HDFS中的块默认为128MB)。也可以通过传递较大的值来要求更高的分区数。注意,分区数不能少于块数。

除了文本文件,Spark的Python API还支持几种其他数据格式:
SparkContext.wholeTextFiles允许读取包含多个小文本文件的目录,并以(filename, content)键值对的形式返回每个文件。这与textFile相反,后者将在文件中每行返回一条记录。
RDD.saveAsPickleFile和SparkContext.pickleFile支持将RDD保存为使用picke序列化过的python对象。
SequenceFile and Hadoop Input/Output Formats

RDD操作

RDDs支持两种类型的操作:
transformations:转换现有rdd创建新的rdd
actions:在rdd上运行计算后返回一个值给驱动程序【driver program】
例如,map是一种transformation,它将每个rdd元素传递到一个函数,并返回一个新RDD。reduce是一种action,使用某个函数聚合RDD的所有元素并将最终结果返回给驱动程序【driver program】

Spark中的所有transformation都是惰性的,它们不会立即计算结果。它们只记录转换操作。只有当一个action需要将结果返回到driver时,才计算transformation。这种设计使Spark运行效率更高。例如通过map创建的数据集将用于reduce,并且只将reduce的结果返回给驱动程序,而不是更大的maped dataset。

默认情况下,每次在每个转换后的RDD上运行action时,都可能重新计算transformation,但是可以使用persist(或cache)方法将RDD持久化到内存中,在这种情况下,Spark将保留集群中的元素,以便下次查询时更快地访问它。还支持在磁盘上持久化RDDs,或者跨多个节点复制RDDs。

基本操作

先看下面简单的示例:

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行从外部文件定义了一个基本RDD,该dataset不会加载到内存中,也不会有其他操作:lines只是一个指向文件的指针。第二行定义了lineLengths作为map transformation的结果,同样,lineLengths由于惰性也不是马上执行计算。最后,我们运行reduce,这是一个action。在这个时间点上,Spark将计算分解为任务,在不同的机器上运行,每台机器同时运行自己的map部分和本地reduction,只向驱动程序返回结果。

如果我们以后还想再次使用lineLengths,我们可以在reduce之前添加:

lineLengths.persist()

这会导致lineLengths在第一次计算后被保存在内存中。

传递函数

Spark的API严重依赖于在集群上运行的在驱动程序中传递的函数。有三种推荐的方法来做到这一点:
lambda表达式,对于简单的函数可以写成一个表达式,lambda表达式不支持多行语句或不返回值的语句。
Local defs inside the function calling into Spark, for longer code.
模块中的顶级函数。

例如,要传递一个比lambda支持的函数更长的函数,考虑下面的代码:

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

也可以传递一个类实例中的方法的引用,这需要发送包含该方法的类对象【this requires sending the object that contains that class along with the method.】。示例:

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

这里,如果我们创建一个新的MyClass类,并调用它的doStuff方法,doStuff内的map引用MyClass实例的func方法,因此需要将整个对象发送到集群。
同样,访问外部对象的字段也会引用整个对象:

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

为了避免这个问题,最简单的方法是将字段复制到局部变量中,而不是从外部访问它:

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

理解闭包

关于Spark的难点之一是在跨集群执行代码时理解变量和方法的作用域和生命周期。在其作用域之外修改变量的RDD操作经常会造成混淆。在下面的示例中,我们看一下使用foreach()递增计数器的代码,其他操作也可能出现类似的问题。

示例

考虑下面天真的RDD元素汇总,它的行为可能会因执行是否在同一个JVM中发生而有所不同:在local模式下运行Spark(–master = local[n])与在集群中部署Spark应用程序(例如,通过spark-submit to YARN)

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)

本地模式与集群模式(Local vs. cluster modes)

上述代码的行为不明确,可能无法按预期工作。为了执行jobs,Spark将RDD操作的处理分解为task,执行器【executor】执行并行执行每一个任务。在执行之前,Spark计算任务的闭包。闭包是那些executor在RDD上执行计算时必须可见的变量和方法。这个闭包被序列化并发送给每个executor。

发送给每个执行器的闭包中的变量都是副本,因此,当counter在foreach函数中被引用时,它不再是驱动节点上的counter。driver的内存中仍然有一个counter,但它对executors不可见。executor只能看到来自序列化闭包的副本。因此,counter的最终值仍然是零。

在本地模式下,在某些情况下,foreach函数实际上将在与驱动程序相同的JVM中执行,并引用相同的原始counter,并更新它。

为了确保在这类场景中定义良好的行为,应该使用累加器【Accumulator】。Spark中的累加器专门用于提供一种机制,在集群中跨工作节点执行时安全地更新变量,在本指南累加器部分有更详细地讨论。

一般来说,闭包——构造类似于循环或局部定义的方法——不应该用于改变某些全局状态。一些代码可能在本地模式下能很好的运行,但这只是偶然的,而且这些代码在分布式模式下不会按预期的那样工作。如果需要进行全局聚合,请使用累加器。

打印RDD的元素

另一种常见的习惯用法是尝试使用rdd.foreach(println)或rdd.map(println)打印出RDD的元素。在一台机器上,这将生成预期的输出并打印所有RDD元素。但是,在集群模式中,stdout改为executor的stdout,不是driver上的那个,所以driver的stdout不会显示这些。要在驱动器上打印所有元素,可以使用collect()方法先将RDD带到驱动器节点:rdd.collect().foreach(println)。但是,这可能会导致驱动器耗尽内存,因为collect()将整个RDD提取到一台机器上。如果只需要打印RDD的一些元素,更安全的方法是使用take(): rdd.take(100).foreach(println)。

使用键-值对

虽然大多数RDD操作都包含任意类型对象,但少数特殊操作仅在键-值对的RDDs上可用。最常见的是分布式“洗牌【shuffle】”操作,比如通过一个键对元素进行分组或聚合。在Python中,这些操作在包含元组的RDDs上运行。只需创建这样的元组,然后调用所需的操作。

下面的代码使用键-值对操作reduceByKey来计算文件中每行文本出现的次数:

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

我们还可以使用counts.sortByKey()按字母顺序对它们进行排序,最后使用counts.collect()将它们作为列表返回驱动器。

转换[Transformations]

下表列出了Spark支持的一些常见transformation。详细信息请参考RDD API文档

TransformationMeaning
map(func)Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func)Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func)Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func)Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.
mapPartitionsWithIndex(func)Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T.
sample(withReplacement, fraction, seed)随机采样,第一个参数:boolean类型,表示产生的样本是否可以重复,第二个参数:代表取样的比例,第三个参数:代表一个随机数种子,就是抽样算法的初始值
union(otherDataset)返回一个新RDD,其中包含源数据集中的元素和参数的并集
intersection(otherDataset)返回一个新RDD,其中包含源数据集中的元素和参数的交集。
distinct([numPartitions]))返回一个新RDD,其中包含源数据集去重生的数据。
groupByKey([numPartitions])When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs.Note: 如果要对每个键进行分组以执行聚合(比如求和或平均),使用reduceByKey or aggregateByKey 会有更好的性能。Note: 默认情况下,输出中的并行级别取决于父RDD的分区数量。可以传递一个可选的numPartitions参数来设置不同的任务数量。
reduceByKey(func, [numPartitions])When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions])When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.按照k合并v
cartesian(otherDataset)When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).对两个 RDD 做笛卡尔集
pipe(command, [envVars])Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions)Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner)Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

动作[Actions]

下表列出了Spark支持的一些常见action。详细信息请参考RDD API文档。

ActionMeanin
reduce(func)Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect()Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count()Return the number of elements in the dataset.
first()Return the first element of the dataset (similar to take(1)).
take(n)Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed])Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering])Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path)Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path)(Java and Scala)Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path)(Java and Scala)Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey()Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func)Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Spark RDD API还公开了一些action的异步版本,比如foreach的foreachAsync,它会立即向调用者返回一个FutureAction,而不是在动作执行时阻塞。这可以用于管理或等待异步执行。

洗牌[Shuffle]操作

Spark中的某些操作会触发称为shuffle的事件。shuffle是Spark用于重新分发数据的机制,以便在不同分区之间以不同的方式分组。这通常涉及到在executors和machines之间复制数据,使得shuffle成为一个复杂而昂贵的操作。

背景

为了理解在shuffle过程中发生了什么,我们使用reduceByKey操作作为示例。reduceByKey操作生成一个新的RDD,其元素值是一个元组——key和该key关联的所有值在reduce 函数上的执行结果。问题在于,并非单个key的所有值都留在同一分区,甚至同一台机器上,但是要计算结果,它们必须位于同一位置。

在spark中,特定的操作需要数据不跨分区分布。在计算期间,一个分区上只执行一个task。为了reduceByKey的reduce能够执行,需要组织所有数据执行all-to-all的操作,从所有分区读取所有key的值,然后将跨分区的值集合到一个分区里再计算单个key的结果,这个过程叫作shuffle。

尽管洗牌后每个分区新的数据集是确定的,分区本身的顺序也是确定的,但是分区内的元素的顺序不是。如果希望洗牌后的数据是有序的,可以使用以下方法:

  • mapPartitions:在每个分区内使用排序方法,比如.sort()
  • repartitionAndSortWithinPartitions:在重新分区的同时有效地对分区内数据进行排序
  • sortBy:产生一个全局有序的RDD

可以引起洗牌的操作包括:

  • 分区操作:例如repartion和coalesce
  • 包含"ByKey"的操作,例如groupByKey,reduceByKey
  • 连接操作:例如cogroup和join

性能影响

洗牌是一个昂贵的操作,代价有点高,因为它涉及了磁盘I/O,数据序列化和网络I/O。为了准备 shuffle 操作的数据,Spark 启动了一系列的任务,map 任务组织数据,reduce 完成数据的聚合。这些术语来自 MapReduce,跟 Spark 的map 操作和reduce 操作没有关系。

在内部,单个map任务产生的数据缓存在内存,当内存不足时,这些数据基于目标分区排序输出到一个文件内。在reduce任务中再读取相关的已排序的数据块。

某些shuffle操作会大量消耗堆内存,因为在记录传输前后,需要在在内存中使用数据结构来组织这些记录。需要特别说明的是,reduceByKey 和 aggregateByKey 在 map 时会创建这些数据结构,'ByKey 操作在 reduce 时创建这些数据结构。当内存不足时,Spark会把溢出的数据存到磁盘上,这将导致额外的磁盘 I/O 开销和增加垃圾回收的开销。

Shuffle还会在磁盘上生成大量的中间文件。从Spark 1.3开始,这些文件一直保存到相应的RDDs不再使用并被垃圾收集为止。这么做如果Spark重新计算RDD的血统关系(lineage),这些中间文件不需要再重新创建。如果应用程序保留了对这些RDDs的引用,或者GC不频繁,垃圾回收的周期会比较长。这意味着长期运行的Spark作业可能会消耗大量磁盘空间。临时存储目录可以通过spark.local.dir配置。

可以通过调整各种配置参数来调整洗牌行为,具体请参考Spark配置指南。

(个人补充:spark引入了类似于hadoop Map-Reduce的shuffle机制。该机制每一个ShuffleMapTask不会为后续的任务创建单独的文件,而是会将所有的Task结果写入同一个文件,并且对应生成一个索引文件。以前的数据是放在内存缓存中,等到数据完了再刷到磁盘,现在为了减少内存的使用,在内存不够用的时候,可以将输出溢写到磁盘,结束的时候,再将这些不同的文件联合内存的数据一起进行归并,从而减少内存的使用量。一方面文件数量显著减少,另一方面减少Writer缓存所占用的内存大小,而且同时避免GC的风险和频率。spark shuffle机制和原理分析)

RDD持久化

Spark中最重要的功能之一是跨操作在内存中持久化(或缓存)数据集。当持久化一个RDD时,每个节点将存储它计算的分区数据,并在该数据集(或从该数据集派生的数据集)的其他action中重用它们。这可以使之后的action变得更快(通常超过10倍),缓存是迭代算法和快速交互使用的关键工具。

可以使用persist()或cache()方法对RDD进行缓存。第一次计算之后,它会保存到工作节点(node)的内存中。Spark的缓存是容错的——如果RDD的任何一个分区丢失了,它将使用最初创建它的transformations自动重新计算。

此外,每个持久化的RDD可以使用不同的存储级别,例如,允许你缓存在磁盘上、以序列化的Java对象(以节省空间)缓存在内存中、跨节点复制它。通过向persist()方法传递StorageLevel对象设置存储级别。cache()方法是使用默认存储级别的简写,即:persist(StorageLevel.MEMORY_ONLY)。

全部存储级别看下表:

Storage LevelMEMORY
MEMORY_ONLY将RDD作为反序列化的Java对象存储在JVM中。如果RDD在内存中放不下,一些分区将不会被缓存,并且在每次需要时重新计算它们。这是默认级别。
MEMORY_AND_DISK将RDD作为反序列化的Java对象存储在JVM中。如果RDD在内存中放不下,则将多出的部分分区存入磁盘的,并在需要时从磁盘中读取它们。
MEMORY_ONLY_SER(Java and Scala)将RDD存储为序列化的Java对象(每个分区一个字节数组)。这通常比反序列化的对象更节省空间,特别是在使用快速序列化器时,但读取时需要更多cpu。
MEMORY_AND_DISK_SER(Java and Scala)与MEMORY_ONLY_SER类似,但是会将无法装入内存的分区存储到磁盘,而不是在每次需要时重新计算它们。
DISK_ONLY仅将RDD分区存储在磁盘上。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental)Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

注意:在Python中,存储的对象总是会被Pickle库序列化,因此,是否选择序列化级别【serialized level】并不重要。Python中可用的存储级别包括MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY和DISK_ONLY_2。

Spark还会在shuffle操作中自动持久化一些中间数据(e.g. reduceByKey),即使用户没有调用persist()。这样做是为了避免在shuffle过程中,如果一个节点失败,则重新计算整个输入。如果用户有计划会重用RDD,建议他们调用persist。

选择哪个存储级别?

Spark的存储级别目的是在内存使用和CPU效率之间提供不同的权衡。我们建议通过以下过程来选择:
如果您的RDDs非常适合默认的存储级别(MEMORY_ONLY),那么就保持这种方式。这是cpu效率最高的选项,允许RDDs上的操作尽可能快地运行。
如果不是,请尝试使用MEMORY_ONLY_SER并选择一个快速序列化库,以使对象更节省空间,但访问速度仍然相当快。(Java和Scala)
不要把数据溢出到磁盘除非函数计算的代价很高或者过虑了大量的数据。不然重新计算分区的速度可能和读取磁盘一样快。
如果你想要快速的从故障中恢复(例如使用spark服务来自web的请求),请使有副本【replicated 】的存储级别,所有的存储级别都可以通过重新计算丢失的数据来达到完全容错,但是有副本的存储级别可以让你继续执行任务而不用等待重新计算丢失的分区数据。

移除数据

Spark自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧数据分区。如果您想手动删除一个RDD,而不是等待它自动从缓存中删除,使用RDD.unpersist()方法。这个方法在默认情况下不会阻塞。若要阻塞直到释放资源,请在调用此方法时指定blocking=true。

共享变量

通常,在远程集群节点上执行传递给Spark操作(如map或reduce)的函数时,函数使用所有变量都是各自的副本。这些变量被复制到每一台机器上,对远程机器上的变量的更新不会传播回驱动程序上。在任务之间支持通用的读写共享变量是低效的。但是spark为两种常用的使用模式提供了两种共享变量:广播变量和累加器。

广播变量

广播变量允许程序员在每台机器上缓存一个只读变量,而不是每一个任务发送一个变量的副本。例如,可以使用广播变量以高效的方式为每个节点提供一个超大的输入数据集的副本。Spark尝试使用有效的广播算法来分配广播变量,以降低通信成本。

Spark动作【actions】通过一组阶段【stages】执行,这些阶段是通过“shuffle”操作划分的。Spark自动广播每个阶段中任务所需的公共数据。以这种方式广播的数据以序列化的形式缓存,并在运行每个任务之前进行反序列化。这意味着,只有当跨多个阶段的任务需要相同的数据,或者以反序列化的形式缓存数据很重要时,显式创建广播变量才有用。

创建变量v的广播变量通过调用SparkContext.broadcast(v)实现。广播变量是变量v的包装器,它的值可以通过调用value方法来访问。

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]

在创建broadcast变量之后,应该在集群上运行的任何函数中使用broadcast变量而不是值v,这样v就不会被多次发送到节点。此外,在v被广播后不应该再修改它了,以确保所有节点获得广播变量的相同值(例如,如果变量稍后被发送到新节点)。

释放复制到执行器上的广播变量,调用.unpersist()。如果之后该广播变量被再次使用,它将被重新广播。永久释放广播变量使用的所有资源,调用.destroy()。这之后就不能再使用广播变量了。这些方法在默认情况下不会阻塞。若要阻塞直到释放资源,请在调用方法时指定blocking=true。

累加器

累加器是只能被累加的变量,可以有效地并行执行,用来实现计数器(如MapReduce)或求和。Spark原生支持数值类型的累加器,程序员可以添加对新类型的支持。

作为用户,你可以创建命名或未命名的累加器。如下图所示,一个命名的累加器(counter)会在web UI中Stages页签显示,Spark在“Tasks”表中显示由任务修改的每个累加器的值。

在UI中跟踪累加器有助于理解运行的阶段【stage】的进度(注意:这在Python中还不被支持)。

通过调用SparkContext.accumulator(v)从初始值v创建一个累加器。运行在集群中的任务可以使用add方法或者+=操作符来累加它,但是不能读到它的值,只有燕鲍翅程序可以使用value方法读取累加器的值。

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

上面示例代码使用了内置的int类型的累加器,程序员可以创建AccumulatorParam类的子类实现自己的类型。AccumulatorParam接口有两个方法:zero,addInPlace。zero方法为你的数据类型提供初始值,addInPlace方法使两个值相加。例如假设我们有一个Vector类代表数学上的向量:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

对于只在动作内部执行的累加器更新,Spark保证每个任务对累加器的更新只应用一次,也就是说,重新启动的任务不会更新这个值。在转换【transformations】中,如果任务【task】或阶段【job stage】重新执行时每个任务的更新就可能会不只一次。

累加器不会改变spark的惰性计算模型。如果它们在RDD的操作中被更新,那么它们的值只在进行action时才被更新。因此,当在像map()这样的延迟转换中进行累加器更新时,不能保证会执行。下面的代码片段演示了这个特性:

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.

参考资料:
https://spark.apache.org/

以上是关于《SPARK官方教程系列》(标贝科技)的主要内容,如果未能解决你的问题,请参考以下文章

Spark惰性机制引起的:Input path does not exist

张海腾:语音识别实践教程

spark系列之基本概念

Spark惰性转换执行障碍

Spark入门系列视频教程

struts2官方 中文教程 系列九:Debugging Struts