Executor的常用方法
为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效地进行线程控制。它们都在java.util.concurrent包中,是JDK开发包的核心。其中有一个重要的类:Executors,他扮演这线程工厂的角色,我们通过Executors可以创建特定功能的线程池。
- newFixedThreadPool()方法,该方法返回一个固定数量的线程池,该方法的线程数始终不变,当有一个任务提交时,如线程池中有空闲,则立即执行,如没有,则会被暂缓在一个任务队列中等待有空闲的线程去执行。
- newSingleThreadExecutor()方法,创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
- newCachedThreadPool()方法,返回一个可以根据实际情况调整线程个数的线程池,不限制最大线程数量,若有空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动收回。
- newScheduleThreadPool()方法,返回一个ScheduledExecutorService对象,但该线程池可以指定线程的数量。
查看它们的源码:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { //这里super是ThreadPoolExecutor类 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); }
从源码看以看出它们都是由ThreadPoolExecutor构造出来的。
ThreadPoolExecutor构造方法概述
ThreadPoolExecutor(
int corePoolSize,//核心线程数,线程池刚初始化的时候实例化线程个数
int maximumPoolSize,//最大线程数
long keepLongTime,//空闲时间,过时回收
TimeUnit unit,//时间单位
BlockingQueue<Runable> worker,//线程暂缓处
ThreadFactory threadFactory,
RejectExecuteHandle handle//拒绝执行的方法
)
所以上面几个方法主要是根据不同的参数来执行不同的行为。其中newScheduleThreadPool方法稍复杂一点,它的方法值是ScheduledExecutorService。
ScheduledExecutorService主要有两个方法scheduleWithFixedDelay,scheduleAtFixedRate
1、scheduleAtFixedRate 方法,以固定的频率来执行某项计划(任务),即固定的频率来执行某项计划,它不受计划执行时间的影响。到时间就执行。 它不受计划执行时间的影响。
2、scheduleWithFixedDealy,相对固定的延迟后,执行某项计划(任务), 即无论某个任务执行多长时间,等执行完了,我再延迟指定的时间去执行。它受计划执行时间的影响。
class Temp extends Thread { public void run() { try { System.out.println( new Date().toString() + "..run start"); Thread.sleep(3000); System.out.println( new Date().toString() + "...run end"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } public class ScheduledJob { public static void main(String args[]) throws Exception { Temp command = new Temp(); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); System.out.println(new Date().toString() + "...start"); //scheduler.scheduleWithFixedDelay(command, 5, 2, TimeUnit.SECONDS); scheduler.scheduleAtFixedRate(command, 5, 2, TimeUnit.SECONDS); } }
scheduleAtFixedRate 的执行结果:run方法延迟了5秒执行,run方法执行了3秒,已经超过了我们指定的间隔2秒,所以它在第一个run方法执行完后,立即执行了第二个run方法
Tue Feb 06 15:48:01 CST 2018...start Tue Feb 06 15:48:06 CST 2018..run start Tue Feb 06 15:48:09 CST 2018...run end Tue Feb 06 15:48:09 CST 2018..run start Tue Feb 06 15:48:12 CST 2018...run end Tue Feb 06 15:48:12 CST 2018..run start
scheduleWithFixedDealy的执行结果:run方法延迟了5秒执行,run方法执行了3秒,已经超过了我们指定的间隔2秒,但是它还是等了2秒后,再执行了第二个run方法
Tue Feb 06 15:53:04 CST 2018...start Tue Feb 06 15:53:10 CST 2018..run start Tue Feb 06 15:53:13 CST 2018...run end Tue Feb 06 15:53:15 CST 2018..run start Tue Feb 06 15:53:18 CST 2018...run end Tue Feb 06 15:53:20 CST 2018..run start Tue Feb 06 15:53:23 CST 2018...run end
自定义线程池
如果Executors工厂类无法满足我们的需求,可以自己去创建线程池。在自己创建线程池时,这个构造方法对于队列是什么类型比较关键:
使用有届队列(ArrayBlockingQueue):若有新的任务需要执行时,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务加入队列中等待执行,若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建行的线程,诺线程数大于maximumPoolSize,则执行拒绝策略。
使用无界队列(LinkedBlockingQueue):除非系统资源耗尽,否则无界的任务队列不存在入队失败的情况。当有新任务来时,系统的线程数小于corePoolSize时,则新建线程执行任务,当达到corePoolSize后,就不会继续增加,若后续还有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大时,无界队列会保持快速增长,直到耗尽系统内存。注意:maximumPoolSize在无界队列时没有作用
JDK拒绝策略:
AbortPolicy:直接抛出异常组织系统正常工作 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务
DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务
DiscardPolicy:丢弃无法处理的任务,不做任何处理。
如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口
有届队列的例子
public class UseThreadPoolExecutor1 { public static void main(String[] args) { /** * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程, * 若大于corePoolSize,则会将任务加入队列, * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。 * */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //MaxSize 60, //60 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3) //指定一种队列 (有界队列) //new LinkedBlockingQueue<Runnable>() // , new MyRejected() //, new DiscardOldestPolicy() ); MyTask mt1 = new MyTask(1, "任务1"); MyTask mt2 = new MyTask(2, "任务2"); MyTask mt3 = new MyTask(3, "任务3"); MyTask mt4 = new MyTask(4, "任务4"); MyTask mt5 = new MyTask(5, "任务5"); MyTask mt6 = new MyTask(6, "任务6"); pool.execute(mt1); pool.execute(mt2); pool.execute(mt3); pool.execute(mt4); pool.execute(mt5); pool.execute(mt6); pool.shutdown(); } }
只执行前四个任务(把任务5,6先不执行)
run taskId =1 run taskId =2 run taskId =3 run taskId =4
它是一个一个执行的,原因是第一个线程来的时候,立即执行了(coreSize=1),当后面三个线程来的时候会被加入到queue中(new ArrayBlockingQueue<Runnable>(3))
只执行前五个任务
run taskId =1 run taskId =5 run taskId =2 run taskId =3 run taskId =4
它是两个两个执行的,原因是第五个任务来的时候已经超出了queue的size(3),但是总线程数有小于MaxSize(2),所以任务5和任务1一起执行了,之后会从queue中拿两个再执行
执行留个任务
run taskId =1 Exception in thread "main" run taskId =5 java.util.concurrent.RejectedExecutionException: Task 6 rejected from [email protected][Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at com.bjsxt.height.concurrent018.UseThreadPoolExecutor1.main(UseThreadPoolExecutor1.java:46) run taskId =2 run taskId =3 run taskId =4
任务6来的时候,queue已经满了,总线程数也满了,这是会执行拒绝策略,JDK默认的是AbortPolicy,直接抛出异常,我们也可以改,就想代码里注释掉的一样
无界队列的例子
public class UseThreadPoolExecutor2 implements Runnable{ private static AtomicInteger count = new AtomicInteger(0); @Override public void run() { try { int temp = count.incrementAndGet(); System.out.println("任务" + temp); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception{ //System.out.println(Runtime.getRuntime().availableProcessors()); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); ExecutorService executor = new ThreadPoolExecutor( 5, //core 10, //max 120L, //2fenzhong TimeUnit.SECONDS, queue); for(int i = 0 ; i < 20; i++){ executor.execute(new UseThreadPoolExecutor2()); } Thread.sleep(1000); System.out.println("queue size:" + queue.size()); //10 Thread.sleep(2000); } }
执行结果:
任务1 任务2 任务3 任务4 任务5 queue size:15 任务6 任务7 任务9 任务8 任务10 任务11 任务12 任务13 任务14 任务15 任务16 任务17 任务19 任务20 任务18
任务是5个5个执行的,而且把剩余的任务全部加到queue中。第二个参数没有作用,不会创建10个线程,如果是有届队列,就会起作用。
Task的代码:
public class MyTask implements Runnable { private int taskId; private String taskName; public MyTask(int taskId, String taskName){ this.taskId = taskId; this.taskName = taskName; } public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } @Override public void run() { try { System.out.println("run taskId =" + this.taskId); Thread.sleep(5*1000); //System.out.println("end taskId =" + this.taskId); } catch (InterruptedException e) { e.printStackTrace(); } } public String toString(){ return Integer.toString(this.taskId); } }
MyRejected的代码:
public class MyRejected implements RejectedExecutionHandler{ public MyRejected(){ } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("自定义处理.."); System.out.println("当前被拒绝任务为:" + r.toString()); } }