java8 新线程池 newWorkStealingPool

Posted OkidoGreen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java8 新线程池 newWorkStealingPool相关的知识,希望对你有一定的参考价值。

大家好,偶然间在论坛发现大家对于 java8 中新增的线程池说明的比较少(可能是应用的比较少)以至于可能好多人都不知道这个新的线程池的存在,所以想要发一片文章想要说明下。

新的改变
java8 中创建了一个新的具有抢占式操作的线程池,每个线程都有一个任务队列存放任务。

区别
看过源码的同学都知道之前的线程池都是通过 ThreadPoolExecutor 创建出来的。而创建参数中有一个队列参数用于存放任务。而这些队列的长度都是Integer的最大值。这就导致在实际应用中会造成内存溢出情况。这也是为什么阿里巴巴 java 手册不允许使用 Executors 创建线程池的原因。

而 1.8中新增的线程池是基于ForkJoinPool 的扩展.算法思想就是窃取算法。大概的意思就是将任务按照工作线程均分。然后先工作完的线程去帮助没处理完的线程工作。以实现最快完成工作。(大概就是这个意思了)所以它适合处理很耗时的线程。而具体实现跟之前的线程池没啥太大区别

代码示例

       //创建一个具有抢占式操作的线程池 1。8 之后新增 每个线程都有一个任务队列存放任务
        ExecutorService executorService5 = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
        LinkedBlockingDeque<Future<String>> strings = new LinkedBlockingDeque<>();
        // CPU 核数
        System.out.println(Runtime.getRuntime().availableProcessors());
        CountDownLatch countDownLatch = new CountDownLatch(Runtime.getRuntime().availableProcessors());

        for (int i = 0; i < Runtime.getRuntime().availableProcessors()+1; i++) 
            Future<String> submit = executorService5.submit(new Callable<String>() 
                @Override
                public String call() 
                    //System.out.println("展示线程:" + Thread.currentThread().getName());
                    try 
                        TimeUnit.SECONDS.sleep(2);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    finally 
                        //countDownLatch.countDown();
                    
                    return "展示线程:" + Thread.currentThread().getName();
                
            );
            strings.offer(submit);
        

       // countDownLatch.await();

        System.out.println("over");
        executorService5.shutdown();

        strings.forEach(f-> 
            try 
                System.out.println(f.get());
             catch (InterruptedException e) 
                e.printStackTrace();
             catch (ExecutionException e) 
                e.printStackTrace();
            
        );


说明上述代码中线程池的大小采用实际硬件 cpu 大小就好。创建的多了也不会有更多的线程同时工作。

----------------------------------------------------------------------------------------

一、newWorkStealingPool是什么?
newWorkStealingPool简单翻译是任务窃取线程池。

newWorkStealingPool 是Java8添加的线程池。和别的4种不同,它用的是ForkJoinPool。

使用ForkJoinPool的好处是,把1个任务拆分成多个“小任务”,把这些“小任务”分发到多个线程上执行。这些“小任务”都执行完成后,再将结果合并。

之前的线程池中,多个线程共有一个阻塞队列,而newWorkStealingPool 中每一个线程都有一个自己的队列。

当线程发现自己的队列没有任务了,就会到别的线程的队列里获取任务执行。可以简单理解为”窃取“。

一般是自己的本地队列采取LIFO(后进先出),窃取时采用FIFO(先进先出),一个从头开始执行,一个从尾部开始执行,由于偷取的动作十分快速,会大量降低这种冲突,也是一种优化方式。

它有2种实现,如下:

1. 无参

public static ExecutorService newWorkStealingPool() 
    return new ForkJoinPool
        (Runtime.getRuntime().availableProcessors(),
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);

Runtime.getRuntime().availableProcessors()是获取当前系统可以的CPU核心数。

2. 有参
就一个参数parallelism,可以自定义并行度。

public static ExecutorService newWorkStealingPool(int parallelism) 
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);


二、newWorkStealingPool测试案例

public class Thread08_WorkStealing 

    public static void main(String[] args) 
        ExecutorService executorService = Executors.newWorkStealingPool(3);

        for (int i=1; i<= 100; i++)
            executorService.submit(new MyWorker(i));
        
        while (true)
    
ForkJoinPool-1-worker-2正在执行,数值:2
ForkJoinPool-1-worker-1正在执行,数值:1
ForkJoinPool-1-worker-3正在执行,数值:3
ForkJoinPool-1-worker-2正在执行,数值:5
ForkJoinPool-1-worker-1正在执行,数值:4
ForkJoinPool-1-worker-3正在执行,数值:6
ForkJoinPool-1-worker-2正在执行,数值:8
ForkJoinPool-1-worker-3正在执行,数值:9
ForkJoinPool-1-worker-1正在执行,数值:7
。。。。。。

发现确实创建了3个线程来执行任务。

把newWorkStealingPool(3)中参数去掉改成newWorkStealingPool(),结果如下:

ForkJoinPool-1-worker-1正在执行,数值:1
ForkJoinPool-1-worker-3正在执行,数值:3
ForkJoinPool-1-worker-2正在执行,数值:2
ForkJoinPool-1-worker-4正在执行,数值:4
ForkJoinPool-1-worker-5正在执行,数值:5
ForkJoinPool-1-worker-6正在执行,数值:6
ForkJoinPool-1-worker-7正在执行,数值:7
ForkJoinPool-1-worker-0正在执行,数值:8
ForkJoinPool-1-worker-6正在执行,数值:10
ForkJoinPool-1-worker-2正在执行,数值:13
ForkJoinPool-1-worker-0正在执行,数值:15
。。。。。。

发现确实创建了8个线程共同完成任务,因为我CPU有8个核。

以上是关于java8 新线程池 newWorkStealingPool的主要内容,如果未能解决你的问题,请参考以下文章

Java8新特性二

Java 8 并行流中的自定义线程池

java8--多线程(java疯狂讲义3复习笔记)

17.Java8新特性_传统时间格式化的线程安全问题

死磕 java线程系列之线程池深入解析——体系结构

死磕 java线程系列之线程池深入解析——生命周期