Spark Programming Guide

Posted by willwillie



This article translates and annotates
http://spark.apache.org/docs/latest/programming-guide.html
RDDs can be operated on through many supported APIs, and it is worth spending quite a bit of ink organizing these operations, because this part really matters.

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
In other words, a transformation creates a new RDD from an existing one; map, for example, applies a function to every element of the RDD and returns a new RDD of the results.
An action runs a computation on the RDD and returns the result to the driver program; reduce, for example, aggregates all elements of the RDD with a function and returns the final value to the driver (although there is also a parallel reduceByKey that returns a distributed dataset).
All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
All transformations in Spark are lazy: they are not executed the moment they are invoked. Instead, Spark only remembers the transformation applied to some base dataset (for example, a file). The transformations are computed only when an action requires a result to be returned to the driver program. This design lets Spark run more efficiently: for example, a large dataset created through map can be consumed by a reduce, and only the reduce result is returned to the driver rather than the entire mapped dataset.
By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
By default, every transformed RDD may be recomputed each time you run an action on it. However, you can persist an RDD in memory with the persist() (or cache()) method, in which case Spark keeps the elements around on the cluster so that the next access is much faster. RDDs can also be persisted to disk, or replicated across multiple nodes.

scala> val lines = sc.textFile("file:///usr/local/spark/README.md")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[7] at textFile at <console>:24

scala> val lineLengths = lines.map(s => s.length)
lineLengths: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:26

scala> val totalLength = lineLengths.reduce( (a,b) => a+ b )
totalLength: Int = 3729

As this shows, an RDD can be created with textFile from an external file (it can also be created by parallelizing an existing collection in the driver program, which is covered below).

The first line only defines a base RDD; lines is not loaded into memory, it merely points at an external file.
The second line defines the lineLengths RDD: map applies the length function to every line of lines, producing a second RDD. This transformation has not actually been executed yet either.
The third line defines totalLength with the reduce action, which computes the total length. At this point Spark breaks the computation into tasks to run on different machines; each machine runs its part of the map plus a local reduce, and only the result is returned to the driver program.
If we want to use the lineLengths RDD again later, we can persist it in memory with:

scala> lineLengths.persist()
res0: lineLengths.type = MapPartitionsRDD[8] at map at <console>:26

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. In Java, functions are represented by classes implementing the interfaces in the org.apache.spark.api.java.function package. There are two ways to create such functions:
Spark's API relies heavily on passing functions in the driver program to run on the cluster. In Java such functions are classes implementing the interfaces in the org.apache.spark.api.java.function package, and there are two ways to create them.

We can write the code with anonymous inner classes, like this:

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

Or with named classes:

class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}

class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

In Clojure the style would presumably look different (though I have no hands-on experience with that yet).

How should we understand closures? (Understanding Clojure does not necessarily mean understanding Scala's closures.)

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we’ll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.
One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster; RDD operations that modify variables outside of their scope are a frequent source of confusion.
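As a concrete illustration of the pitfall described above, here is a minimal spark-shell sketch, modeled on the counter example in the official guide, of why mutating a local variable from foreach is unreliable:

var counter = 0
val data = sc.parallelize(1 to 10)

// In local mode this may appear to work, but in cluster mode each executor
// operates on its own copy of counter, so the driver's counter can stay 0.
data.foreach(x => counter += x)

println("Counter value: " + counter)

// The safe pattern for this kind of aggregation is an accumulator (shown
// further below), or simply data.reduce(_ + _).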

foreach
foreach traverses an RDD, applying a function f to every element.
So how is foreach used? The sketch above already shows the shape of a call; a safer version that combines foreach with an accumulator appears further below.

makeRDD and parallelize

parallelize
def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]
Creates an RDD from a Seq collection.
Parameter 1: the Seq collection (required).
Parameter 2: the number of partitions; defaults to the number of CPU cores allocated to the application.
accumulator
Combining the accumulator shared variable with foreach is a good choice.

scala> var rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> rdd.collect
res4: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> rdd.partitions.size
res5: Int = 48

scala>  var rdd2 = sc.parallelize(1 to 10,3)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24

scala> rdd.partitions.size
res6: Int = 48

scala> rdd2.partitions.size
res7: Int = 3

So how is makeRDD used, and how does it differ from parallelize?

def makeRDD[T](seq: Seq[T], numSlices: Int = defaultParallelism)(implicit arg0: ClassTag[T]): RDD[T]

This overload behaves exactly the same as parallelize.

def makeRDD[T](seq: Seq[(T, Seq[String])])(implicit arg0: ClassTag[T]): RDD[T]

This overload lets you specify the preferredLocations of each partition.

scala> var collect = Seq((1 to 10,Seq("slave007.lxw1234.com","slave002.lxw1234.com")),
(11 to 15,Seq("slave013.lxw1234.com","slave015.lxw1234.com")))
collect: Seq[(scala.collection.immutable.Range.Inclusive, Seq[String])] = List((Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
List(slave007.lxw1234.com, slave002.lxw1234.com)), (Range(11, 12, 13, 14, 15),List(slave013.lxw1234.com, slave015.lxw1234.com)))

scala> var rdd = sc.makeRDD(collect)
rdd: org.apache.spark.rdd.RDD[scala.collection.immutable.Range.Inclusive] = ParallelCollectionRDD[6] at makeRDD at <console>:23

scala> rdd.partitions.size
res33: Int = 2

scala> rdd.preferredLocations(rdd.partitions(0))
res34: Seq[String] = List(slave007.lxw1234.com, slave002.lxw1234.com)

scala> rdd.preferredLocations(rdd.partitions(1))
res35: Seq[String] = List(slave013.lxw1234.com, slave015.lxw1234.com)
scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at <console>:21

scala> rdd1.foreach(x => cnt += x)

scala> cnt.value
res51: Int = 55

collect

def collect(): Array[T]
collect returns all of the RDD's elements as an array on the driver.

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at <console>:21

scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

take

def take(num: Int): Array[T]
take returns the first num elements of the RDD (indices 0 through num-1), without sorting.

count()
count returns the number of elements in the RDD.
countByValue() returns the number of times each distinct element occurs in the RDD.
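A short spark-shell sketch of take, count and countByValue (the RDD ids, res numbers and map ordering shown are only illustrative and will differ in your session):

scala> val rdd = sc.makeRDD(Seq(1, 2, 2, 3, 3, 3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at <console>:24

scala> rdd.take(3)
res0: Array[Int] = Array(1, 2, 2)

scala> rdd.count()
res1: Long = 6

scala> rdd.countByValue()
res2: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 2, 3 -> 3)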

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println)

In other words, you might try to print an RDD's elements with rdd.foreach(println) or rdd.map(println). On a single machine this produces the expected output and prints all of the RDD's elements. In cluster mode, however, the stdout being written to belongs to the executors, not the driver, so the driver's stdout shows nothing. To print all elements on the driver, first bring the RDD to the driver node with collect(): rdd.collect().foreach(println). Because collect() fetches the entire RDD onto a single machine, it can make the driver run out of memory; if you only need to print a few elements, the safer, more memory-friendly approach is take(): rdd.take(100).foreach(println).

(I have tested this myself, and it does behave exactly this way.)
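The three printing patterns from the quoted paragraph, side by side (a sketch; rdd stands for any RDD already defined in the shell):

// In cluster mode this prints on the executors' stdout, not the driver's:
rdd.foreach(println)

// Brings every element back to the driver first; may exhaust driver memory:
rdd.collect().foreach(println)

// Safer when a sample is enough:
rdd.take(100).foreach(println)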

On key-value pairs:
Some common "shuffle" operations, such as grouping or aggregating the elements of an RDD by key, operate on RDDs of key-value pairs.

Scala version:

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

Java version:
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

I actually had quite a few questions about this code at first: why is it written this way? I know some Java, but had never seen this style (it is presumably dictated by the API itself).

Spark's underlying ideas are in many ways closer to Clojure's, so writing Spark programs in either of these two languages can feel rather uncomfortable; one might as well try writing them in Clojure directly: http://gorillalabs.github.io/sparkling/articles/getting_started.html

map(func) Return a new distributed dataset formed by passing each element of the source through a function func.

There is also a similar API, foreachPartition.
Internally the two are much the same: foreachPartition runs the passed-in function once per partition, whereas foreach hands the passed-in function to each partition to execute element by element.
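A small sketch contrasting the two calls (the println sink is only for illustration; in practice foreachPartition is typically used when some setup, such as opening a database connection, should happen once per partition):

val nums = sc.parallelize(1 to 100, 4)

// foreach: the function is invoked once per element.
nums.foreach(x => println(x))

// foreachPartition: the function is invoked once per partition and receives
// an iterator over that partition's elements.
nums.foreachPartition { iter =>
  // per-partition setup would go here
  iter.foreach(x => println(x))
}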

shuffle
Certain operations in Spark trigger an event known as the shuffle. The shuffle is Spark's mechanism for redistributing data so that it is grouped differently across partitions. It typically involves copying data across executors and machines, which makes the shuffle a complex and expensive operation.
Consider reduceByKey as an example: this operation produces a new RDD in which all values for a single key are combined into a tuple (the key and the result of running a reduce function over all the values associated with that key).
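To make the shape of the result concrete, a tiny spark-shell sketch (the ordering of the collected pairs may differ):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
val counts = pairs.reduceByKey((x, y) => x + y)   // shuffles so all values of a key meet
counts.collect()                                  // e.g. Array((a,2), (b,1))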

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O. To organize data for the shuffle, Spark generates sets of tasks - map tasks to organize the data, and a set of reduce tasks to aggregate it. This nomenclature comes from MapReduce and does not directly relate to Spark’s map and reduce operations.
Shuffle behaviour can of course also be tuned through configuration parameters (I will look into the details later).
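For reference, a sketch of the kind of knobs involved when building your own SparkContext; the key names come from the Spark configuration documentation, and the values below are only illustrative, not recommendations:

val conf = new org.apache.spark.SparkConf()
  .set("spark.shuffle.compress", "true")        // compress map output files
  .set("spark.shuffle.file.buffer", "64k")      // in-memory buffer per shuffle file writer
  .set("spark.reducer.maxSizeInFlight", "96m")  // map output fetched concurrently per reduce task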

RDD persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset (or datasets derived from it). This allows future actions to be much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.
One of Spark's most important capabilities is persisting (or caching) a dataset in memory across operations. When you persist an RDD, each node stores the partitions of it that it computes in memory and reuses them in other actions on that dataset (or on datasets derived from it), which makes those later actions much faster (often by more than 10x). Caching is a key tool for iterative algorithms and fast interactive use.

You can mark an RDD to be persisted using the persist() or cache() methods on it. The first time it is computed in an action, it will be kept in memory on the nodes. In other words, you cache an RDD with persist() or cache(); the first time an action computes it, it is kept in memory on the nodes of the cluster.

Spark’s cache is fault-tolerant – if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. In other words, caching in Spark is fault tolerant: if any partition of a cached RDD is lost, it is automatically recomputed using the transformations that originally created it.
In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), or replicate it across nodes. These levels are set by passing a StorageLevel object (Scala, Java, Python) to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). In other words, RDDs can be stored at different levels: you can persist an RDD to disk, keep it in memory as serialized Java objects to save space, or replicate it across nodes; you choose the level by passing a StorageLevel to persist(), while cache() uses the default level, MEMORY_ONLY.
The full set of storage levels is:

Storage Level: Meaning
MEMORY_ONLY: store the RDD as deserialized Java objects in the JVM; partitions that do not fit in memory are not cached and are recomputed on the fly each time they are needed.
MEMORY_AND_DISK: store the RDD as deserialized Java objects in the JVM; partitions that do not fit in memory are stored on disk and read from there when needed.
MEMORY_ONLY_SER: store the RDD as serialized Java objects in the JVM (saves space but is more CPU-intensive to read); partitions that do not fit in memory are not cached and are recomputed on the fly.
DISK_ONLY: store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.: same as the levels above, but each partition is replicated on two cluster nodes.
OFF_HEAP (experimental): store the RDD in off-heap memory (much like using Alluxio).

A question arises here: what exactly is a serialized Java object?

Serialization is the process of converting an object's state into a form that can be stored or transmitted, typically to a storage medium such as a file or an in-memory buffer, or across a network as bytes or XML. The byte or XML encoding can later be used to reconstruct an object equal to the original; that reverse process is called deserialization.
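A minimal Scala sketch of what serializing and deserializing a Java object looks like at the JVM level (conceptually this is what a *_SER storage level stores per partition):

import java.io._

case class Point(x: Int, y: Int)   // case classes are Serializable by default

val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(Point(1, 2))       // object state -> bytes
out.close()

val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val restored = in.readObject().asInstanceOf[Point]
println(restored == Point(1, 2))   // true: structurally equal to the original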

So which storage level should you choose?
Spark's storage levels trade memory usage against CPU efficiency. We recommend choosing one as follows:
1. If your RDDs fit comfortably with the default storage level, use the default. It is the most CPU-efficient option and lets operations on the RDDs run as fast as possible.
2. If not, try MEMORY_ONLY_SER, which is much more space-efficient at the cost of some extra CPU for deserialization.
3. Don't spill RDDs to disk unless the functions that compute them are very expensive, or they filter a very large amount of data; otherwise recomputing a partition may be about as fast as reading it from disk.
4. Use the replicated storage levels if you want fast fault recovery (for example when using Spark to serve requests from a web application); this is faster than waiting to recompute lost partitions.

Spark automatically monitors cache usage on each node and drops out old data partitions in a least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for it to fall out of the cache, use the RDD.unpersist() method.
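A minimal spark-shell sketch of explicit persistence, the default cache(), and manual eviction (data.txt is just a stand-in path):

import org.apache.spark.storage.StorageLevel

val lineLengths = sc.textFile("data.txt").map(line => line.length)

lineLengths.persist(StorageLevel.MEMORY_ONLY_SER)  // explicit storage level
// lineLengths.cache()                             // equivalent to persist(StorageLevel.MEMORY_ONLY)

lineLengths.count()      // the first action materializes the cached partitions
lineLengths.unpersist()  // evict manually instead of waiting for LRU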

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and updates to the variables on the remote machines are not propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of shared variables: broadcast variables and accumulators.

Besides accumulators, the other kind of shared variable is the broadcast variable.

Broadcast variables allow the programmer to keep a read-only variable
cached on each machine rather than shipping a copy of it with tasks.
They can be used, for example, to give every node a copy of a large
input dataset in an efficient manner. Spark also attempts to
distribute broadcast variables using efficient broadcast algorithms to
reduce communication cost.
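A short spark-shell sketch following the example in the official guide (the broadcast id in the output will vary):

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)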

(This post is still being updated.)
