线程池基本使用和ThreadPoolExecutor核心原理讲解

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池基本使用和ThreadPoolExecutor核心原理讲解相关的知识,希望对你有一定的参考价值。

参考技术A java提供的是Executors;
spring提供的是ThreadPoolTaskExecutor;

Executors提供了4个线程池,

效果如下:

可以看到5个线程在接任务,接完5个任务之后就停止了1秒,完成之后继续接任务;

为什么要创造这么一个线程池出来呢?
因为有些时候需要用到线程池的队列任务机制,又不想多线程并发。此时就需要用单一线程池了。
以下两种写法完全一样的效果

当有空闲线程的时候就让空闲线程去做任务;
当没空闲线程的时候就新建一个线程去任务;

效果如下:

由于任务耗时不确定,所以线程池会动态根据情况去判断是否创建新的线程;

分别有3个

上面介绍的4个线程池工具,都是基于一个类 ThreadPoolExecutor
ThreadPoolExecutor 有几个重要的参数

对于fixPool线程池,corePoolSize=maximumPoolSize=n,keepAliveTime=0,workQueue=LinkedBlockingQueue,threadFactory和handler都是默认的。

对于cashPool线程池,corePoolSize=0,maximumPoolSize=2^32,keepAliveTime=60s,workQueue=SynchronousQueue,threadFactory和handler都是默认的。

我们先看一看execute方法

其中ctl参数是一个核心参数,保存着线程池的运行状态和线程数量,通过workerCountOf()获取当前的工作线程数量。
execute整个过程分成3个部分。

对于fixPool,由于workQueue是LinkedBlockingQueue,所以offer方法基本会返回true。
对于cashpool,workQueue是SynchronousQueue,如果没有消费者在take,则会立马返回false,然后立马新建一个线程。

每个线程都被包装成worker对象,worker对象会执行自己的runWorker方法,方法在死循环不停得调用getTask方法去消化任务。

getTask里面最核心的是take和poll方法,这个是跟你传入的队列特性有关。

对于spring提供的ThreadPoolTaskExecutor,其实也是对ThreadPoolExecutor的一个封装。
具体看initializeExecutor方法

在执行execute方法的时候,也是执行ThreadPoolExecutor的execute方法。

github地址: https://github.com/hd-eujian/threadpool.git
码云地址: https://gitee.com/guoeryyj/threadpool.git

线程池bing

上一篇

前言

线程缺点:
1.线程的创建它会开辟本地方法栈、虚拟机栈、程序计数器成线程私有的内存,同时消耗的时候需要销毁以上3个区域,因此频繁的创建和消耗比较消耗系统资源;
2.在任务量远远大于线程可以处理的任务量的时候,并不能友好拒绝任务。
线程池定义:使用池化技术来管理线程和使用线程的方式。
线程池的优点
1.可以避免频繁的创建和消耗线程。
2.可以更好的管理线程的个数和资源的个数。
3.拥有更多的功能,比如线程池可以进行定时任务的执行。
4.线程池可以更优化的拒绝不能处理的任务。

线程池的创建

线程池的创建总共分为7种。

1.创建固定个数线程的线程池

 public static void main(String[] args) {

        // 创建一个固定个数的线程池
        ExecutorService executorService =
                Executors.newFixedThreadPool(10);
        for (int i = 0; i < 2; i++) {
            // 执行任务
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程名:" +
                            Thread.currentThread().getName());
                }
            });
        }
    }

该线程池是懒加载模式,只有放接受的线程数大于初始化的线程数,它才建立线程(它最大能建立10个线程,超过了10复用之前建立的线程)。

2.创建缓存的线程池

使用场景:短期有大量任务的时候时候使用newCachedThreadPool()

  public static void main(String[] args) {
        // 创建带缓存的线程池
        ExecutorService executorService =
                Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程名:" +
                            Thread.currentThread().getName());
                }
            });
        }
    }

3.自定义线程池规则(线程池的名称、优先级…)

   public static void main(String[] args) {
        // 自定义线程工厂
        MyThreadFactory threadFactory = new MyThreadFactory();
        ExecutorService executorService =
                Executors.newFixedThreadPool(10, threadFactory);
        for (int i = 0; i < 10; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    Thread thread = Thread.currentThread();
                    System.out.println("线程名:" +
                            thread.getName() +
                            ",优先级:" + thread.getPriority());
                }
            });
        }
    }

    private static int count = 1;

    static class MyThreadFactory implements ThreadFactory {

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            // 自定义线程池的名称规则
            thread.setName("mythreadpool-" + count++);
            // 设置优先级
            thread.setPriority(10);
            return thread;
        }
    }

4.创建定时任务的线程池

 public static void main(String[] args) {
        // 创建执行定时任务的线程池
        ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(1);
        System.out.println("设置定时任务:" + new Date());
//        // 1.执行定时任务
//        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
//            @Override
//            public void run() {
//                try {
//                    Thread.sleep(1000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                System.out.println("执行任务:" + new Date());
//            }
//        }, 1, 3, TimeUnit.SECONDS);
          // 2.执行定时任务
//        scheduledExecutorService.schedule(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println("执行任务:" + new Date());
//            }
//        }, 1, TimeUnit.SECONDS);
            // 3.执行定时任务
        scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("执行任务:" + new Date());
            }
        }, 1, 3, TimeUnit.SECONDS);

    }

第一种:
参数1:线程执行的任务Runnable参数2:延迟一段时间执行
参数3:定时任务执行的频率
参数4:配合参数2和参数3使用的,它规定了时间的单位
第二种:
推迟某秒,只会执行一次
第三种
scheduleWithFixedDelay

scheduleAtFixedRate:以上次任务的开始时间作为下一次任务的开始时间的。
scheduleWithFixedDelay:以上次任务的结束时间作为下一次任务的开始时间的。

5.创建单线程执行定时任务的线程池

 public static void main(String[] args) {
        // 创建单个执行定时任务的线程池
        ScheduledExecutorService service =
                Executors.newSingleThreadScheduledExecutor();
        service.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("执行任务:" + new Date());
            }
        }, 1, 3, TimeUnit.SECONDS);

    }

6.创建单线程的线程池

   public static void main(String[] args) {
        // 创建单个线程的线程池
        ExecutorService service = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            int finalI = i;
            service.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程名:" +
                            Thread.currentThread().getName() +
                            ",I=" + finalI);
                }
            });
        }

    }

补充:创建单个线程池的好处

1.可以避免频发创建和消耗线程带来的性能开销;
2.有任务队列可以存储多余的任务;
3.当有大量的任务不能处理的时候,可以友好的执行拒绝策略;
4.线程池可以更好的管理任务(保证任务的先进先出)。

7.创建异步运行的线程池(jdk8+支持)

更具当前的CPU的个数生成对应线程(异步)个数的线程池。

   public static void main(String[] args) {
        // 创建一个异步根据当前CPU生产的线程池
        ExecutorService service = Executors.newWorkStealingPool();
        for (int i = 0; i < 100; i++) {
            service.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程名:" +
                            Thread.currentThread().getName());
                }
            });
        }

        // 等待线程池执行完成
        while (!service.isTerminated()) {
        }

    }

总结

线程池有两个重要的对象:
1.线程
2.工作队列(前面创建线程池的工作队列中任务量 Integer最大值)
如果使用前6种创建线程池的方式会导致的问题:
1.线程数量不可控(比如创建带缓存的线程池时);
2.工作任务量不可控( Integer.MAX_VALUE),可能会导致内存溢出。

8.原始创建线程池的方法

  private static int count = 1;

    public static void main(String[] args) {

        // 创建线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("myThreadPool-" + count++);
                return thread;
            }
        };

        // 原始的创建线程池的方式
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(1, 1,
                        60, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(8),
                        threadFactory,
                        new ThreadPoolExecutor.DiscardOldestPolicy());
        for (int i = 0; i < 10; i++) {
            final int finalI = i;
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(finalI + " 线程名:" +
                            Thread.currentThread().getName());
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }


在创建线程池时最大线程数不能超过核心线程数

  private static int count = 1;

    public static void main(String[] args) {

        // 创建线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("myThreadPool-" + count++);
                return thread;
            }
        };

        // 原始的创建线程池的方式
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(1, 1,
                        60, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(8),
                        threadFactory,
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                // 自定义拒绝策略,可以写到日志里面,也可以存储到数据库,也可以啥都不做
                                System.out.println("执行了拒绝策略");
                            }
                        });
        for (int i = 0; i < 10; i++) {
            final int finalI = i;
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(finalI + " 线程名:" +
                            Thread.currentThread().getName());
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

ThreadPoolExecutor执行流程

线程池的状态


线程池总共存在5种状态:
RUNNING:线程池创建之后的初始状态,这种状态下可以执行任务。
SHUTDOWN:该状态下线程池不再接受新任务,但是会将工作队列中的任务执行结束。
STOP:该状态下线程池不再接受新任务,但是不会处理工作队列中的任务,并且将会中断线程。。
TIDYING:该状态下所有任务都已终止,将会执行terminated()钩子方法。
TERMINATED:执行完terminated ()钩子方法之后。

线程执行的两种方式

1.执行任务无返回值excute(new Runnable… . )
2.执行任务有返回值sumbit(Runnbale 无返回值/Callable有返回值)

public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS,
                        new LinkedBlockingQueue<>(100));
        // 执行任务方式1
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("执行 execute 方法");
            }
        });
        // 执行一个带返回值的任务
        Future<Integer> future = threadPoolExecutor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                // 生成随机数
                int num = new Random().nextInt(10);
                System.out.println("执行submit方法,随机数:" + num);
                return num;
            }
        });
        System.out.println("得到线程池执行结果:" + future.get());
        // 使用 submit 执行 Runnable 任务
        threadPoolExecutor.submit(new Runnable() {
            @Override
            public void run() {
                System.out.println("执行submit方法,使用的是Runnable对象");
            }
        });
        threadPoolExecutor.shutdown();
    }

下一篇

以上是关于线程池基本使用和ThreadPoolExecutor核心原理讲解的主要内容,如果未能解决你的问题,请参考以下文章

线程池bing

线程池bing

Android线程管理之ThreadPoolExecutor自定义线程池

python线程池ThreadPoolExecutor与进程池ProcessPoolExecutor

面试官一个线程池问题把我问懵逼了。

PyQt5中多线程模块QThread解决界面卡顿无响应问题,线程池ThreadPoolExecutor解决多任务耗时操作问题