谈Spark下并行执行多个Job的问题

Posted Mr-Bruce

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了谈Spark下并行执行多个Job的问题相关的知识,希望对你有一定的参考价值。

  对于Spark这样的分布式计算系统,任务会分发到多台机器上执行。如何榨干有限的集群资源来实现快速并行计算,是需要考虑的重要问题之一。而这个问题又可以拆解为:如何将有限的集群资源都分配给Spark使用;如何将分配到的资源都利用起来。本文的话题属于后者的范畴,将从笔者在实践中遇到的场景出发,探讨如何在Spark下并行执行多个Job。

背景

  在我们的数据系统中,有一些实时流任务与离线任务会将处理结果以Parquet的形式存储到AWS S3上。为了便于后续处理与快速查询,需要按照数据类型(目前有四种类型)、时间间隔(按天分隔)来划分Parquet分区,即一批数据会分散存储到不同的路径下,比如s3://data/type=security/interval=1552608000、s3://data/type=policy/interval=1552608000等。目前,我们通过Spark DataFrame的API来写文件,有两种实现方式:

  • 通过for循环依次根据数据类型、时间间隔将数据Filter出来,然后写入相应的路径下。
for type in types:
    for interval in intervals:
        df.filter(df.type==type).filter(df.interval==interval).write.parquet("s3://data/type=%s/interval=%s" % (type, interval))
  • 通过partitionBy功能让Spark自动做将数据写入不同的分区路径。
df.write.partitionBy("type", "interval").mode("append").parquet("s3://data")

  这两种实现方式,前者是显式的一件一件做,每循环一次就是一个Job,后者是在一个Job中完成。看上去后者的执行效率会更高,代码也简洁,但实际效果并非如此。两个原因:首先,第二种方式在每个Task中还是根据相应的数据类型、时间间隔来串行写入的,并没有真正提升写入速度。其次,第一种方式会更加可控,可以显式的知道做到了哪里,如果出错的话可以知道错在哪个环节,恢复回来时可以根据checkpoint将已经做过的跳过去,避免重复写入。目前,我们主要采用第一种方式,这也是我们后面探讨的前提。
  对于一个Spark Job,我们总是期望能充分利用所有的cpu-vcore来并行执行,因此通常会将数据repartition成cpu-vcore的个数,即每个cpu-vcore上跑一个Task。而对于写文件的Job,每个Task会写入到自己的一个文件中,最终生成的文件数是由Task个数决定。在下图1中,假设集群总共有12个cpu-vcore分配给Executor使用,那么就会有12个Task并行执行写入,最终生成12个文件。

图1. 充分利用资源来写文件

  从充分利用资源的角度来看,这样的设计无疑是最佳的。但是,对于一些实时流处理任务或者周期性的离线任务而言,这样做会产生大量的小Parquet文件,会给后续的文件加载和快速查询带来困难。因此,从尽可能产生少量文件的角度出发,需要采用图2所示的写入方式,即在写入前,将数据分配到少量的Partition中,用少量的Task来执行。但是,这样做就会导致有部分cpu-vcore在写入过程中处于闲置状态,造成了资源浪费。

图2. 产生少量文件的方案

  显然,在这件事情上,“充分利用资源”和“产生少量文件”两个方向发生了冲突。那么,有没有一个两全之策呢?即既保证产生少量文件,又能把原本闲置的资源利用起来。如下图3所示,假设我们能同时跑多个写入文件的Job,每个Job利用一部分cpu-vcore来执行,似乎就可以达到这个目的了。带着这样的思路,笔者做了一番调研与实践。

图3. 多Job并行执行

可行性分析

  上述思路可以总结为:通过一个SparkContex并行提交多个Job,由Spark自己来调度资源,实现并行执行。针对这个思路,首先要搞清楚Spark是否支持这么玩,如果支持的话又是怎么支持的。本节将简单梳理下Spark的任务调度机制。
  图4是笔者在公司分享时用的一张图,主要希望表达三个意图:任务调度的流程、任务调度的对象、任务调度涉及哪些模块。综合起来,简要概括如下:

  • SparkContext向DAGScheduler提交一个Job后,会创建一个JobWaiter对象,用于阻塞当前线程,等待Job的执行结果。因此,在一个线程中,Job是顺序执行的。
  • DAGScheduler会根据RDD的依赖关系将一个Job划分为若干个Stage(以Shuffle为界)。因为前后Stage存在数据上的依赖,所以只有父Stage执行完毕才能提交当前Stage。
  • DAGScheduler在提交Stage时,会根据Partition信息生成相应的Task,打包成TaskSet,提交给TaskScheduler。而TaskScheduler收到后,会将TaskSet封装成TaskSetManager,丢到任务队列中等待执行。
  • SchedulerBackend负责Executor状态与资源的管理,当发现有空闲资源时,就会通过TaskScheduler从任务队列中取出相应的TaskSetManager去调度执行。
  • TaskSetManager中的Task最终会分发到Executor中的线程里去执行。
图4. Spark任务调度机制

  从上述分析中不难发现,Spark是以TaskSetManager为单元来调度任务的。通常情况下,任务队列中只会有一个TaskSetManager,而通过多线程提交多个Job时,则会有多个TaskSetManager被丢到任务队列中。在有空闲资源的情况下,谁会从队列里被取出来执行就取决于相应的调度策略了。目前,Spark支持FIFO和FAIR两种调度策略,在实践中分别有图5所示的三种模式。

  • 默认是FIFO策略,顾名思义,采用先进先出的方式。只有TaskSetManager A中没有Task需要执行并且有多余的资源时,才会调度TaskSetManager B,即优先做前面的任务。举例来说,假设有12个cpu-vcore,TaskSetManager A和B分别有18个、10个Task需要执行,那么只有当TaskSetManager A已经执行完了前面12个Task,在执行剩下的6个Task时,才会有多余的6个cpu-vcore分配给TaskSetManager B。另一方面,假设TaskSetManager A和B各有6个Task需要执行,则刚好可以并行执行了。
  • 通过参数spark.scheduler.mode可以设定策略为FAIR,该方式让多个TaskSetManager都有机会执行。但是,需要配合FAIR Pool来使用。默认情况下,TaskSetManager会被全部丢到一个default的Pool里,此时调度效果与是FIFO一样的。
  • Spark支持在线程中设定自己的FAIR Pool,从而将该线程中提交的TaskSetManager丢到指定的Pool中。多个FAIR Pool是会被轮询执行的,执行权重可以预先设定。通过这种方式,可以让所有TaskSetManager都有机会被调度,而不会被先进队列的需要长时间运行的其他任务阻塞住。但是,需要注意的是,只是让A、B、C都有机会执行了,整体执行的时间(A+B+C)并不会被缩短(相比FIFO而言)。
图5. Spark任务调度策略

实践探索

  通过上节的分析,基本可以明确以下两点:

  • Spark支持通过多线程在一个SparkContext上提交多个Job,每个线程里面的Job是顺序执行的,但是不同线程的Job是可以并行执行的,取决当时Executor中是否有充足的cpu-vcore。
  • 任务队列中的TaskSetManager是有序执行,还是轮询执行(可分配权重)取决于采用哪种调度策略,即上述的模型1或3(不推荐使用2)。不管是哪种执行方式,整体执行时间基本是一致的,只是模型3更适用于多用户的场景,让大家都有机会干活。

  在这样的思路下,我们做了一些实践探索,部分代码如下所示。该代码从固定路径下读取文件数据,然后启动5个线程分别提交5个写文件的Job。实验中,集群共有55个cpu-vcore分配给Executor,每个写文件Job使用11个cpu-vcore来写入,刚好可以分配5个Job并行执行。另外,该实验采用默认的FIFO调度策略。

var df = spark.read.parquet("s3://data/type=access/interval=1551484800").repartition(55)
// df.cache()
// val c = df.count()
// println(s"$c")

val jobExecutor = Executors.newFixedThreadPool(5)
for( _ <- Range(0, 5) ) 
  jobExecutor.execute(new Runnable 
    override def run(): Unit = 
      val id = UUID.randomUUID().toString()
      df.coalesce(11).write.parquet(s"s3://data/test/$id")
    
  )

  看上去上述的分配没有问题,但是第一次实验时就失败了。这里的"失败",指的是Spark没有按照我们预期的那样去并行执行写入。如下图6所示,有5个Active Stage,但是只有一个处于running状态,其有467个Task需要执行。造成这个问题的原因是,Spark是Lazy执行的,5个写入Job都需要先去读取原始文件数据,再执行自己的写入,而读取文件这个动作需要467个Task去执行,导致阻塞。

图6. 实验一

  知道了问题所在,就可以出招应对了。我们期望的是读取文件和相关处理只做一次,而写文件分别由不同的线程并行执行。因此,需要将前者预先执行掉,并将数据缓存在内存中,即使用上述注释掉的代码。之后,就可以达到预期的并行执行的效果了,如下图7所示。但是,我们又发现另一个问题:不断有Executor因为内存不足被替换。原因是,相比原先一个一个写文件的情形,5个Job并行做事时,对内存的消耗也接近5倍了,需要分配更多的内存给Executor。

图7. 实验二

  综合来看,要达到并行执行的效果,并不是通过多线程提交多个Job就好了,需要通盘考虑多个事情。

  • 并行执行时,资源怎么分配给不同的Job
  • 哪些动作是需要统一做的,哪些是需要并行执行的,在哪里做Cache来应对Lazy执行带来的问题
  • 合理提升Executor的内存来保证并行执行过程中内存是够用的
  • 其他未知事项

  下图8所示,是实验结果的一个对比。可以看到,执行1个写入Job需要14秒,那么顺序执行5个就需要70秒了;而并行执行5个Job时,最长的一个需要30秒,整体节省了57%的时间。另一方面,为何并行执行时最长的需要花费30秒,相比14秒,增长了1倍多?目前分析来看,跟数据传输和GC时间有关,会进一步研究。

图8. 整体并行执行时间对比

总结

  目前,多Job并行执行的方案已在笔者项目中使用,也达到了预期的效果。期望本文的分享能对有同样需求的读者有所帮助。



(全文完,本文地址:https://bruce.blog.csdn.net/article/details/88349295
(版权声明:本人拒绝不规范转载,所有转载需征得本人同意,并且不得更改文字与图片内容。大家相互尊重,谢谢!)

Bruce
2019/03/20 下午

以上是关于谈Spark下并行执行多个Job的问题的主要内容,如果未能解决你的问题,请参考以下文章

Spark的Task调度原理

Spark的Job的划分

Spark的Job的划分

多线程提交Spark Job

spark中job,stage,task的关系

PowerShell 并行执行任务