JUC高级多线程_09:ForkJoin框架的具体介绍与使用

Posted ABin-阿斌

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC高级多线程_09:ForkJoin框架的具体介绍与使用相关的知识,希望对你有一定的参考价值。

我是 ABin-阿斌:写一生代码,创一世佳话,筑一揽芳华。 如果小伙伴们觉得我的文章有点 feel ,那就点个赞再走哦。
在这里插入图片描述

ForkJoin简介

  • ForkJoinJava7 提供的原生多线程并行处理框架,其基本思想是将大任务分割成小任务,最后将小任务聚合起来得到结果。
  • fork 是分解的意思, join 是收集的意思.,它非常类似 于 HADOOP 提供的 MapReduce 框架,只是 MapReduce 的任务可以针对集群内的所有计算节点,可以充分利用集群的能力完成计算任务。ForkJoin 更加类似于单机版的 MapReduce
  • fork/join 框架中,若某个子问题由于等待另一个子问题的完成而无法继续执行。那么处理该子问题的线程会主动寻找其他尚未运行完成的子问题来执行。
  • 这种方式减少了线程的等待时间,提高了性能。子问题中应该避免使用 synchronized 关键词或其他方式的同步。也不应该是阻塞 IO 或过多的访问共享变量。
  • 在理想情况下,每个子问题的实现中都应该只进行CPU 相关的计算,并且只适用每个问题的内部对象。唯一的同步应该只发生在子问题和创建它的父问题之间。

作用

1.ForkJoinPool

  • 核心思想:

    • 对于大数据量,并行执行任务,提高效率。
    • Map Reduce: 把大任务拆分成小任务去执行。
  • 既然任务是被逐渐的细化的,那就需要把这些任务存在一个池子里面,这个池子就是 ForkJoinPool,它与其它的 ExecutorService 区别主要在于它使用—— 工作窃取,那什么是工作窃取呢?

2.什么是——工作窃取

  • 一个大任务会被划分成无数个小任务,这些任务被分配到不同的队列,这些队列有些干活干的块,有些干得慢。于是干的快的,一看自己没任务需要执行了,就去隔壁的队列里面拿去任务执行。

3.ForkJoinTask

  • ForkJoinTask 就是 ForkJoinPool 里面的每一个任务。

  • 他主要有两个子类: RecursiveAction和RecursiveTask。然后通过 fork() 方法去分配任务执行任务,通过 join() 方法汇总任务结果。

    • 具体运行过程:

    在这里插入图片描述

  • 两大子类的介绍

    • RecursiveAction: 一个递归无结果的 ForkJoinTask(没有返回值)
    • RecursiveTask: 一个递归有结果的 ForkJoinTask(有返回值)
  • ForkJoinPoolForkJoinTask 数组和 ForkJoinWorkerThread 数组组成,ForkJoinTask 数组负责存放程序提交给 ForkJoinPool 的任务,而 ForkJoinWorkerThread 数组负责执行这些任务。

4.代码案例演示:

package com.mangobin.forkjoin;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @description: ForkJoin:多线程并行处理框架
 */
public class ForkJoinDemo {

    public static void main(String[] args) throws Exception {
        ForkJoin forkJoin = new ForkJoin(6L, 10L);
        //获取分支合并池
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        //提交池中的任务,获取任务对象
        ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(forkJoin);

        //通过 .get()方法获取任务结果
        System.out.println(forkJoinTask.get());

        //关闭池
        forkJoinPool.shutdown();


    }
}

/**
 * 继承 RecursiveTask 覆写 compute 方法,写出处理任务的具体操作
 */
class ForkJoin extends RecursiveTask<Long> {

    private Long start;
    private Long end;
    private Long temp = 10000L;

    public ForkJoin(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 计算方法
     *
     * @return
     */
    @Override
    protected Long compute() {
        if ((end - start) < temp) {
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum += 1;

            }
            return sum;

        } else {
            //中间值
            long middle = (start + end) / 2;
            //线程操作资源类:当前资源类——ForkJoinDemo
            ForkJoin task1 = new ForkJoin(start, middle);
            // 拆分任务,把任务放入到线程队列中:递归分治
            task1.fork();
            ForkJoin task2 = new ForkJoin(middle + 1, end);
            task2.fork();
            //合并
            return task1.join() + task2.join();
        }
    }
}



4.1不同技术的对比演示:
package com.mangobin.forkjoin;


import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class ForkJoinTest {

    public static void main(String[] args) throws Exception {
        //未使用ForkJoin 执行时间:7246  注意:每个人的电脑性能不同,数据也会有差别
//        test01();
        //使用ForkJoin后 执行时间:6988
//        test02();
        //使用Stream流后  执行时间:198
        test03();
    }
    public static void test01() {
        Long sum = 0L;
        //开始时间
        long start = System.currentTimeMillis();
        for (Long i = 1L; i <= 10_0000_0000; i++) {
            sum += i;
        }
        //结束时间
        long end = System.currentTimeMillis();
        //最终执行时间
        System.out.println("总和sum=" + sum + "———>执行时间:" + (end - start));
    }
    
    public static void test02() throws Exception {
        //开始时间
        long start = System.currentTimeMillis();
        //获取分支合并池
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        ForkJoinTask<Long> task = new ForkJoin(0L, 10_0000_0000L);
        //提交池中的任务,获取任务对象
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);
        //通过 .get()方法获取任务结果
        Long sum = submit.get();
        //结束时间
        long end = System.currentTimeMillis();
        //最终执行时间
        System.out.println("总和sum=" + sum + "———>执行时间:" + (end - start));
    }

    /**
     * Stream相应方法介绍:
     *     rangeClosed(): 需要传入开始节点和结束节点两个参数,返回的是一个有序的 LongStream。
     *                   包含开始节点和结束节点两个参数之间所有的参数,间隔为 1
     *                   注意:包含最后的结束节点
     *    parallel():并行
     *    reduce:求和
     */
    public static void test03() {
        long start = System.currentTimeMillis();
        long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);
        long end = System.currentTimeMillis();
        System.out.println("总和sum=" + sum + "———>执行时间:" + (end - start));
    }
}

以上是关于JUC高级多线程_09:ForkJoin框架的具体介绍与使用的主要内容,如果未能解决你的问题,请参考以下文章

JUC高级多线程_08:线程池的具体介绍与使用

JUC高级多线程_10:Future异步回调的具体介绍与使用

JUC高级多线程_04:高并发下集合类的具体使用

JUC高级多线程_11:JMM与Volatile的具体介绍与使用

Java-JUC之ForkJoin框架

JUC高级多线程_07:读写锁与阻塞队列的具体介绍与使用