Java 线程池使用详解

Posted XeonYu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 线程池使用详解相关的知识,希望对你有一定的参考价值。

上一篇:

Java中线程安全的集合

线程池

线程池也是JUC包中提供的一个类,为啥要有线程池呢。

举一个例子:
程序运行期间,会有100个任务同时要执行,按照之前的写法那我们就要创建100个线程,运行完毕后,线程销毁。
再来500个任务,那我们就再创建500个线程,执行完毕后销毁。
先不考虑我们的设备能不能同时创建这个多线程,单单就是用完就销毁,来新任务了就再创建这个动作就是比较浪费资源的,所以,我们就可以用线程池解决这个问题。

线程池就是预先创建好一些线程,有任务需要执行的话,就从池里拿创建好的线程去执行,执行完毕后,线程不会销毁,而是归还到给线程池,下一个任务来了,再接着用。

使用线程池主要有以下好处

  • 线程复用,降低资源消耗
  • 提高程序响应速度
  • 方便管理,可以控制最大并发数

ThreadPoolExecutor

首先我们来看看ThreadPoolExecutor这个类。

ThreadPoolExecutor类是最基础的创建线程池的类。
我们需要重点关注它构造方法里的7个参数

参数意义
int corePoolSize线程池的核心线程数量,即使没有任务执行,池里的线程数量也会保持这个数量,除非设置了 allowCoreThreadTimeOut
int maximumPoolSizemaximum翻译就是最大值的意思,表示线程池允许的最大线程数
long keepAliveTime当池中的线程数量大于核心线程数量时,多余线程在终止前等待任务的最长时间,如果在指定时间内没有可执行的任务,这些空闲线程会被销毁,只保留核心线程数量个线程。
TimeUnit unit注意是TimeUnit 类型,配合keepAliveTime参数一起使用,多余空闲线程等待任务最长时间的单位
BlockingQueue workQueue保存任务的队列,一般用ArrayBlockingQueueLinkedBlockingQueue。当任务数量超过核心线程数时,任务会优先进入队列中等待。
ThreadFactory threadFactory用来创建新线程的工厂类
RejectedExecutionHandler handler拒绝策略。当任务数量大于最大线程数+队列容量时,会调用该方法处理无法被执行的任务。

以上7个参数比较重要,一定要知道各个参数的意义。

下面来简单创建个线程池看一看:

    val threadPoolExecutor = ThreadPoolExecutor(
        3,//核心线程数
        5,//允许的最大线程数
        500,//多余空闲线程的存活时间
        TimeUnit.MILLISECONDS,//存活时间单位
        LinkedBlockingQueue(3),//阻塞队列(任务等待区)
        Executors.defaultThreadFactory(),//创建线程的工厂类
        ThreadPoolExecutor.AbortPolicy()//拒绝策略
    )


如上,我们就创建好了一个核心线程数为3,最大线程数为5,队列容量为3的一个线程池。

CPU密集型和IO密集型 最大线程数 的小建议

我们要执行的任务一般分为CPU密集型和IO密集型

  • CPU 密集型一般指大量的计算任务,此时,我们可以将设备CPU的核数作为最大线程数量。
    /*获取当前设备CPU的核数*/
    val availableProcessors = Runtime.getRuntime().availableProcessors()
    println("availableProcessors = ${availableProcessors}")

    val threadPoolExecutor = ThreadPoolExecutor(
        2,
        availableProcessors,
        200,
        TimeUnit.MILLISECONDS,
        ArrayBlockingQueue(5),
        Executors.defaultThreadFactory(),
        ThreadPoolExecutor.AbortPolicy()
    )

  • IO 密集型一般指跟内存或磁盘交互较多的任务,比如网络请求,读写文件等操作。这种类型的任务一般阻塞时间会长一些,我们可以将最大线程数量设置的多一点,比如cpu核数的两倍,以便于出现cpu密集型的任务可以得到执行。
    /*获取当前设备CPU的核数*/
    val availableProcessors = Runtime.getRuntime().availableProcessors()
    println("availableProcessors = ${availableProcessors}")

    val threadPoolExecutor = ThreadPoolExecutor(
        3,
        availableProcessors*2,
        200,
        TimeUnit.MILLISECONDS,
        ArrayBlockingQueue(5),
        Executors.defaultThreadFactory(),
        ThreadPoolExecutor.AbortPolicy()
    )


线程池常用的方法

execute(Runnable command)

告诉线程池去执行给定的任务,任务为Runnable类型。

简单用一用:

fun main() {
    /*创建线程池*/
    val threadPoolExecutor = ThreadPoolExecutor(
        3,//核心线程数
        5,//允许的最大线程数
        500,//多余空闲线程的存活时间
        TimeUnit.MILLISECONDS,//存活时间单位
        LinkedBlockingQueue(3),//阻塞队列(任务等待区)
        Executors.defaultThreadFactory(),//创建线程的工厂类
        ThreadPoolExecutor.AbortPolicy()//拒绝策略
    )
    for (i in 1..5) {
        /*执行任务*/
        println("提交任务给线程池:${i}")
        threadPoolExecutor.execute {
            /*打印线程的名称*/
            TimeUnit.MILLISECONDS.sleep(100)
            println("${Thread.currentThread().name}执行了==>$i")
        }
    }
    println("main执行了")

}

看下运行效果:

可以看到,我们给线程池提交了一个任务,让线程池去执行,线程池也确实使用了一个线程执行任务了。
但是任务执行完毕后,发现程序并没有结束,一直处于运行状态,原因是线程池中有核心线程存活,等待执行任务。
也就是说,当核心线程数量大于0时,线程池在任务执行完毕后不会自动关闭,因为线程池不知道你后面什么时候是否还有任务需要执行,所以我们如果在局部方法中使用线程池的话,需要手动关闭线程池,以免出现内存泄露的情况。

如何关闭后面再说,先来看看线程池中什么时候会触发阻塞队列最大线程数以及拒绝策略

还是上面的代码,我们加一个队列任务数量的打印,把任务数量慢慢调大看一下运行效果:

fun main() {
    /*创建线程池*/
    val threadPoolExecutor = ThreadPoolExecutor(
        3,//核心线程数
        5,//允许的最大线程数
        500,//多余空闲线程的存活时间
        TimeUnit.MILLISECONDS,//存活时间单位
        LinkedBlockingQueue(3),//阻塞队列(任务等待区)
        Executors.defaultThreadFactory(),//创建线程的工厂类
        ThreadPoolExecutor.AbortPolicy()//拒绝策略
    )
    try {
        for (i in 1..7) {
            /*执行任务*/
            println("提交任务给线程池:${i}")
            threadPoolExecutor.execute {
                /*打印线程的名称*/
                TimeUnit.MILLISECONDS.sleep(100)
                println("${Thread.currentThread().name}执行了==>$i")
            }
        }

        /*打印队列等待任务的数量*/
        println("threadPoolExecutor.queue.size = ${threadPoolExecutor.queue.size}")

    } catch (e: Exception) {
        e.printStackTrace()
    } finally {

    }

    println("main执行了")

}

运行效果如下:

可以看到,会出现以下四种情况

任务数量执行情况
小于等于corePoolSize任务直接被线程池中已存在的核心线程执行
大于corePoolSize且小于等于 corePoolSize+队列容量无法被及时执行的任务会先进入队列等待,然后等核心线程空闲了被执行
大于 corePoolSize+队列容量且小于等于maximumPoolSize+队列容量其中一部分任务会在队列中等待,队列放不下的任务会创建新的线程去执行。这些新创建的线程也叫做非核心线程,任务执行完毕后,在指定存活时间内没有新任务要执行的话,就会被销毁。
大于maximumPoolSize+队列容量线程池没有办法执行该任务了,就会执行拒绝策略的方法。

以上不同任务数量的执行情况一定要搞清楚。


shutdown

上面我们说到了核心线程数量大于0的线程池不会自动关闭, 需要调用shutdown方法来关闭线程池。
来试一下:

fun main() {
    /*创建线程池*/
    val threadPoolExecutor = ThreadPoolExecutor(
        3,//核心线程数
        5,//允许的最大线程数
        500,//多余空闲线程的存活时间
        TimeUnit.MILLISECONDS,//存活时间单位
        LinkedBlockingQueue(3),//阻塞队列(任务等待区)
        Executors.defaultThreadFactory(),//创建线程的工厂类
        ThreadPoolExecutor.AbortPolicy()//拒绝策略
    )
    try {
        for (i in 1..5) {
            /*执行任务*/
            println("提交任务给线程池:${i}")
            threadPoolExecutor.execute {
                /*打印线程的名称*/
                TimeUnit.MILLISECONDS.sleep(100)
                println("${Thread.currentThread().name}执行了==>$i")
            }
        }

    } catch (e: Exception) {
        e.printStackTrace()
    } finally {
        threadPoolExecutor.shutdown()
        println("执行了关闭线程池的方法")
    }

    println("main执行了")

}

运行效果:

可以看到,程序正常结束了。
需要注意的是,调用shutdown方法并不会立刻导致线程池被关闭且销毁,而是线程池不会再接收新的任务,等线程池之前正在执行的所有任务完毕之后,才会被关闭并销毁。

另外一种操作是把核心线程数量指定为0,这样的话线程池在执行完任务后就会自动关闭线程池了。


获取线程池的执行结果 (submit、invokeAny、invokeAll)

上面的execute方法适用于没有返回值的任务,如果我们需要拿到任务的执行结果,可以用下面几个方法

方法返回值
<T> Future<T> submit(Callable<T> task)将一个任务提交给线程池执行,返回Future
<T> T invokeAny(Collection<? extends Callable<T>> tasks)提交一个任务集合给线程池之执行,当其中任意一个任务完成后就返回该任务的执行结果,并取消其他任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)提交一个任务集合给线程池之执行,使每个任务都运行,返回所有任务执行的结果

下面我们来简单用一用:

submit

fun main() {
    val threadPoolExecutor = ThreadPoolExecutor(
        2,
        5,
        200,
        TimeUnit.MILLISECONDS,
        ArrayBlockingQueue(5),
        Executors.defaultThreadFactory(),
        ThreadPoolExecutor.AbortPolicy()
    )


    val futureTask = threadPoolExecutor.submit<String> {
        TimeUnit.SECONDS.sleep(1)
        "这是返回结果"
    }

    println(futureTask.get())//这里会阻塞住后面的代码
    println("main")

}

运行效果:

invokeAny

fun main() {
    val threadPoolExecutor = ThreadPoolExecutor(
        2,
        5,
        200,
        TimeUnit.MILLISECONDS,
        ArrayBlockingQueue(5),
        Executors.defaultThreadFactory(),
        ThreadPoolExecutor.AbortPolicy()
    )


    /*任务集合*/
    val arrayListOfCallable = arrayListOf<Callable<String>>(
        Callable {
            TimeUnit.MILLISECONDS.sleep(100)
            "返回结果1"
        },
        Callable {
            TimeUnit.MILLISECONDS.sleep(60)
            "返回结果2"
        },
        Callable {
            TimeUnit.MILLISECONDS.sleep(80)
            "返回结果3"
        },
    )


    try {

        val invokeAnyResult = threadPoolExecutor.invokeAny(arrayListOfCallable)
        println(invokeAnyResult)//这里会阻塞住后面的代码
    } catch (e: Exception) {
        e.printStackTrace()
    } finally {
        threadPoolExecutor.shutdown()
    }
    println("main")

}

invokeAll

fun main() {
    val threadPoolExecutor = ThreadPoolExecutor(
        2,
        5,
        200,
        TimeUnit.MILLISECONDS,
        ArrayBlockingQueue(5),
        Executors.defaultThreadFactory(),
        ThreadPoolExecutor.AbortPolicy()
    )


    /*任务集合*/
    val arrayListOfCallable = arrayListOf<Callable<String>>(
        Callable {
            TimeUnit.MILLISECONDS.sleep(100)
            "返回结果1"
        },
        Callable {
            TimeUnit.MILLISECONDS.sleep(60)

            1 / 0
            "返回结果2"
        },
        Callable {
            TimeUnit.MILLISECONDS.sleep(80)
            "返回结果3"
        },
    )


    try {

        val invokeAllResult = threadPoolExecutor.invokeAll(arrayListOfCallable)

        println("invokeAllResult = ${invokeAllResult}")
        invokeAllResult.forEach {
            try {
                println(it.get())
            } catch (e: Exception) {
                println("异常了")
            } finally {
            }
        }

    } catch (e: Exception) {
        e.printStackTrace()
    } finally {
        threadPoolExecutor.shutdown()
    }
}

如上所示,在开发过程中,我们可以根据我们的需求去获取线程池执行的返回结果。


RejectedExecutionHandler

上面我们说到,ThreadPoolExecutor 中最后一个参数是拒绝策略,当任务无法被线程池执行时,就会走拒绝策略的代码。

ThreadPoolExecutor 中默认的拒绝策略如下,是RejectedExecutionHandler 接口类型的

该策略在拒绝任务时抛出 RejectedExecutionException异常

我们来看看RejectedExecutionHandler总共有几种实现类。

可以看到,共有四种

拒绝策略效果
AbortPolicy抛出异常
CallerRunsPolicy哪个线程调excute,就交给哪个线程处理
DiscardPolicy不会抛出异常,直接丢弃任务
DiscardOldestPolicy尝试去和最早的任务竞争获取执行机会,如果没有获取到执行机会,就被丢弃掉

一般来讲我们都是用默认的 AbortPolicy


Executors

除了最开始的通过 ThreadPoolExecutor 的构造方法来创建线程池之外,我们还可以通过Executors 类中提供好的方法来获取线程池。

如下:

方法描述
newSingleThreadExecutor()创建一个只有一个线程的线程池,核心数和最大线程数都是1
newFixedThreadPool(int nThreads)创建一个指定数量的线程池,核心数和最大线程数一致
newCachedThreadPool()创建一个核心数量为0,最大线程数量为Integer.MAX_VALUE的线程池,空闲线程存活时间为60秒 。理论上来讲,如果60秒后没有任务需要执行了,就会自动关闭该线程池

首先要明确一点的就是,以上三个方法底层都是通过调用ThreadPoolExecutor的构造方法来创建的线程池,无非是参数不同而已。
newFixedThreadPool 源码为例,代码如下:

日常开发中,不建议用以上三种方法创建线程池

原因如下:

  • newSingleThreadExecutor和newFixedThreadPool中队列长度为Integer.MAX_VALUE 数量太大,有可能出现oom

  • newCachedThreadPool中最大线程数量的值也是 Integer.MAX_VALUE,也有可能出现oom

所以,我们还是要通过 ThreadPoolExecutor 的构造来创建线程池。

下一篇:

了解 JVM和JVM内存结构(JVM运行时数据区)


如果你觉得本文对你有帮助,麻烦动动手指顶一下,可以帮助到更多的开发者,如果文中有什么错误的地方,还望指正,转载请注明转自喻志强的博客 ,谢谢!

以上是关于Java 线程池使用详解的主要内容,如果未能解决你的问题,请参考以下文章

Java 线程池详解

Java线程池详解

Java——线程池

Java线程池详解

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段

Java自带线程池和队列详解