java并发-线程池总结

Posted 河北凝讯科技订阅号

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java并发-线程池总结相关的知识,希望对你有一定的参考价值。

1.开篇明志

本文中,聊聊java线程池的知识点。

2.线程池是什么? 目的是什么?

线程池

一组线程实时处理休眠状态,等待唤醒执行。

目的

  • 其一、减少在创建和销毁线程上所花的时间以及系统资源的开销 。

  • 其二、将当前任务与主线程隔离,能实现和主线程的异步执行,特别是很多可以分开重复执行的任务。但是,一味的开线程也不一定能带来性能上的,线池休眠也是要占用一定的内存空间,所以合理的选择线程池的大小也是有一定的依据。

3.Exectors API

Java类库提供了4个静态方法来创建一个线程池:

  • newFixedThreadPool 创建一个固定长度的线程池,当到达线程最大数量时,线程池的规模将不再变化。

  • newCachedThreadPool 创建一个可缓存的线程池,如果当前线程池的规模超出了处理需求,将回收空的线程;当需求增加时,会增加线程数量;线程池规模无限制。

  • newSingleThreadPoolExecutor 创建一个单线程的Executor,确保任务对了,串行执行

  • newScheduledThreadPool 创建一个固定长度的线程池,而且以延迟或者定时的方式来执行,类似Timer;


API总结

在线程池中执行任务比为每个任务分配一个线程优势更多,通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊线程创建和销毁产生的巨大的开销。当请求到达时,通常工作线程已经存在,提高了响应性;通过配置线程池的大小,可以创建足够多的线程使CPU达到忙碌状态,还可以防止线程太多耗尽计算机的资源。

4. ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 类是线程池中最核心的一个类, 下面来一起看看:

  • 四个构造函数

public class ThreadPoolExecutor extends AbstractExecutorService {
//............  public ThreadPoolExecutor(int corePoolSize,
                     int maximumPoolSize,                              
                     long keepAliveTime,                      TimeUnit unit,                      BlockingQueue<Runnable> workQueue){...}
  public ThreadPoolExecutor(int corePoolSize,                              
                      int maximumPoolSize,                              
                      long keepAliveTime,                      TimeUnit unit,                      BlockingQueue<Runnable> workQueue,                      ThreadFactory threadFactory){...}
  public ThreadPoolExecutor(int corePoolSize,                              
                      int maximumPoolSize,                              
                      long keepAliveTime,                      TimeUnit unit,                      BlockingQueue<Runnable> workQueue,                      RejectedExecutionHandler handler){...}
   public ThreadPoolExecutor(int corePoolSize,                              
                      int maximumPoolSize,                              
                      long keepAliveTime,                      TimeUnit unit,                      BlockingQueue<Runnable> workQueue,                      ThreadFactory threadFactory,                       RejectedExecutionHandler handler) {...}
//............
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作, 前三个在实现方法里都选择性使用了一些默认参数的实现。

下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建

    corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了 allowCoreThreadTimeOut(boolean) 方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

ArrayBlockingQueuePriorityBlockingQueue使用较少,一般使用LinkedBlockingQueueSynchronous

线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;

  • handler:拒绝处理任务时的策略,有以下四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,接着来看一下AbstractExecutorService的实现

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    };    
     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {    };    
     public Future<?> submit(Runnable task) {    };    
     public <T> Future<T> submit(Runnable task, T result) {    };    
     public <T> Future<T> submit(Callable<T> task) {    };    
     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)            
               throws InterruptedException, ExecutionException, TimeoutException {    };    
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {    };    
     public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)            
              throws InterruptedException, ExecutionException, TimeoutException {    };    
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {    };    
     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)            
             throws InterruptedException {    }; }


AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。 
下面再着看ExecutorService接口的实现:

public interface ExecutorService extends Executor {

    void shutdown();    
     boolean isShutdown();  
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;    <T> Future<T> submit(Callable<T> task);    <T> Future<T> submit(Runnable task, T result);    Future<?> submit(Runnable task);    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)            
            throws InterruptedException;    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)            
            throws InterruptedException, ExecutionException, TimeoutException; }            

ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

public interface Executor {
   void execute(Runnable command); }


现在知道了 ThreadPoolExecutorAbstractExecutorServiceExecutorServiceExecutor几个之间的关系了。

  • Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  • ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  • 抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  • ThreadPoolExecutor继承了类AbstractExecutorService。

ThreadPoolExecutor类中有几个非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

submit() 方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了 还有很多其他的方法:

shutdown()和shutdownNow()是用来关闭线程池的。

还有很多其他的方法,比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法  

5. 创建线程池基本方法

  • 定义线程类

class Handler implements Runnable{  
}  
  • 建立ExecutorService线程池

ExecutorService executorService = Executors.newCachedThreadPool(); 
 
   
   
 

OR

//获取当前系统的CPU 数目  
int cpuNums = Runtime.getRuntime().availableProcessors(); //ExecutorService通常根据系统资源情况灵活定义线程池大小
ExecutorService executorService =Executors.newFixedThreadPool(cpuNums * POOL_SIZE);  
  • 调用线程池操作 
    循环操作,成为daemon,把新实例放入Executor池中

while(true){  
  executorService.execute(new Handler(socket));   
     // class Handler implements Runnable{  
  或者  
  executorService.execute(createTask(i));  
      //private static Runnable createTask(final int taskID)  
}  

execute(Runnable对象)方法其实就是对Runnable对象调用start()方法(当然还有一些其他后台动作,比如队列,优先级,IDLE timeout,active激活等)


6.几种不同的ExecutorService线程池对象

  • newCachedThreadPool()

    • 缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中

    • 缓存型池子通常用于执行一些生存期很短的异步型任务 
      因此在一些面向连接的daemon型SERVER中用得不多。

  • newFixedThreadPool()

    • newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程

    • 其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子

  • ScheduledThreadPool()

    • 调度型线程池

    • 这个池子里的线程可以按schedule依次delay执行,或周期执行

  • SingleThreadExecutor

    • 单例线程,任意时间池中只能有一个线程

    • 用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)

应用实例

CachedThreadPool 实例

CachedThreadPool 首先会按照需要创建足够多的线程来执行任务(Task)。随着程序执行的过程,有的线程执行完了任务,可以被重新循环使用时,才不再创建新的线程来执行任务。

客户端线程和线程池之间会有一个任务队列。当程序要关闭时,你需要注意两件事情: 
1.入队的这些任务的情况怎么样了 
2.正在运行的这个任务执行得如 何了。

import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** * 功能概要:缓冲线程池实例-execute运行 */class Handle implements Runnable {    
     private String name;
   public Handle(String name) {    
      this.name = "thread"+name;    }      @Override    public void run() {        System.out.println( name +" Start. Time = "+new Date());        processCommand();        System.out.println( name +" End. Time = "+new Date());    }     private void processCommand() {    
            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }        }     @Override        public String toString(){        
             return this.name;        }   }

运行测试CachedThreadPool

public static void testCachedThreadPool() {  
     System.out.println("Main: Starting at: "+ new Date()); 
     //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE   
     ExecutorService exec = Executors.newCachedThreadPool();  

     for(int i = 0; i < 10; i++) {     
            exec.execute(new Handle(String.valueOf(i)));     
     } 
     //执行到此处并不会马上关闭线程池,但之后不能再往线程池中加线程,否则会报错      
     exec.shutdown();  

     System.out.println("Main: Finished all threads at"+ new Date());  
}  

运行效果: 

从上面的结果可以看出: 
1、主线程的执行与线程池里的线程分开,有可能主线程结束了,但是线程池还在运行 
2、放入线程池的线程并不一定会按其放入的先后而顺序执行


FixedThreadPool 实例

FixedThreadPool模式会使用一个优先固定数目的线程来处理若干数目的任务。

规定数目的线程处理所有任务,一旦有线程处理完了任务就会被用来处理新的任务(如果有的话)。

这种模式与上面的CachedThreadPool是不同的,CachedThreadPool模式下处理一定数量的任务的线程数目是不确定的。而FixedThreadPool模式下最多 的线程数目是一定的。

应用实例:

public static void testFixThreadPool() {  
    System.out.println("Main Thread: Starting at: "+ new Date());    
     ExecutorService exec = Executors.newFixedThreadPool(5);     
     for(int i = 0; i < 10; i++) {     
            exec.execute(new Handle(String.valueOf(i)));     
     }     
     exec.shutdown();  //执行到此处并不会马上关闭线程池  
     System.out.println("Main Thread: Finished at:"+ new Date());  
}  

运行效果 
java并发-线程池总结

上面创建了一个固定大小的线程池,大小为5.也就说同一时刻最多只有5个线程能运行。并且线程执行完成后就从线程池中移出。它也不能保证放入的线程能按顺序执行。这要看在等待运行的线程的竞争状态了。


newSingleThreadExecutor 实例

创建只能运行一条线程的线程池。它能保证线程的先后顺序执行,并且能保证一条线程执行完成后才开启另一条新的线程

public static void testSingleThreadPool() {  
     System.out.println("Main Thread: Starting at: "+ new Date());    
     ExecutorService exec = Executors.newSingleThreadExecutor();   //创建大小为1的固定线程池  
     for(int i = 0; i < 10; i++) {     
            exec.execute(new Handle(String.valueOf(i)));     
     }     
     exec.shutdown();  //执行到此处并不会马上关闭线程池  
     System.out.println("Main Thread: Finished at:"+ new Date());  
}  

运行效果: 
java并发-线程池总结

等价于:

ExecutorService exec = Executors.newFixedThreadPool(1);     

 
   
   
 


newScheduledThreadPool 实例

这是一个计划线程池类,它能设置线程执行的先后间隔及执行时间等,功能比上面的三个强大了一些。


实现每个放入的线程延迟10秒执行。

public static void testScheduledThreadPool() {  
    System.out.println("Main Thread: Starting at: "+ new Date());    
    ScheduledThreadPoolExecutor  exec = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(10);   //创建大小为10的线程池  
     for(int i = 0; i < 10; i++) {     
            exec.schedule(new Handle(String.valueOf(i)), 10, TimeUnit.SECONDS);//延迟10秒执行  
     }     
     exec.shutdown();  //执行到此处并不会马上关闭线程池  
     while(!exec.isTerminated()){  
            //wait for all tasks to finish  
     }  
     System.out.println("Main Thread: Finished at:"+ new Date());  
}  

运行效果: 
java并发-线程池总结


ScheduledThreadPoolExecutor的定时方法主要有以下四种: 
java并发-线程池总结

下面将主要来具体讲讲scheduleAtFixedRate和scheduleWithFixedDelay

scheduleAtFixedRate 按指定频率周期执行某个任务

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, 
long initialDelay, 
long period, 
TimeUnit unit); 

//command:执行线程
//initialDelay:初始化延时
//period:两次开始执行最小间隔时间
//unit:计时单位


scheduleWithFixedDelay 周期定时执行某个任务/按指定频率间隔执行某个任务(注意)

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, 
long initialDelay, 
long delay, 
TimeUnit unit); 

//command:执行线程
//initialDelay:初始化延时
//period:前一次执行结束到下一次执行开始的间隔时间(间隔执行延迟时间)
//unit:计时单位


实例

class MyHandle implements Runnable {  

    @Override  
    public void run() {  
        System.out.println(System.currentTimeMillis());  
        try {  
            Thread.sleep(1 * 1000);  
        } catch (InterruptedException e) {  
            // TODO Auto-generated catch block  
            e.printStackTrace();  
        }  
    }  

}  

按指定频率周期执行某个任务 
下面实现每隔2秒执行一次,注意,如果上次的线程还没有执行完成,那么会阻塞下一个线程的执行。

即使线程池设置得足够大。

/** * 初始化延迟0ms开始执行,每隔2000ms重新执行一次任务 */  public static void executeFixedRate() {    
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);    
    executor.scheduleAtFixedRate(    
            new MyHandle(),    
            0,    
            2000,    
            TimeUnit.MILLISECONDS);    
}    

间隔指的是连续两次任务开始执行的间隔。对于scheduleAtFixedRate方法,当执行任务的时间大于我们指定的间隔时间时,它并不会在指定间隔时开辟一个新的线程并发执行这个任务。而是等待该线程执行完毕。


按指定频率间隔执行某个任务

/**   * 以固定延迟时间进行执行   * 本次任务执行完成后,需要延迟设定的延迟时间,才会执行新的任务   */    
public static void executeFixedDelay() {        ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);        executor.scheduleWithFixedDelay(                new MyHandle(),                0,                2000,                TimeUnit.MILLISECONDS);     }  

间隔指的是连续上次执行完成和下次开始执行之间的间隔。


周期定时执行某个任务

周期性的执行一个任务,可以使用下面方法设定每天在固定时间执行一次任务。

/**   * 每天晚上9点执行一次   * 每天定时安排任务进行执行   */    
public static void executeEightAtNightPerDay() {        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);    long oneDay = 24 * 60 * 60 * 1000;        long initDelay  = getTimeMillis("21:00:00") - System.currentTimeMillis();        initDelay = initDelay > 0 ? initDelay : oneDay + initDelay;        executor.scheduleAtFixedRate(                new MyHandle(),                initDelay,                oneDay,                TimeUnit.MILLISECONDS);     }    
/**   * 获取指定时间对应的毫秒数   * @param time "HH:mm:ss"   * @return   */    
private static long getTimeMillis(String time) {        try {            DateFormat dateFormat = new SimpleDateFormat("yy-MM-dd HH:mm:ss");            DateFormat dayFormat = new SimpleDateFormat("yy-MM-dd");            Date curDate = dateFormat.parse(dayFormat.format(new Date()) + " " + time);            return curDate.getTime();        } catch (ParseException e) {            e.printStackTrace();        }        return 0;     }  

以上是关于java并发-线程池总结的主要内容,如果未能解决你的问题,请参考以下文章

Java 并发编程 常见面试总结

Java 并发编程 常见面试总结

Java多线程-线程池的使用与线程总结(狂神说含代码)

Java 并发常见面试题总结(下)

Java并发编程(十八):ThreadPoolExecutor总结与源码深度分析

Java并发编程(十八):ThreadPoolExecutor总结与源码深度分析