多线程提交Spark Job

Posted DataRain

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程提交Spark Job相关的知识,希望对你有一定的参考价值。

当一个spark application在主线程提交spark job的时候,是会阻塞主线程直到spark job被执行完成,然后才能执行后续的代码。这其实也是正常的,一个RDD必须要被初始化成功后才能被后续任务所使用,否则肯定是会出现异常的。但是如果我们没有后续对RDD的操作呢?如果这个RDD执行占用资源不多,而我想并行执行多个spark job呢?其实有一个办法,就是多开几个线程,在子线程上提交spark job,这样就可以并行执行多个spark job,当然这是有条件限制的。


首先我们在提交spark任务的时候会设置spark executor数目、cores数量等,例如我这配置了10个executor并且每个executor有2个core,那么我的计算资源就有20个核,也就是说可以并行执行20个Task,或者说同时处理20个block的数据。这时候如果我们提交了一个只有10个block的数据的话,任务运行只会占用10个Task,而另外10个Task 其实会空闲下来,但是这个spark job仍然会继续阻塞后续的spark job直到执行完毕。因此这里会造成一些资源上的浪费:明明还有可以使用的计算资源,但是对于一个线程中提交的spark job是按照顺序执行的,无法充分利用集群资源。

按顺序执行,有资源没被完全利用


这里在使用子线程提交任务的话,就可以使用到空闲的Task资源了,只要Task还有提交是spark job就可以利用这些Task向下执行,达到并行执行多个spark job的目的。如果没有空闲的Task,Task全被一个spark job占据完了,那么其他线程所提交的spark job也会在后续等待前面的spark job执行完毕。我们可以利用线程池提交任务,子线程继承Callable<T>可以向父线程返回一个Future对象,这个Future对象可以用于获取子线程返回的值,方便我们得到spark job的运行结果;也可以在使用future.get()方法的时候捕获到子线程的异常,检测spark job的真实运行情况;也可以通过这个控制对象来取现线程执行,来取消一些误提交的spark job。

Future<T > future = ExecutorService.submit(new RunTaskThread());

多线程提交任务,利用空闲资源


通过这种方式提交spark job经过试验是可行的,但是说能不能更大程度上提高了资源的利用效率,这取决于你申请到的Spark执行资源。申请的资源少,一个spark job就能占满。但是如果本身申请到的资源多,有同时提交很多小任务的需求的话,这个还是能节省很多时间的,并行执行的spark job还是能快很多。

申请资源本来不多的情况,也无法执行其他Job

以上是关于多线程提交Spark Job的主要内容,如果未能解决你的问题,请参考以下文章

Spark 资源池简介

总结Spark优化-多Job并发执行

kafka系列 -- 多线程消费者实现

Spark的Task调度原理

如何避免在多线程处理中重写类?

Spark源代码::Spark多线程::NettyRpcEnv.ask解读