Fork/Join框架

Posted -qilin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Fork/Join框架相关的知识,希望对你有一定的参考价值。

Fork/Join框架:将一个任务拆分(fork)成若干个子任务,再将一个个的子任务运算的结果进行汇总(join)。

技术图片

  以四核CPU为例,多线程下将任务分配到每个cpu线程核上,传统线程存在的问题是:每个任务可能会阻塞,一单某个线程发生了阻塞,那么这个核上线程的其他线程将不能执行,与此同时,其他核上的线程执行完成后会达到空闲状态,

这样阻塞的线程队列还在等待,这样一定程度上会造成资源浪费,没有很好的利用CPU资源,被白白浪费掉,也会影响到效率问题。

  Fork/Join框架是将一个任务分成多个子任务,再将子任务压入到线程中,形成一个个线程队列,与传统线程池不一样的地方是,Fork/Join采用的是“工作窃取”模式,简单理解为:当执行新的任务时它可以将其拆分成更小的任务执行,

并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。如果某个子问题由于阻塞而无法继续执行的时候,那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行,这种方式减少了

线程的等待时间,提高了性能。再说的直白一点就是,其他线程完事之后不会变成空闲状态,而是去别的线程队列上“偷”一个来“帮助”执行(因为线程队列是双端队列,所以可以从尾部进行“偷取”),更好的利用了CPU资源。

技术图片

 

  Fork/Join在java1.7的时候就有了,为啥没有广泛的利用呢?写个Demo就知道了!

package com.lql.java8;

import java.util.concurrent.RecursiveTask;

/**
 * @author: lql
 * @date: 2019.08.24
 * Description: ForkJoinDemo
 */
public class ForkJoinCalculate extends RecursiveTask<Long> 
    /**
     * 必须继承 :
     *          RecursiveTask                          或者                         RecursiveAction
     *              有返回值                                                           返回值
     *          @Override                                                           @Override
     *       protected Long compute() return null;                                protected void compute()  
     */

    //需求:做个累加操作
    /**
     * start:起始
     * end:终止
     * THRESHOLD:临界值
     */
    private long start;
    private long end;
    private static long THRESHOLD = 10000;

    public ForkJoinCalculate(long start,long end) 
        this.start = start;
        this.end = end;
    


    @Override
    protected Long compute() 
        long length = end - start;
        if (length <= THRESHOLD) 
            //小于等于临界值将不在拆分
            long sum = 0; //累加
            for (long i = start; i <= end; i++ ) 
                sum+=i;
            
            return sum;
         else 
            long mid= (start + end) / 2 ;
            //如果没到临界值则递归去拆
            ForkJoinCalculate left = new ForkJoinCalculate(start,end);
            left.fork();   //拆分子任务

            ForkJoinCalculate right = new ForkJoinCalculate(mid+1,end);
            right.fork();
            //汇总
            return left.join()+right.join();
        
    

接下来进行测试:

package com.lql.java8;

import org.junit.Test;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

/**
 * @author: lql
 * @date: 2019.08.24
 * Description:     ForkJoin测试
 */
public class ForkTestTest 

    @Test
    public void test() 
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinTask<Long> task = new ForkJoinCalculate(0,1000000000L);
        Long sum = pool.invoke(task);
        System.out.println(sum);
    

可以看出来代码复杂,这也是没有广泛使用起来的原因,在java8中,并行流很好的解决这种复杂性,也是做累加操作,代码如下:

  @Test
    public void test2() 
        LongStream.range(0, 1000000000L).parallel().reduce(0,Long::sum);
    

 这里介绍下“并行流和顺序流

并行流:就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流,StreamAPI可以声明性地通过parallel()与sequential()在并行流和顺序流之间进行切换

顺序流:执行过程是一个连续的步骤序列,它在完成一个活动之后会去执行到下一个

当然parallel()底层就是Fork/Join.

 

以上是关于Fork/Join框架的主要内容,如果未能解决你的问题,请参考以下文章

线程基础:多任务处理(13)——Fork/Join框架(解决排序问题)

JUC系列Fork/Join框架之概览

Fork/Join框架基本使用

Fork/Join框架详解

Fork/Join框架详解

多线程 fork/join 并行计算