ForkJoinPool线程池工作原理

Posted 胡尚

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ForkJoinPool线程池工作原理相关的知识,希望对你有一定的参考价值。

文章目录

线程池ForkJoinPool工作原理

归并排序算法

算法题:如何充分利用多核CPU的性能,快速对一个2千万大小的数组进行排序?

这道题就可以使用归并排序算法来实现。



什么是归并排序

归并排序(Merge Sort)是一种基于分治思想的排序算法。归并排序的基本思想是将一个大数组分成两个相等大小的子数组,对每个子数组分别进行排序,然后将两个子数组合并成一个有序的大数组。因为常常使用递归实现(由先拆分后合并的性质决定的),所以我们称其为归并排序。

归并排序的时间复杂度为O(nlogn),空间复杂度为O(n),其中n为数组的长度。

分治思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题相互独立且与原问题性质相同。求出子问题的解,就可得到原问题的解。

分治思想的步骤如下:

  1. 分解:将要解决的问题划分成若干规模较小的同类问题;

  2. 求解:当子问题划分得足够小时,用较简单的方法解决;

  3. 合并:按原问题的要求,将子问题的解逐层合并构成原问题的解。

归并排序演示



单线程实现归并排序

单线程归并算法的实现,它的基本思路是将序列分成两个部分,分别进行递归排序,然后将排序好的子序列合并起来。

public class MergeSort 

    private final int[] arrayToSort; //要排序的数组
    private final int threshold;  //拆分的阈值,低于此阈值就不再进行拆分

    public MergeSort(final int[] arrayToSort, final int threshold) 
        this.arrayToSort = arrayToSort;
        this.threshold = threshold;
    

    /**
     * 排序
     * @return
     */
    public int[] mergeSort() 
        return mergeSort(arrayToSort, threshold);
    

    public static int[] mergeSort(final int[] arrayToSort, int threshold) 
        //拆分后的数组长度小于阈值,直接进行排序
        if (arrayToSort.length < threshold) 
            //调用jdk提供的排序方法
            Arrays.sort(arrayToSort);
            return arrayToSort;
        

        int midpoint = arrayToSort.length / 2;
        //对数组进行拆分
        int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
        int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);
        //递归调用
        leftArray = mergeSort(leftArray, threshold);
        rightArray = mergeSort(rightArray, threshold);
        //合并排序结果
        return merge(leftArray, rightArray);
    

    public static int[] merge(final int[] leftArray, final int[] rightArray) 
        //定义用于合并结果的数组
        int[] mergedArray = new int[leftArray.length + rightArray.length];
        int mergedArrayPos = 0;
        // 利用双指针进行两个数的比较
        int leftArrayPos = 0;
        int rightArrayPos = 0;
        while (leftArrayPos < leftArray.length && rightArrayPos < rightArray.length) 
            if (leftArray[leftArrayPos] <= rightArray[rightArrayPos]) 
                mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
                leftArrayPos++;
             else 
                mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
                rightArrayPos++;
            
            mergedArrayPos++;
      

        while (leftArrayPos < leftArray.length) 
            mergedArray[mergedArrayPos] = leftArray[leftArrayPos];
            leftArrayPos++;
            mergedArrayPos++;
        

        while (rightArrayPos < rightArray.length) 
            mergedArray[mergedArrayPos] = rightArray[rightArrayPos];
            rightArrayPos++;
            mergedArrayPos++;
        

        return mergedArray;
    



Fork/Join并行归并排序

并行归并排序是一种利用多线程实现的归并排序算法。它的基本思路是将数据分成若干部分,然后在不同线程上对这些部分进行归并排序,最后将排好序的部分合并成有序数组。在多核CPU上,这种算法也能够有效提高排序速度。

可以使用Java的Fork/Join框架来实现归并排序的并行化

public class MergeSortTask extends RecursiveAction 

   private final int threshold; //拆分的阈值,低于此阈值就不再进行拆分
   private int[] arrayToSort; //要排序的数组

   public MergeSortTask(final int[] arrayToSort, final int threshold) 
      this.arrayToSort = arrayToSort;
      this.threshold = threshold;
   

   @Override
   protected void compute() 
      //拆分后的数组长度小于阈值,直接进行排序
      if (arrayToSort.length <= threshold) 
         // 调用jdk提供的排序方法
         Arrays.sort(arrayToSort);
         return;
      

      // 对数组进行拆分
      int midpoint = arrayToSort.length / 2;
      int[] leftArray = Arrays.copyOfRange(arrayToSort, 0, midpoint);
      int[] rightArray = Arrays.copyOfRange(arrayToSort, midpoint, arrayToSort.length);

      MergeSortTask leftTask = new MergeSortTask(leftArray, threshold);
      MergeSortTask rightTask = new MergeSortTask(rightArray, threshold);

      //调用任务,阻塞当前线程,直到所有子任务执行完成
      invokeAll(leftTask,rightTask);
      //提交任务
//    leftTask.fork();
//    rightTask.fork();
//    //合并结果
//    leftTask.join();
//    rightTask.join();

      // 合并排序结果
      arrayToSort = MergeSort.merge(leftTask.getSortedArray(), rightTask.getSortedArray());
   

   public int[] getSortedArray() 
      return arrayToSort;
   



测试结果对比

写一个主方法,使用自定义的工具类首先创建一个数组,然后分别调用上面两个方法区测试

public class ArrayToSortMain 

    public static void main(String[] args) 
        //生成测试数组  用于归并排序
        int[] arrayToSortByMergeSort = Utils.buildRandomIntArray(20000000);
        //生成测试数组  用于forkjoin排序
        int[] arrayToSortByForkJoin = Arrays.copyOf(arrayToSortByMergeSort, arrayToSortByMergeSort.length);
        //获取处理器数量
        int processors = Runtime.getRuntime().availableProcessors();


        MergeSort mergeSort = new MergeSort(arrayToSortByMergeSort, processors);
        long startTime = System.nanoTime();
        // 归并排序
        mergeSort.mergeSort();
        long duration = System.nanoTime()-startTime;
        System.out.println("单线程归并排序时间: "+(duration/(1000f*1000f))+"毫秒");

        //利用forkjoin排序
        MergeSortTask mergeSortTask = new MergeSortTask(arrayToSortByForkJoin, processors);
        //构建forkjoin线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool(processors);
        startTime = System.nanoTime();
        //执行排序任务
        forkJoinPool.invoke(mergeSortTask);
        duration = System.nanoTime()-startTime;
        System.out.println("forkjoin排序时间: "+(duration/(1000f*1000f))+"毫秒");

    


测试结果如下所示



并行实现归并排序的优化和注意事项

在实际应用中,我们需要考虑数据分布的均匀性、内存使用情况、线程切换开销等因素,以充分利用多核CPU并保证算法的正确性和效率。

以下是并行实现归并排序的一些优化和注意事项:

  • 任务的大小:任务大小的选择会影响并行算法的效率和负载均衡,如果任务太小,会造成任务划分和合并的开销过大;如果任务太大,会导致任务无法充分利用多核CPU并行处理能力。因此,在实际应用中需要根据数据量、CPU核心数等因素选择合适的任务大小。
  • 负载均衡:并行算法需要保证负载均衡,即各个线程执行的任务大小和时间应该尽可能相等,否则会导致某些线程负载过重,而其他线程负载过轻的情况。在归并排序中,可以通过递归调用实现负载均衡,但是需要注意递归的层数不能太深,否则会导致任务划分和合并的开销过大。
  • 数据分布:数据分布的均匀性也会影响并行算法的效率和负载均衡。在归并排序中,如果数据分布不均匀,会导致某些线程处理的数据量过大,而其他线程处理的数据量过小的情况。因此,在实际应用中需要考虑数据的分布情况,尽可能将数据分成大小相等的子数组。
  • 内存使用:并行算法需要考虑内存的使用情况,特别是在处理大规模数据时,内存的使用情况会对算法的执行效率产生重要影响。在归并排序中,可以通过对数据进行原地归并实现内存的节约,但是需要注意归并的实现方式,以避免数据的覆盖和不稳定排序等问题。
  • 线程切换:线程切换是并行算法的一个重要开销,需要尽量减少线程的切换次数,以提高算法的执行效率。在归并排序中,可以通过设置线程池的大小和调整任务大小等方式控制线程的数量和切换开销,以实现算法的最优性能


Fork/Join框架

简介

Fork/Join框架在JDK7时引入,是一个是一个并行计算的框架,Fork/Join框架可以将一个大任务拆分为很多小任务来异步执行

Fork/Join框架主要包含三个模块:

  • 线程池ForkJoinPool
  • 任务对象 ForkJoinTask
  • 执行任务的线程 ForkJoinWorkerThread

线程池中产生线程,线程去执行任务对象中任务


应用场景:

  • 递归分解型任务。例如排序、归并、遍历等
  • 数组处理。例如数组的排序、查找、统计等
  • 并行化算法。例如并行化的图像处理算法、并行化的机器学习算法等。
  • 大数据处理。例如大型日志文件的处理、大型数据库的查询等。

Fork/Join框架的主要组成部分是ForkJoinPoolForkJoinTaskForkJoinPool是一个线程池,它用于管理ForkJoin任务的执行。ForkJoinTask是一个抽象类,用于表示可以被分割成更小部分的任务。



ForkJoinPool

ForkJoinPool是Fork/Join框架中的线程池类,它用于管理Fork/Join任务的线程。

ForkJoinPool类包括一些重要的方法,例如submit()、invoke()、shutdown()、awaitTermination()等,用于提交任务、执行任务、关闭线程池和等待任务的执行结果。

ForkJoinPool类中还包括一些参数,例如线程池的大小、工作线程的优先级、任务队列的容量等,可以根据具体的应用场景进行设置。



ForkJoinPool线程池构造方法

它的构造方法有下面四种

其中有四个核心的参数

  • int parallelism

    并行级别,决定工作线程数量。默认值是Runtime.getRuntime().availableProcessors()cpu逻辑处理器数量

  • ForkJoinWorkerThreadFactory factory

    ForkJoinPool再创建线程时是通过Factory来创建的。

    如果我们自己指定需要实现ForkJoinWorkerThreadFactory接口,如果不指定则默认使用DefaultForkJoinWorkerThreadFactory来创建线程

  • UncaughtExceptionHandler handler

    指定异常处理器,当任务在运行中出错时,将由设定的处理器处理

  • boolean asyncMode

    队列工作模式,默认值false。设置为true是先进先出的队列,false这是先进后出的栈


创建一个ForkJoinPool线程池

//获取处理器数量
int processors = Runtime.getRuntime().availableProcessors();
//构建forkjoin线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(processors);

// 其实上面两行代码就等于下面这一行,因为从ForkJoinPool的三个构造方法就能看出来,默认值就是Runtime.getRuntime().availableProcessors()
ForkJoinPool forkJoinPool = new ForkJoinPool()

任务提交方式

任务提交是ForkJoinPool的核心能力之一,提交任务有三种方式:

返回值方法
提交异步执行voidexecute(ForkJoinTask task)
execute(Runnable task)
等待并获取结果Tinvoke(ForkJoinTask task)
提交执行获取Future结果ForkJoinTasksubmit(ForkJoinTask task)
submit(Callable task)
submit(Runnable task)
submit(Runnable task, T result)

ForkJoinPool线程池和ThreadPoolExecutor普通线程池的区别

  • 工作窃取算法

    ForkJoinPool采用工作窃取算法来提高线程的利用率,而普通线程池则采用任务队列来管理任务。

    在工作窃取算法中,当一个线程完成自己的任务后,它可以从其它线程的队列中获取一个任务来执行,以此来提高线程的利用率。

  • 任务的分解和合并

    ForkJoinPool是分解任务再合并结果。普通线程池是按照任务提交顺序一个一个执行

  • 工作线程的数量

    ForkJoinPool会根据当前系统的CPU核心数来自动设置工作线程的数量,以最大限度地发挥CPU的性能优势。

    普通线程池需要手动设置线程池的大小,如果设置不合理,可能会导致线程过多或过少,从而影响程序的性能。

  • 任务类型

    ForkJoinPool适用于执行大规模任务并行化

    普通线程池适用于执行一些短小的任务,如处理请求等。

  • 阻塞队列不一样

    ForkJoinPool中每个工作线程都有自己的队列。而普通线程池就只有一个阻塞队列



ForkJoinTask

ForkJoinTask是Fork/Join框架中的抽象类,它定义了任务的基本接口。

用户可以通过继承ForkJoinTask类来实现自己的任务类,并重写其中的compute()方法来定义任务的执行逻辑。

通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类并重写其中的compute()方法,Fork/Join框架提供了以下三个子类:

  • RecursiveAction:用于递归执行但不需要返回结果的任务。
  • RecursiveTask :用于递归执行需要返回结果的任务。
  • CountedCompleter :在任务完成执行后会触发执行一个自定义的钩子函数

ForkJoinTask 最核心的是 fork() 方法和 join() 方法,承载着主要的任务协调作用,一个用于任务提交,一个用于结果获取。

  • fork()——提交任务

    fork()方法用于向当前任务所运行的线程池中提交任务。如果当前线程是ForkJoinWorkerThread类型,将会放入该线程的工作队列,否则放入common线程池的工作队列中。

  • join()——获取任务执行结果

    join()方法用于获取任务的执行结果。调用join()时,将阻塞当前线程直到对应的子任务完成运行并返回结果。



处理递归任务

也并不是所有的递归都适合使用ForkJoin来处理,对于递归深度较大是任务就需要注意栈溢出的情况。就比如下面这个案例

计算斐波那契数列

斐波那契数列指的是这样一个数列:1,1,2,3,5,8,13,21,34,55,89… 这个数列从第3项开始,每一项都等于前两项之和。

public class FibonacciDemo extends RecursiveTask<Integer> 
    
    // 求出第n位上的值是多少
    final int n;

    FibonacciDemo(int n) 
        this.n = n;
    

    /**
     * 重写RecursiveTask的compute()方法
     * @return
     */
    protected Integer compute() 
        if (n <= 1)
            return n;
        FibonacciDemo f1 = new FibonacciDemo(n - 1);
        //提交任务
        f1.fork();
        FibonacciDemo f2 = new FibonacciDemo(n - 2);
        //合并结果
        // f1.fork();实际上底层就是调用的compute()方法,f1.join()就是获取compute()方法的结果。所以这里直接f2.compute()调用也是一样的
        return f2.compute() + f1.join();
    

    public static void main(String[] args) 
        //构建forkjoin线程池
        ForkJoinPool pool = new ForkJoinPool();
        FibonacciDemo task = new FibonacciDemo(100000);
        //提交任务并一直阻塞直到任务 执行完成返回合并结果。
        int result = pool.invoke(task);
        System.out.println(result);
    

如果n传的值较少不会有什么问题,但n如果很大就很容易出现栈溢出。递归计算Fibonacci数列的任务数量呈指数级增长


解决栈溢出问题

我们可以使用迭代的方式计算Fibonacci数列,以避免递归过程中占用大量的栈空间。

/**
 * 使用迭代方式计算Fibonacci数列
 */
public class FibonacciDemo2 
    public static void main(String[] args) 
        int n = 100000;
        long[] fib = new long[n + 1];
        fib[0] = 0;
        fib[1] = 1;
        for (int i = 2; i <= n; i++) 
            fib[i] = fib[i - 1] + fib[i - 2];
        
        System.out.println(fib[n]);
    


处理递归任务注意事项

对于一些递归深度较大的任务,使用Fork/Join框架可能会出现任务调度和内存消耗的问题。

当递归深度较大时,会产生大量的子任务,这些子任务可能被调度到不同的线程中执行,而线程的创建和销毁以及任务调度的开销都会占用大量的资源,从而导致性能下降。

此外,对于递归深度较大的任务,由于每个子任务所占用的栈空间较大,可能会导致内存消耗过大,从而引起内存溢出的问题。

因此,在使用Fork/Join框架处理递归任务时,需要根据实际情况来评估递归深度和任务粒度,以避免任务调度和内存消耗的问题。如果递归深度较大,可以尝试采用其他方法来优化算法,如使用迭代方式替代递归,或者限制递归深度来减少任务数量,以避免Fork/Join框架的缺点。



处理阻塞任务

ForkJoin其实是不适合处理阻塞任务的,如果是阻塞任务尽量还是使用ThreadPoolExecutor普通线程池,重点就是不能让ForkJoin的工作线程阻塞。

如果一定要在ForkJoinPool中使用阻塞型任务,那需要注意以下几点:

  1. 防止线程饥饿:当一个线程在执行一个阻塞型任务时,它将会一直等待任务完成,这时如果没有其他线程可以窃取任务,那么该线程将一直被阻塞,直到任务完成为止。为了避免这种情况,应该避免在ForkJoinPool中提交大量的阻塞型任务。

  2. 使用特定的线程池:为了最大程度地利用ForkJoinPool的性能,可以使用专门的线程池来处理阻塞型任务,这些线程不会被ForkJoinPool的窃取机制所影响。例如,可以使用ThreadPoolExecutor来创建一个线程池,然后将这个线程池作为ForkJoinPool的执行器,这样就可以使用ThreadPoolExecutor来处理阻塞型任务,而使用ForkJoinPool来处理非阻塞型任务。

  3. 不要阻塞工作线程:如果在ForkJoinPool中使用阻塞型任务,那么需要确保这些任务不会阻塞工作线程,否则会导致整个线程池的性能下降。为了避免这种情况,可以将阻塞型任务提交到一个专门的线程池中,或者使用CompletableFuture等异步编程工具来处理阻塞型任务。

    使用CompletableFuture时一定要使用自己定义的一个线程池,当做参数传进去,


下面是一个使用阻塞型任务的例子,这个例子展示了如何使用CompletableFuture来处理阻塞型任务:

public class BlockingTaskDemo 
    public static void main(String[] args) 
        //构建一个forkjoin线程池
        ForkJoinPool pool = new ForkJoinPool();

        //创建一个异步任务,并将其提交到ForkJoinPool中执行
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> 
            try 
                // 模拟一个耗时的任务
                TimeUnit.SECONDS.sleep(5);
                return "Hello, world!";
             catch (InterruptedException e) 
                e.printStackTrace();
                return null;
            
        , pool);

        try 
            // 等待任务完成,并获取结果
            String result = future.get();

            System.out.println(result);
         catch (InterruptedException e) 
            e.printStackTrace();
         catch (ExecutionException e) 
            e.printStackTrace();
         finally 
            //关闭ForkJoinPool,释放资源
            pool.shutdown();
        
    

在这个例子中,我们使用了CompletableFuture来处理阻塞型任务,因为它可以避免阻塞ForkJoinPool中的工作线程。另外,我们也可以使用专门的线程池来处理阻塞型任务,例如ThreadPoolExecutor等。



工作原理

传统的线程池是多个线程对应一个阻塞队列,当有任务入队后多个线程会有竞争。而ForkJoinPool中每一个线程都有一个队列,而且还是双端队列,方便其他空闲线程进行工作窃取

当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。


底层有一个workerQueue[]的数组,每一个线程对应着一个workerQueue,最底层任务有一个base指针指向,最高层任务有一个top指针指向,默认是先进后出的栈结构。其他线程工作窃取时是从base指针开始窃取的。


底层核心就是工作线程和工作队列,如下图所示



工作线程ForkJoinWorkerThread

ForkJoinWorkerThread是ForkJoinPool中的一个专门用于执行任务的线程。

当一个ForkJoinWorkerThread被创建时,它会自动注册一个WorkQueue到ForkJoinPool中。

这个WorkQueue是该线程专门用于存储自己的任务的队列,只能出现在WorkQueues[]的奇数位。

在ForkJoinPool中,WorkQueues[]是一个数组,用于存储所有线程的WorkQueue。



工作队列WorkQueue

WorkQueue是一个双端队列,用于存储工作线程自己的任务。每个工作线程都会维护一个本地的WorkQueue,并且优先执行本地队列中的任务。当本地队列中的任务执行完毕后,工作线程会尝试从其他线程的WorkQueue中窃取任务。



注意:在ForkJoinPool中,只有WorkQueues[]奇数位的WorkQueue是属于ForkJoinWorkerThread线程的,因此只有这些WorkQueue才能被线程本身使用和窃取任务。

偶数位的WorkQueue是用于外部线程提交任务的,而且是由多个线程共享的,因此它们不能被线程窃取任务。



工作窃取

ForkJoinPool与ThreadPoolExecutor有个很大的不同之处在于,ForkJoinPool存在引入了工作窃取设计,它是其性能保证的关键之一。工作窃取,就是允许空闲线程从繁忙线程的双端队列中窃取任务。默认情况下,工作线程从它自己的双端队列的头部获取任务。但是,当自己的任务为空时,线程会从其他繁忙线程双端队列的尾部中获取任务。这种方法

ForkJoinPool 源码

ForkJoinPool----FJP
先看task.fork方法,含义是将当前任务,放到当前线程的工作队列中。但是第一次执行这个方法是在主线程中,主线程是不可能被FJP管理的。那么就进入ForkJoinPool.common.externalPush,在common这个default的线程池里执行这个任务,
externalPush的意思,是把外面的任务,放到当前线程池中执行。刚进入externalPush,会检查随机数不是0,workQueues不是空,这些条件第一次肯定是不满足的,那么进入externalSubmit,先初始化随机数,

ctl是一个volatile long类型的控制变量,从高到低,前16位是(当前活跃线程数-最小并发数),往后16位是(总线程数-最小并发数),再往后16位是栈顶(Treiber stack)等待线程的标志,最后16位是栈顶等待线程在线程池数组中的位置。
runState是一个volatile int类型的控制变量,来标志当前线程池运行状态,有锁住,信号,已启动,停止,终止,关闭几种状态。

当然也可以不从fork方法进来,而是pool.invoke,已经有一个FJP了,直接执行任务




以上是关于ForkJoinPool线程池工作原理的主要内容,如果未能解决你的问题,请参考以下文章

Java线程池之ForkJoinPool

提升--18---线程池--04----WorkStealingPool ForkJoinPool

java多线程 -- ForkJoinPool 分支/ 合并框架 工作窃取

为啥parallelStream 使用ForkJoinPool,而不是普通的线程池?

ForkJoinPool 源码

线程池-执行机制ForkJoinPool的commonPool详解