java并发-线程池总结
Posted 河北凝讯科技订阅号
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发-线程池总结相关的知识,希望对你有一定的参考价值。
1.开篇明志
本文中,聊聊java线程池的知识点。
2.线程池是什么? 目的是什么?
线程池
一组线程实时处理休眠状态,等待唤醒执行。
目的
其一、减少在创建和销毁线程上所花的时间以及系统资源的开销 。
其二、将当前任务与主线程隔离,能实现和主线程的异步执行,特别是很多可以分开重复执行的任务。但是,一味的开线程也不一定能带来性能上的,线池休眠也是要占用一定的内存空间,所以合理的选择线程池的大小也是有一定的依据。
3.Exectors API
Java类库提供了4个静态方法来创建一个线程池:
newFixedThreadPool
创建一个固定长度的线程池,当到达线程最大数量时,线程池的规模将不再变化。newCachedThreadPool
创建一个可缓存的线程池,如果当前线程池的规模超出了处理需求,将回收空的线程;当需求增加时,会增加线程数量;线程池规模无限制。newSingleThreadPoolExecutor
创建一个单线程的Executor,确保任务对了,串行执行newScheduledThreadPool
创建一个固定长度的线程池,而且以延迟或者定时的方式来执行,类似Timer;
API总结
在线程池中执行任务比为每个任务分配一个线程优势更多,通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊线程创建和销毁产生的巨大的开销。当请求到达时,通常工作线程已经存在,提高了响应性;通过配置线程池的大小,可以创建足够多的线程使CPU达到忙碌状态,还可以防止线程太多耗尽计算机的资源。
4. ThreadPoolExecutor
java.uitl.concurrent.ThreadPoolExecutor
类是线程池中最核心的一个类, 下面来一起看看:
四个构造函数
public class ThreadPoolExecutor extends AbstractExecutorService {
//............
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue){...}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory){...}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler){...}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
//............
}
从上面的代码可以得知,ThreadPoolExecutor
继承了AbstractExecutorService
类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作, 前三个在实现方法里都选择性使用了一些默认参数的实现。
下面解释下一下构造器中各个参数的含义:
corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建
corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了
allowCoreThreadTimeOut(boolean)
方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue
和PriorityBlockingQueue
使用较少,一般使用LinkedBlockingQueue
和Synchronous
。
线程池的排队策略与BlockingQueue
有关。
threadFactory:线程工厂,主要用来创建线程;
handler:拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
从上面给出的ThreadPoolExecutor
类的代码可以知道,ThreadPoolExecutor
继承了AbstractExecutorService
,接着来看一下AbstractExecutorService
的实现
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) { };
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException { };
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { };
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException { };
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { };
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException { }; }
AbstractExecutorService
是一个抽象类,它实现了ExecutorService
接口。
下面再着看ExecutorService
接口的实现:
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService
又是继承了Executor
接口,我们看一下Executor
接口的实现:
public interface Executor {
void execute(Runnable command); }
现在知道了 ThreadPoolExecutor
、AbstractExecutorService
、ExecutorService
和Executor
几个之间的关系了。
Executor
是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
ThreadPoolExecutor继承了类AbstractExecutorService。
在ThreadPoolExecutor类中有几个非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit() 方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了 还有很多其他的方法:
shutdown()和shutdownNow()是用来关闭线程池的。
还有很多其他的方法,比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法
5. 创建线程池基本方法
定义线程类
class Handler implements Runnable{
}
建立ExecutorService线程池
ExecutorService executorService = Executors.newCachedThreadPool();
OR
//获取当前系统的CPU 数目
int cpuNums = Runtime.getRuntime().availableProcessors();
//ExecutorService通常根据系统资源情况灵活定义线程池大小
ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE);
调用线程池操作
循环操作,成为daemon,把新实例放入Executor池中
while(true){
executorService.execute(new Handler(socket));
// class Handler implements Runnable{
或者
executorService.execute(createTask(i));
//private static Runnable createTask(final int taskID)
}
execute(Runnable对象)方法其实就是对Runnable对象调用start()方法(当然还有一些其他后台动作,比如队列,优先级,IDLE timeout,active激活等)
6.几种不同的ExecutorService线程池对象
newCachedThreadPool()
缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中
缓存型池子通常用于执行一些生存期很短的异步型任务
因此在一些面向连接的daemon型SERVER中用得不多。newFixedThreadPool()
newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程
其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子
ScheduledThreadPool()
调度型线程池
这个池子里的线程可以按schedule依次delay执行,或周期执行
SingleThreadExecutor
单例线程,任意时间池中只能有一个线程
用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)
应用实例
CachedThreadPool 实例
CachedThreadPool
首先会按照需要创建足够多的线程来执行任务(Task)。随着程序执行的过程,有的线程执行完了任务,可以被重新循环使用时,才不再创建新的线程来执行任务。
客户端线程和线程池之间会有一个任务队列。当程序要关闭时,你需要注意两件事情:
1.入队的这些任务的情况怎么样了
2.正在运行的这个任务执行得如 何了。
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** * 功能概要:缓冲线程池实例-execute运行 */class Handle implements Runnable {
private String name;
public Handle(String name) {
this.name = "thread"+name;
}
@Override
public void run() {
System.out.println( name +" Start. Time = "+new Date());
processCommand();
System.out.println( name +" End. Time = "+new Date());
} private void processCommand() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} @Override
public String toString(){
return this.name;
}
}
运行测试CachedThreadPool
public static void testCachedThreadPool() {
System.out.println("Main: Starting at: "+ new Date());
//创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
ExecutorService exec = Executors.newCachedThreadPool();
for(int i = 0; i < 10; i++) {
exec.execute(new Handle(String.valueOf(i)));
}
//执行到此处并不会马上关闭线程池,但之后不能再往线程池中加线程,否则会报错
exec.shutdown();
System.out.println("Main: Finished all threads at"+ new Date());
}
运行效果:
从上面的结果可以看出:
1、主线程的执行与线程池里的线程分开,有可能主线程结束了,但是线程池还在运行
2、放入线程池的线程并不一定会按其放入的先后而顺序执行
FixedThreadPool 实例
FixedThreadPool模式会使用一个优先固定数目的线程来处理若干数目的任务。
规定数目的线程处理所有任务,一旦有线程处理完了任务就会被用来处理新的任务(如果有的话)。
这种模式与上面的CachedThreadPool是不同的,CachedThreadPool模式下处理一定数量的任务的线程数目是不确定的。而FixedThreadPool模式下最多 的线程数目是一定的。
应用实例:
public static void testFixThreadPool() {
System.out.println("Main Thread: Starting at: "+ new Date());
ExecutorService exec = Executors.newFixedThreadPool(5);
for(int i = 0; i < 10; i++) {
exec.execute(new Handle(String.valueOf(i)));
}
exec.shutdown(); //执行到此处并不会马上关闭线程池
System.out.println("Main Thread: Finished at:"+ new Date());
}
运行效果
:
上面创建了一个固定大小的线程池,大小为5.也就说同一时刻最多只有5个线程能运行。并且线程执行完成后就从线程池中移出。它也不能保证放入的线程能按顺序执行。这要看在等待运行的线程的竞争状态了。
newSingleThreadExecutor 实例
创建只能运行一条线程的线程池。它能保证线程的先后顺序执行,并且能保证一条线程执行完成后才开启另一条新的线程
public static void testSingleThreadPool() {
System.out.println("Main Thread: Starting at: "+ new Date());
ExecutorService exec = Executors.newSingleThreadExecutor(); //创建大小为1的固定线程池
for(int i = 0; i < 10; i++) {
exec.execute(new Handle(String.valueOf(i)));
}
exec.shutdown(); //执行到此处并不会马上关闭线程池
System.out.println("Main Thread: Finished at:"+ new Date());
}
运行效果:
等价于:
ExecutorService exec = Executors.newFixedThreadPool(1);
newScheduledThreadPool 实例
这是一个计划线程池类,它能设置线程执行的先后间隔及执行时间等,功能比上面的三个强大了一些。
实现每个放入的线程延迟10秒执行。
public static void testScheduledThreadPool() {
System.out.println("Main Thread: Starting at: "+ new Date());
ScheduledThreadPoolExecutor exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10); //创建大小为10的线程池
for(int i = 0; i < 10; i++) {
exec.schedule(new Handle(String.valueOf(i)), 10, TimeUnit.SECONDS);//延迟10秒执行
}
exec.shutdown(); //执行到此处并不会马上关闭线程池
while(!exec.isTerminated()){
//wait for all tasks to finish
}
System.out.println("Main Thread: Finished at:"+ new Date());
}
运行效果:
ScheduledThreadPoolExecutor的定时方法主要有以下四种:
下面将主要来具体讲讲scheduleAtFixedRate和scheduleWithFixedDelay
scheduleAtFixedRate 按指定频率周期执行某个任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); //command:执行线程
//initialDelay:初始化延时
//period:两次开始执行最小间隔时间
//unit:计时单位
scheduleWithFixedDelay 周期定时执行某个任务/按指定频率间隔执行某个任务(注意)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); //command:执行线程
//initialDelay:初始化延时
//period:前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)
//unit:计时单位
实例
class MyHandle implements Runnable {
@Override
public void run() {
System.out.println(System.currentTimeMillis());
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
按指定频率周期执行某个任务
下面实现每隔2秒执行一次,注意,如果上次的线程还没有执行完成,那么会阻塞下一个线程的执行。
即使线程池设置得足够大。
/** * 初始化延迟0ms开始执行,每隔2000ms重新执行一次任务 */ public static void executeFixedRate() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
executor.scheduleAtFixedRate(
new MyHandle(),
0,
2000,
TimeUnit.MILLISECONDS);
}
间隔指的是连续两次任务开始执行的间隔。对于scheduleAtFixedRate方法,当执行任务的时间大于我们指定的间隔时间时,它并不会在指定间隔时开辟一个新的线程并发执行这个任务。而是等待该线程执行完毕。
按指定频率间隔执行某个任务
/** * 以固定延迟时间进行执行 * 本次任务执行完成后,需要延迟设定的延迟时间,才会执行新的任务 */
public static void executeFixedDelay() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
executor.scheduleWithFixedDelay(
new MyHandle(),
0,
2000,
TimeUnit.MILLISECONDS);
}
间隔指的是连续上次执行完成和下次开始执行之间的间隔。
周期定时执行某个任务
周期性的执行一个任务,可以使用下面方法设定每天在固定时间执行一次任务。
/** * 每天晚上9点执行一次 * 每天定时安排任务进行执行 */
public static void executeEightAtNightPerDay() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
long oneDay = 24 * 60 * 60 * 1000;
long initDelay = getTimeMillis("21:00:00") - System.currentTimeMillis();
initDelay = initDelay > 0 ? initDelay : oneDay + initDelay;
executor.scheduleAtFixedRate(
new MyHandle(),
initDelay,
oneDay,
TimeUnit.MILLISECONDS);
}
/** * 获取指定时间对应的毫秒数 * @param time "HH:mm:ss" * @return */
private static long getTimeMillis(String time) {
try {
DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");
DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");
Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);
return curDate.getTime();
} catch (ParseException e) {
e.printStackTrace();
}
return 0;
}
以上是关于java并发-线程池总结的主要内容,如果未能解决你的问题,请参考以下文章