ForkJoin框架详解

Posted truestoriesavici01

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ForkJoin框架详解相关的知识,希望对你有一定的参考价值。

ForkJoin框架详解

什么是Fork/Join框架

  • 提供一个可以用于并行执行任务的框架.
  • 是把一个大任务分割成若干个小任务,最终汇总小任务结果后得到大任务结果的框架.
  • Fork:将大任务切分成若干个子任务并行的执行. Join:合并这些子任务的执行结果,最后得到大任务的结果.

工作窃取算法(work-stealing)

  • 将一个大的任务分割成若干个小任务,放在不同的任务队列中.
  • 每个任务队列都有一个线程对其进行处理.
  • 若一个线程处理完其任务队列,可以窃取其他线程的队列中的未完成的任务.
  • 为了防止线程之间的竞争,任务队列采用双端队列.原线程从队列的一端获取任务.窃取任务的线程从队列的另外一段获取任务.

特点:

  • 充分利用线程并行计算,减少线程间的竞争.
  • 建立多个线程和多个队列消耗资源.

Fork/Join框架

步骤:

  1. 分割任务.通过fork将大任务分割成子任务.
  2. 执行任务并合并结果.分割的子任务放在双端队列中,启动多个线程获取任务并执行.子任务的结果同一放在一个队列中,启动一个线程获取结果后合并获取最终结果.

Fork/Join框架提供的类:

  • ForkJoinTask:提供在任务中执行fork()join()操作的机制.可以继承其子类:
    • RecursiveAction:没有返回结果的任务.
    • RecursiveTask:有返回结果的任务.
  • ForkJoinPool:提交的任务由其来执行.分割出来的子任务进入当前线程维护的双端队列的头部.工作线程没有任务时会从其他线程的队列的尾部获取任务.

使用Fork/Join框架

class MyForkJoin extends RecursiveTask{

    private int[] array;
    private int start;
    private int end;

    MyForkJoin(int[] array, int start, int end){
        this.array = array;
        this.start = start;
        this.end = end;
        System.out.println(Thread.currentThread().getName() + " Time is: " + System.currentTimeMillis());
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        if (end-start<=1){
            for (int i = start; i <= end; i++) {
                sum += array[i];
            }
        }
        else {
            MyForkJoin task1 = new MyForkJoin(array, start, (start + end) / 2);
            MyForkJoin task2 = new MyForkJoin(array, (start + end) / 2 + 1, end);
            task1.fork(); // 执行任务
            task2.fork();

            int res1 = (int) task1.join(); // 等待任务完成获取结果
            int res2 = (int) task2.join();
            sum = res1+res2;
        }
        return sum;
    }
}

异常处理

ForkJoinTask框架可能抛出异常,无法在主线程中直接获取异常.
可以通过isCompletedAbnormally()方法检查任务是否已经抛出异常或者已经被取消.getException()返回Throwable对象.

  • 任务取消 --> CancellationExcetpion.
  • 任务未完成或没有抛出异常 --> null.

实现原理

  • ForkJoinPool是由ForkJoinTask数组和ForkJoinWorkerThread数组组成.
  • ForkJoinTask数组用来存放提交给ForkJoinPool的任务.
  • ForkJoinWorkerThread数组负责执行这些任务.

核心方法

fork()

  • 异步执行这个任务并立即返回结果.
  • 会把当前任务存放在ForkJoinTask数组中,再唤醒或创建一个工作线程来执行任务.

join()

  • 阻塞当前线程并等待获得结果.
  • 首先由当前任务的状态判断返回的结果类型:
    • 已完成(NORMAL) -- > 直接返回结果
    • 被取消(CANCELLED) -- > 抛出CancellationException
    • 信号(SIGNAL)
    • 出现异常(EXCEPTIONAL) -- > 抛出对应的异常

以上是关于ForkJoin框架详解的主要内容,如果未能解决你的问题,请参考以下文章

fork/join框架中方法join和方法get的区别

Java并发包线程池之ForkJoinPool即ForkJoin框架

ForkJoin框架

JUC系列ForkJoin框架设计官方说明翻译

并发编程系列之什么是ForkJoin框架?

并发编程系列之什么是ForkJoin框架?