提升--18---线程池--04----WorkStealingPool ForkJoinPool

Posted 高高for 循环

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了提升--18---线程池--04----WorkStealingPool ForkJoinPool相关的知识,希望对你有一定的参考价值。

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档


WorkStealingPool

原理

这个WorkStealingPool是另外一种线程池,核心非常简单,原来我们讲的线程池,一个线程的集合然后去另外一个任务的队列里头取任务,取了执行。WorkStealing指的是和原来线程池的区别每一个线程都有自己单独队列,所以任务不断往里扔的时候它会在每一个线程的队列上不断的累积,让某一个线程执行完自己的任务之后就回去另外一个线程上面偷,你给我一个拿来我用,所以这个叫WorkStealing。

WorkStealing指的是和原来线程池的区别,每一个线程都有自己单独队列

应用场景

那到底这种这种线程池的方式和我们原来讲的共享同一个任务队列,他们之间有什么好的地方和不好的地方呢?

  • 就原来这种方式呢如果有某一个线程被占了好长好长时间,然后这个任务特别重,一个特别大的任务,其他线程只能空着,他没有办法帮到任务特别重的线程。
  • 但是WorkStealingPool这种就更加灵活一些,我要是任务特别重的时候,有另外一个任务要清的,没关系,我可以分一点儿任务给你,所以呢这个就是 WorkStealing这种Pool。

源码


看这个源码,他实际上new了一个ForkJoinPool,所以本质上他是一个ForkJoinPool,所以我们只要说清楚这个ForkJoinPool之后这个WorkStealing就大概知道什么意思了,

本质上他是一个ForkJoinPool

ForkJoinPool

原理:

ForkJoinPool是这样一种线程池,它适合把大任务切分成一个一个的小任务去运行,小任务还是觉得比较大,再切,不一定是两个,也可以切成三个四个。切完这个任务执行完了要进行一个汇总.

  • 如下图所示,当然也有一些打印输出的任务不需要返回值的,只不过我们很多情况是需要进行一个结果的汇总,子任务汇总到父任务,父任务最终汇总到根任务,最后我们就得到了所有的结果,这个过程叫join,因此这个线程池就叫做ForkJoinPool。

ForkJoinTask

那我们怎么样定义这个任务呢?我们原来定义任务的时候是从Runnable来继承,在这里我们一般实现ForkJoinPool的时候需要定义成为特定的他的类型 ,这个类型呢是必须得能进行分叉的任务,所以他定义成是一种特殊类型的任务,这个叫ForkJoinTask,

RecursiveAction----不带返回值

  • 但是实际当中这个ForkJoinTask比较原始,我们可以用这个RecursiveAction,这里面有两种,第一种叫RecursiveAction递归,为什么叫递归,是因为我们大任务可以切成小任务,小任务还可以切成小任务,一直可以切到满足我的条件为止,这其中隐含了一个递归的过程,因此叫RecursiveAction,是不带返回值的任务。

RecursiveTask------有返回值

经典题目: ---- 一百万个随机数,求和

需求;

  • 我new了一个数组,这个数组长度为100万,这个数组里面装了很多数,这些数都是通过Random来new出来的,下面我要对一堆数进行总和的计算

单线程来计算

Arrays.stream(nums).sum()

public class T12_ThreadPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        long start = System.currentTimeMillis();

        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }

        long end = System.currentTimeMillis();

        System.out.println("单线程执行结果: " + Arrays.stream(nums).sum()); //stream api
        System.out.println("单线程执行时间: "+(end - start)+"s");
    }
}

多线程 -----ThreadPoolExecutor

分片

  • 这里面是把数组进行了分片,所以定义了一个起始的位置和一个结束的位置,默认5万一片

代码

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

public class T12_ThreadPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        long start = System.currentTimeMillis();

        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }

        long end = System.currentTimeMillis();

        System.out.println("单线程执行结果: " + Arrays.stream(nums).sum()); //stream api
        System.out.println("单线程执行时间: "+(end - start)+"s");
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();

        //手动分解任务
        int threadNum=nums.length/MAX_NUM;
        System.out.println("分片数: "+threadNum);

        //new 一个线程池
        ExecutorService pool = Executors.newFixedThreadPool(threadNum);
        //结果集
        Future<Long>[] futures=new Future[threadNum];


        for (int i = 0; i <threadNum ; i++) {
            MyTask task = new MyTask(MAX_NUM*(i), MAX_NUM*(i+1));
            futures[i] = pool.submit(task);
        }

        //获取各线程结果,再计算总和
        int sum =0;
        for (int i = 0; i < threadNum; i++) {
            sum+=futures[i].get();
        }


        System.out.println("ThreadPool执行结果: "+sum); //stream api

        long end = System.currentTimeMillis();
        System.out.println("多线程并行执行时间: "+(end - start)+"s");

    }


    static class MyTask implements Callable<Long> {
        int start, end;

        MyTask(int s, int e) {
            this.start = s;
            this.end = e;
        }

        @Override
        public Long call() throws Exception {
            long sum = 0L;
            for (int i = start; i < end; i++) sum += nums[i];
            return sum;
        }
    }


}

多线程 -----ForkJoinPool

ForkJoinPool可以设置成大任务可以切成小任务,小任务还可以切成小任务,递归调用,一直可以切到满足我的条件为止

package c_026_01_ThreadPool;

import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();

    static {
        long start = System.currentTimeMillis();

        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }

        long end = System.currentTimeMillis();

        System.out.println("单线程执行结果: " + Arrays.stream(nums).sum()); //stream api
        System.out.println("单线程执行时间: "+(end - start)+"s");
    }



    static class AddTaskRet extends RecursiveTask<Long> {
        private static final long serialVersionUID = 1L;
        int start, end;

        AddTaskRet(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            }

            int middle = start + (end-start)/2;

            AddTaskRet subTask1 = new AddTaskRet(start, middle);
            AddTaskRet subTask2 = new AddTaskRet(middle, end);
            subTask1.fork();
            subTask2.fork();

            return subTask1.join() + subTask2.join();
        }
    }

    public static void main(String[] args) throws IOException {
        long start = System.currentTimeMillis();

        ForkJoinPool fjp = new ForkJoinPool();
        AddTaskRet task = new AddTaskRet(0, nums.length);
        fjp.execute(task);
        long result = task.join();

        System.out.println("ForkJoinPool执行结果: "+result);


        long end = System.currentTimeMillis();
        System.out.println("多线程并行执行时间: "+(end - start)+"s");
        System.in.read();
    }
}

流式API

java新特性–03–Stream简介


流式API底层也是用的ForkJoinPool实现的

案例:

需求

  • 我们new了一个ArrayList往里面装了10000个数,然后我让这个数进行计算,判断他是不是质数

stream() 顺序流

nums.forEach这个是lambda表达式同时也是一个流式处理,forEach就是拿出一个来计算看他是不是一个质数,然后计算一个时间,上面是forEach在当前线程里面拿出每一个来,

parallelStream() 并行流

下面用的是parallelStream并行流,并行流的意思是它会把里面并行的来进行处理把这个任务切分成一个个子任务,这个时候里面也是用的ForkJoinPool,两个对比就会有时间差的一个差距,所以在互相之间这个线程不需要同步的时候,你可以用这种并行流来进行处理效率会更高一些,他底层的实现也是ForkJoinPool

代码:

package c_026_01_ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class T13_ParallelStreamAPI {
    public static void main(String[] args) {
        List<Integer> nums = new ArrayList<>();
        Random r = new Random();
        for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000));

        //顺序流  stream().forEach

        long start = System.currentTimeMillis();
        nums.forEach(v->isPrime(v));//   nums.stream().forEach(v->isPrime(v));
        long end = System.currentTimeMillis();
        System.out.println(end - start);

        //使用parallel stream api

        start = System.currentTimeMillis();
        nums.parallelStream().forEach(T13_ParallelStreamAPI::isPrime);
        end = System.currentTimeMillis();

        System.out.println(end - start);
    }

    static boolean isPrime(int num) {
        for(int i=2; i<=num/2; i++) {
            if(num % i == 0) return false;
        }
        return true;
    }
}

总结

线程池 有两种:

  • ThreadPoolExecutor
  • ForkJoinPool

区别:

  • 他们两个的区别,前面这个ThreadPoolExecutor多个线程共享同一个任务队列
  • ForkJoinPool每个线程有自己的任务队列

以上是关于提升--18---线程池--04----WorkStealingPool ForkJoinPool的主要内容,如果未能解决你的问题,请参考以下文章

提升--15---线程池基本概念CallableFutureCompletableFuture

使用 asio 提升线程池:线程随机不执行

使用网络或 GPU 提升线程池

提升爬虫效率之线程池

Nginx引入线程池,性能提升9倍!

提升--17---线程池--03----ThreadPoolExecutor源码解析