线程池与ForkJoin框架

Posted Java与大数据学习

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池与ForkJoin框架相关的知识,希望对你有一定的参考价值。

线程池

线程池的作用:

线程池作用就是限制系统中执行线程的数量。

根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列的中取最前面的任务开始执行。若队列中没有等待进程,线程池的这一资源处于等待。当一个新任务需要运行时,如果线程池中有等待的工作线程,就可以开始运行了;否则进入等待队列。

为什么要用线程池:

1.减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务。

2.可以根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下(每个线程需要大约1MB内存,线程开的越多,消耗的内存也就越大,最后死机)。
Java里面线程池的顶级接口是Executor,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。真正的线程池接口是ExecutorService。

要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。

1.newSingleThreadExecutor

创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

实例:

public class newSingleThreadExecutorTest {
        public static void main(String[] args) {
            ExecutorService ex = Executors.newSingleThreadExecutor();
            for(int i=0;i<10;i++){
                ex.execute(new Runnable() {
                    @Override
                    public void run() {                                                                             System.out.println(Thread.currentThread().getName());                        
                    }
                });
            }
        }
    }

输出为:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

由输出结果可以看出始终只有一个线程在工作。

2.newFixedThreadPool

创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

实例:

public class newFixedThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService ex = Executors.newFixedThreadPool(5);
            for(int i=0;i<10;i++){
                ex.execute(new Runnable() {
                    @Override
                    public void run() {
                           System.out.println(Thread.currentThread().getName());
                    }
                });
            }
        }
    }

输出为:

pool-1-thread-1
pool-1-thread-2
pool-1-thread-2
pool-1-thread-5
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-3
pool-1-thread-4

我们启动了10个线程,但是池中只有5个线程工作,所以结果中最多只有5个线程。

3.newCachedThreadPool

创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,
那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

我们来看一个小例子:

public class newCachedThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService ex = Executors.newCachedThreadPool();
            for(int i=0;i<10;i++){
                ex.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName());
                    }
                });
                try {
                    Thread.sleep(6000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

输出为:

pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1

线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。但是如果执行第二个任务时第一个任务没有完成则又是另一番景象,我们把上面的例子稍稍改一下就有所不同:

public class newCachedThreadPoolTest {
        public static void main(String[] args) {
            ExecutorService ex = Executors.newCachedThreadPool();
            for(int i=0;i<10;i++){
                ex.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName());
                        try {
                            Thread.sleep(6000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
    }

输出为:

pool-1-thread-1
pool-1-thread-3
pool-1-thread-2
pool-1-thread-5
pool-1-thread-4
pool-1-thread-6
pool-1-thread-7
pool-1-thread-8
pool-1-thread-9
pool-1-thread-10

第一个任务在执行的时候等待了6秒,所以此时第二个任务执行的时候则是新建一个线程来执行。

4.newScheduledThreadPool

创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

该方法直接实现了ScheduledExecutorService接口,而该接口相当于提供了”延时”和”周期执行”功能的ExecutorService,再来看一下该方法的源码:

public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory)
 
{
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }

返回值是ScheduledExecutorService类型的,与其他3个方法不同,需要注意。我们来看一个小例子:

public class newScheduledThreadPoolTest {
        public static void main(String[] args) {
            ScheduledExecutorService ex = Executors.newScheduledThreadPool(5);
            for(int i=0;i<10;i++){
                ex.schedule(new Runnable() { //定时执行的线程池
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName());
                        try {
                            Thread.sleep(6000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },2, TimeUnit.SECONDS); 
            }
        }
    }

输出结果为:

pool-1-thread-2
pool-1-thread-4
pool-1-thread-1
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
pool-1-thread-3
pool-1-thread-5
pool-1-thread-1
pool-1-thread-4

启动后会延迟2s之后才开始执行。

我们再来看一个周期性执行的例子:

public class newScheduledThreadPoolTest {
        public static void main(String[] args) {
            ScheduledExecutorService ex = Executors.newScheduledThreadPool(5);
            for(int i=0;i<10;i++){
                ex.scheduleAtFixedRate(new Runnable() { //延迟3s后每2s周期性执行一次,不停
                    @Override
                    public void run() {
                        System.out.println(Thread.currentThread().getName());
                        try {
                            Thread.sleep(6000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                },3,2, TimeUnit.SECONDS);
            }
        }
    }

输出为:

pool-1-thread-3
pool-1-thread-4
pool-1-thread-2
pool-1-thread-5
pool-1-thread-1
...

newScheduledThreadPool中有很多另外3个类中没有的方法,我们来看一下:

  1. shedule(Runnable command, long delay, TimeUnit unit): 延迟一定时间后执行Runnable任务;

  2. schedule(Callable callable, long delay, TimeUnit unit): 延迟一定时间后执行Callable任务;

  3. scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit):  延迟一定时间后,以间隔period时间的频率周期性地执行任务;

  4. scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit):  与scheduleAtFixedRate()方法很类似,但是不同的是scheduleWithFixedDelay()方法的周期时间间隔是以上一个任务执行结束到下一个任务开始执行的间隔,而scheduleAtFixedRate()方法的周期时间间隔是以上一个任务开始执行到下一个任务开始执行的间隔,也就是这一些任务系列的触发时间都是可预知的。

Fork/Join框架

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们通过表面的意思去理解ForkJoin框架:Fork即把一个大任务切割成若干部分并行执行,join即把这些被切分的任务的执行结果合并一起汇总,也就是分治归并算法。

Fork / Join的逻辑很简单:

1)将每个大任务分离(fork)为较小的任务; 
2)在单独的线程中处理每个任务(如果必要,将它们分离成更小的任务); 
3)加入结果。

Fork/Join框架的核心是由下列两个类组成的。

①ForkJoinPool:这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm)。它管理工作者线程,并提供任务的状态信息,以及任务的执行信息。

②ForkJoinTask:这个类是一个将在ForkJoinPool中执行的任务的基类。

理解一个概念的最好方法是在实践中体会他,我们先写一个小程序,在此基础上一点一点来分析:

public class ForkJoinPoolTest {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        pool.submit(new PrintTask(1,100));
        pool.awaitTermination(2, TimeUnit.SECONDS);//阻塞当前线程直到 ForkJoinPool 中所有的任务都执行结束
        pool.shutdown();
    }
}

class PrintTask extends RecursiveAction{
    private int start;
    private int end;
    private int num;
    final int MAX = 50;

    public PrintTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected void compute() {
        if(end - start < 50){
            for(int i = start;i <= end; i++){
                num += i;
            }
            System.out.println("当前任务结果为: "+num);
        }else{
            int mid = (end + start)/2;
            PrintTask left = new PrintTask(start,mid);
            PrintTask right = new PrintTask(mid+1,end);
            left.fork();
            right.fork();
        }
    }
}

结果为:

当前任务结果为: 3775
当前任务结果为: 1275

Process finished with exit code

我们通过结果可以看到当前任务被分裂为两个子任务去执行。而执行任务的类继承了RecursiveAction这个类,那他到底在Fork-Join框架中发挥什么作用呢?我们不妨看一下:

首先我们来看一下Fork-Join框架提交任务的方法仍旧还是submit和execute:

void execute(ForkJoinTask<?> task) //安排(异步)执行给定任务

void execute(Runnable task) //在未来的某个时候执行给定的命令

<T> ForkJoinTask<T> submit(Callable<T> task) //执行一个有返回值得任务,返回一个Future类型的实例代表任务的结果

<T> ForkJoinTask<T> submit(ForkJoinTask<T> task) //提交一个ForkJoinTask类型的任务

ForkJoinTask<?> submit(Runnable task) //提交一个Runnable类型的任务,返回一个Future类型的实例代表任务结果 

<T> ForkJoinTask<T> submit(Runnable task, T result) //提交一个Runnable类型的任务,返回一个Future类型的实例代表任务结果 

由execute和submit的参数我们可以看到Fork-join框架可以提交ForkJoinTask,Callable和Runnable类型的任务。这个ForkJoinTask我们之前没见过,先来看一下:

public abstract class ForkJoinTask<Vimplements Future<V>, Serializable {
}

我们看到ForkJoinTask实现了Future接口,一个ForkJoinTask是一个轻量级的Future。对ForkJoinTask效率源于一组限制(这只是部分静态强制执行)反映其用途作为计算任务计算纯函数或纯粹孤立的对象操作。主要的协调机制fork(),安排异步执行,而不进行join(),直到任务的结果已经计算。通常我们并不直接继承 ForkJoinTask,它包含了太多的抽象方法。针对特定的问题,我们可以选择 ForkJoinTask 的不同子类来完成任务:

RecursiveAction:用于任务没有返回结果的场景。

RecursiveTask:用于任务有返回结果的场景。

上面的例子中我们就是继承了RecursiveAction子类用于没有返回结果的场景,下面我们再看一下RecursiveTask用于有返回结果的场景:

public class TestRecursiveTask {

    public static void main(String[] args) {
        Integer result = 0;
        ForkJoinPool pool = new ForkJoinPool();
        Future<Integer> future = pool.submit(new SumTask(30));
        try {
            result = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println(result+"===========================");
    }
}
class SumTask extends RecursiveTask<Integer{
    int num;

    public SumTask(int num) {
        this.num = num;
    }

    @Override
    protected Integer compute() {
        if(num <= 20){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("生产完成"+num+"个产品");
            return num;
        }else{
            SumTask task1 = new SumTask(20);
            SumTask task2 = new SumTask(num - 20);
            task1.fork();
            task2.fork();
            return task1.join() + task2.join();
        }
    }
}

结果为:

生产完成20个产品
生产完成10个产品
30===========================

Process finished with exit code 

我们看到继承RecursiveTask类指定了返回值类型为Integer,在compute方法中的返回值类型即为Integer类型。

从以上的例子中可以看到,通过使用 Fork/Join 模式,软件开发人员能够方便地利用多核平台的计算能力。尽管还没有做到对软件开发人员完全透明,Fork/Join 模式已经极大地简化了编写并发程序的琐碎工作。对于符合 Fork/Join 模式的应用,软件开发人员不再需要处理各种并行相关事务,例如同步、通信等,以难以调试而闻名的死锁和 data race 等错误也就不会出现,提升了思考问题的层次。你可以把 Fork/Join 模式看作并行版本的 Divide and Conquer 策略,仅仅关注如何划分任务和组合中间结果,将剩下的事情丢给 Fork/Join 框架。


以上是关于线程池与ForkJoin框架的主要内容,如果未能解决你的问题,请参考以下文章

Java并发包线程池之ForkJoinPool即ForkJoin框架

JUC高级多线程_09:ForkJoin框架的具体介绍与使用

JavaLearn# (13)多线程:线程生命周期线程控制线程同步线程通信线程池ForkJoin框架

Java核心技术读书笔记10-2 阻塞队列线程安全集合类Callable与Future线程池与任务组同步框架

Java多线程Java的MapReduce框架ForkJoin

ForkJoin框架