高并发多线程基础之线程池

Posted 踩踩踩从踩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高并发多线程基础之线程池相关的知识,希望对你有一定的参考价值。

高并发多线程之线程基础中生命周期、线程封闭、cpu缓存

前言

本篇文章描述我们jdk给我们提供的线程池;了解为什么使用线程池,有哪些优点,以及几种Executors中提供给我们的工厂创建方法等

线程池的原理

为什么要使用线程池

首先线程并不是越多越好,过多的创建线程消耗大量的资源,反而达到适得其反的效果;如何正确创建线程并且去控制。

  • 线程不仅仅java中的一个对象,每个线程都有自己的工作内存空间。

        线程创建、销毁需要时间,消耗性能

        线程过多,会占用很多的内存

  • 操作系统需要频繁切换线程上下文 (都处于ruannable的状态),影响性能。
  • 如果创建时间+销毁时间>执行任务时间,就很不合算。

让一个线程执行很多任务,线程池得推出就是为了方便控制线程的数量

概念

帮我们创建和管理线程的一个中心 

线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;

工作线程:线程池中的线程、可以循环的执行任务、在没有任务时处于等待状态

任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务收尾工作,任务执行的状态等

任务队列:用于存放没有处理的任务。提供一种缓存机制

 需要实现runnable接口的对象

类的层次

  • Executor 最上层接口,之定义了executor
  • ExecutorService接口 继承了Executor接口,并扩展出Callable、future、关闭方法
  • ScheduledExecutorService接口  继承ExecutorService接口,扩展出定时任务执行方法
  • ThreadPoolExecutor 基础、标准的线程池实现
  • ScheduledThreadPoolExecutor 继承ThreadPoolExecutor  ,并实现ScheduledExecutorService接口相关的定时任务方法

ExecutorService 接口

定义出submit 方法和 invokeAll方法,包括invokeAny 方法;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

其中定义出的invokeAll 的意义就是提交任务列表,全部执行完毕,返回执行结果;或者定义好超时时间,自动返回结果

  <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

 invokeAny 的意义是执行一个成功,就返回结果。

在来看submit  的源代码。

  Future<?> submit(Runnable task);
  <T> Future<T> submit(Callable<T> task);

在其中的任务,包括callable和runnable;返回指定类型。  

class MyCallable implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        return 1;
    }
}


class MyRunnable implements Runnable{

    @Override
    public void run() {
        return ;
    }
}

在测试 submit方法

static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 5, TimeUnit.SECONDS,
			new LinkedBlockingQueue<Runnable>());

	public static void main(String args[]) throws ExecutionException, InterruptedException {
		// Runnable Test
		Future run_future = pool.submit(new MyRunnable());
		System.out.println("run_future: " + run_future.get());

		// Callable Test
		Future call_future = pool.submit(new MyCallable());
		System.out.println("call_future:" + call_future.get());

	}

得到得结果,虽然runnable 会返回future,但是是返回null。

run_future: null
call_future:1

除非使用重载得方法。预先设定好返回值;得到得结果就是设置的返回结果

// Runnable Test
		Future run_future = pool.submit(new MyRunnable(), 1);
		System.out.println("run_future: " + run_future.get());

提供的 在阻塞所有的任务的方法

/**

*阻塞,直到关闭后所有任务都已完成执行请求,或者发生超时,或者当前线程正在运行
*中断,以先发生者为准。
*/   
 boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

在来继续分析一下参数的意思

  • 第一个参数为核心线程数,表示默认会创建5个核心线程去处理任务
  • 第二个参数为最大线程数,表示当任务队列满了过后,会继续创建线程数为10个处理任务
  • 第三和第四个参数一起用的,超时时间,当大于核心线程空置超过5秒则销毁线程
  • 第五个参数,则是创建一个任务队列。
  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,
                10,
                5,      //超过核心线程数的线程,如果超过5s(keepAliveTime)还没有任务给他执行,这个线程就会被销毁
                TimeUnit.SECONDS,                       //keepAliveTime 的时间单位
                new LinkedBlockingQueue<Runnable>(5)     //传入无界的等待队列
        );
 /**
     * 测试: 提交15个执行时间需要3秒的任务,看线程池的状况
     *
     * @param threadPoolExecutor 传入不同的线程池,看不同的结果
     * @throws Exception
     */
    public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception {
        // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
        for (int i = 0; i < 30; i++) {
            int n = i;
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("任务" + n +" 开始执行");
                        Thread.sleep(3000L);
                        System.err.println("任务" + n +" 执行结束");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("任务" + i + " 提交成功");
        }

        while(true){
            // 查看线程数量,查看队列等待数量
            Thread.sleep(1000L);
            System.out.println(">>> 线程数量:" + threadPoolExecutor.getPoolSize());
            System.out.println(">>> 队列任务数量:" + threadPoolExecutor.getQueue().size());
        }

    }

这里得到测试的结果,引申出的线程池的原理

 只有当execute 或者submit 执行任务时,才去创建新线程;默认的拒绝的策略是抛异常

  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,
                10,             //最大线程数 10
                5,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(5),   //等待队列容量为3

                //最多容纳13个任务,超出的会被拒绝执行
                new RejectedExecutionHandler() {    //指定 任务拒绝策略
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.err.println("有任务被拒绝执行了");
                    }
                });

可以自己定义出拒绝策略,只需要添加第六个参数 ,重写拒绝策略

Executors类工具类

这个是jdk提供的封装好的线程池,在开发中,方便快捷,因此采用这个工具类

  • newFixedThreadPool(int nThreads) 创建一个固定大小、任务队列无界的线程池,核心线程等于最大线程数
 public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • newCachedThreadPool() 创建的是一个大小无界的缓冲线程池。它的任务队列是一个同步队列。任务加入到池中,如果 池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。池中的线程空闲超过60秒,将被销毁释放。线程数随任务的多少变化。适用于执行耗时较小的异步任务。池的核心线程数=0 ,最大线程数= Integer.MAX_VALUE
     
  public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
这里的SynchronousQueue 通过队列是不存任何元素的,同步队列,马上获取到数据。
这个队列为0,也就是说put方法, 是put不进去的,只有take方法,不断在读取的,take到,就可以put进去
  • newSingleThreadExecutor() 只有一个线程来执行无界任务队列的单一线程池。该线程池确保任务按加入的顺序一个一 个依次执行。当唯一的线程因任务异常中止时,将创建一个新的线程来继续执行后续的任务。与newFixedThreadPool(1) 的区别在于,单一线程池的池大小在newSingleThreadExecutor方法中硬编码,不能再改变的。
  public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • newScheduledThreadPool(int corePoolSize) 能定时执行任务的线程池。该池的核心线程数由参数指定,最大线程数= Integer.MAX_VALUE
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

线程数量

如何确定合适的线程数量

  计算型任务:cpu数量的1到2倍

   IO型任务:相对于计算型任务,需要多一些线程,要根据io阻塞时长考量决定。也可以根据需要最大和最小数据量自动增减。例如tomcat最大为200

线程池基本实现

任务仓库

 //1、需要一个任务仓库
    private BlockingQueue<Runnable> blockingQueue;


    //2、 集合容器,存放工作线程
    private List<Thread> workers;

    //3、普通线程要执行多个task,需要封装一下
    public static class Worker extends Thread{

        private FixedSizeThreadPool pool;

        public Worker(FixedSizeThreadPool pool){
            this.pool = pool;
        }

        @Override
        public void run() {
            while(this.pool.isWorking || this.pool.blockingQueue.size() > 0){
                Runnable task = null;

                try {
                    //如果没有任务,就阻塞等待任务
                    if (this.pool.isWorking)
                        task = this.pool.blockingQueue.take();
                    else
                        task = this.pool.blockingQueue.poll();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                if (task != null){
                    task.run();
                }
            }
        }
    }

其他的基本就是实现submit方法 和executor方法,都比较简单的,简易版本的线程池。

最后

本篇文章主要介绍的是高并发多线程中线程池,从应用和一些代码理解线程池的应用,平常的使用方式,之后我会更新ThreadPoolExecutor从源码分析线程池怎么工作的,希望有更深入的理解

以上是关于高并发多线程基础之线程池的主要内容,如果未能解决你的问题,请参考以下文章

Java——多线程高并发系列之线程池(Executor)的理解与使用

Java——多线程高并发系列之线程池(Executor)的理解与使用

高并发之——不得不说的线程池与ThreadPoolExecutor类浅析

Java高并发之线程池详解

Okhttp的线程池和高并发

高并发多线程之线程基础中生命周期线程封闭cpu缓存