Java中的并发编程Executor

Posted 攻城狮Chova

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java中的并发编程Executor相关的知识,希望对你有一定的参考价值。

基本概念

  • Thread存在的问题:
    • 性能很差,每次调用new Thread() 时都会创建新的对象
    • 线程缺乏统一的管理. 可能会无限的新建线程,相互之间竞争,可能会导致占用过多的系统资源而发生死机或者内存溢出问题
    • 缺乏对线程的功能控制. 比如定时执行,定期执行,线程中断等
  • Executor框架:
    • Java 1.5之后引入的,位于java.util.concurrent
    • 内部使用了线程池的机制,可以控制线程的启动,执行和关闭,简化并发线程的操作
  • Executor特点:
    • Executor来启动线程比Thread中的start() 方法启动线程更加易于管理,效率更高,节约开销
    • 有助于避免this逃逸问题: 如果在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能访问到初始化一半的对象,导致引用不完整问题
    • 性能高. 可以重用已经存在的线程,减少对象的创建开销
    • 可以有效控制最大并发线程数,提高系统资源利用率的同时,也能避免过多的资源竞争,造成阻塞
    • 提供线程的定时执行,定期执行,单线程,并发数控制等功能
  • Executor作为异步执行框架,支持多种不同类型的任务执行策略,提供了一种标准方法将任务的提交过程和执行过程进行解耦:
    • 基于生产者-消费者模式,使用Runnable来表示任务,提交任务的线程相当于生产者,执行任务的线程相当于消费者
  • Executor中实现了对生命周期的支持,统计信息收集,应用程序管理机制和性能监视机制
  • Executor的类图:
  • Executor框架包括:
    • 线程池
    • Executor
    • Executors
    • ExecutorService
    • CompletionService
    • Future
    • Callable

Executor

  • Executor接口: 定义了一个接收Runnable对象的方法execute()
     void execute(Runnable command);
    
    • 该方法接收一个Runnable实例,用来执行一个任务
    • 任务就是一个实现了Runnable接口的类
  • 一般情况下,在新线程中使用Runnable时创建一个Runnable任务如下:
new Thread(new RunnableTask()).start();

但是在Executor中,不需要显式地创建线程:

executor.execute(new RunnableTask());

ExecutorService

  • ExecutorService接口:
    • Executor的子类接口
    • 提供生命周期管理的方法,返回Future对象
    • 提供可以跟踪一个或者多个异步任务执行情况的方法,返回Future对象
  • shutdown():
    • 可以通过调用ExecutorServiceshutdown() 方法平滑的关闭ExecutorService. 调用shutdown() 方法之后 ,ExecutorService将停止接收新的任务并且等待已经提交的任务执行完成.已经提交的任务包括提交了已经在执行的任务和提交了还没有执行的任务.当所有已经提交的任务执行完成就关闭ExecutorService
    • 可以使用ExecutorService接口来实现和管理多线程
  • submit():
    • 通过ExecutorService.submit() 方法返回的Future对象,可以通过调用Future对象的isDone() 方法查询Future是否完成
    • 当任务完成后,会得到一个处理结果,可以通过调用Future对象的get() 方法获取处理结果
    • 也可以不使用Future对象的isDone() 方法进行检查,直接调用Future对象的get() 方法获取结果,此时get() 方法将会发生阻塞,直到获取到处理结果
    • 在任务执行过程中,可以使用Future对象的cancel() 方法取消正在执行中的任务

Executors

  • Executors类:
    • 主要用于提供线程池的相关操作
    • 提供了一系列的工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口
  • newFixedThreadPool:
    public static ExecutorService newFixedThreadPool(int nThreads);
    
    • 创建一个固定数目线程的线程池
    • 任意时间点,最多只能存在固定数目的活动线程. 此时如果创建了新的线程,只能在等待队列中等待线程池中的某个线程终止被移除线程池后才能进行线程池
    • 固定数目线程的线程池可以重用线程,但是不能随时新建新的线程
    • 固定数目线程的线程池没有超时的问题,通常用于稳定固定的正规的长时间的并发线程,常常用于服务器中的线程
  • newCachedThreadPool:
    public static ExecutorService newCachedThreadPool();
    
    • 创建一个可以缓存的线程池
    • 调用execute() 可以重用之前构造过的可用线程
    • 如果没有可用的线程,就会创建一个新的线程添加到线程池中
    • 缓存型线程通常用于执行一些生存周期很短的异步任务. 所以在面向连接的Daemon型的Server中不常见,但是对于生存期短的异步任务,缓存型线程是Executors的首选
    • 如果有线程超过60秒没有被使用会被终止并且从缓存中移除
    • 能够重用的线程,必须在有效期IDLE之内,默认是60
    • 缓存型线程池中的线程不用担心终止关闭的问题,因为线程超过60秒未被使用会自动终止并从缓存型线程池中移除
  • newSingleThreadExecutor:
    public static ExecutorService newSingleThreadExecutor();
    
    • 创建一个单线程的线程Executor
    • 任意时间点,单线程的线程池中只能有一个线程
  • newScheduledThreadPool:
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
    
    • 创建一个支持定时以及周期性任务执行的线程池
    • 通常情况下,可以用来替代Timer
    • 周期性任务执行的线程池可以按照schedule依次延迟执行或者周期性执行

Executor和ExecutorService和Executors的比较

Executor

  • Executor中定义了一个execute() 方法用来接收一个Runnable接口的对象
  • Executor中的execute() 方法不返回任何结果

ExecutorService

  • ExecutorService接口继承自Executor接口 ,ExecutorServiceExecutor的子接口
  • ExecutorService中定义的submit() 方法可以接收Runnable接口的对象和Callable接口的对象
  • ExecutorService接口中的submit() 方法可以通过Future对象返回运算的结果
  • ExecutorService中提供任务提交的方法外,还提供线程声明周期的控制方法等

Executors

  • Executors类中提供了工厂方法来创建不同类型的线程池:
    • newSingleThreadExecutor() 用来创建一个单线程的线程池
    • newFixedThreadPool() 用来创建一个固定线程数的线程池
    • newCachedThreadPool() 用来根据需要创建新的线程,如果已有线程是空闲的可以重用已有的线程

自定义线程池

  • ThreadPoolExecutor: 自定义线程池. 提供多个构造方法来创建线程池
    public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler);
    
    • corePoolSize: 线程池中保存的核心线程数,包括空闲线程
    • maximumPoolSize: 线程池中允许的最大线程数
    • keepAliveTime: 线程池中的空闲线程持续的最长时间
    • unit: 线程池中的空闲线程持续时间的单位
    • workQueue: 任务执行前保存任务的队列,只保存由execute() 方法提交的Runnable任务
    • threadFactory: 使用Executor创建新线程时用到的工厂方法
    • handler: 线程执行因为线程数和队列满时发生阻塞的处理方法
  • ThreadPoolExecutor中线程Runnable任务的处理顺序: 先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue中是否满了,最后看线程池中的线程
    • 如果线程池中的线程数量小于corePoolSize, 即使线程池中存在空闲线程,也会创建一个新的线程来执行添加的任务
    • 如果线程池中的线程数量大于等于corePoolSize, 但是缓冲队列workQueue未满,就将添加的任务放入到workQueue中,然后按照FIFO的原则等待线程池中有线程空闲出来后将缓冲队列中的任务交由空闲线程执行
    • 如果线程池中的线程数量大于等于corePoolSize, 并且缓冲队列workQueue已满,但是线程池中的线程数量小于maximumPoolSize, 就会创建一个新的线程来执行添加的任务
    • 如果线程池中的线程数量等于了maximumPoolSize, 就根据RejectedExecutionHandler中4种方式里选择的handler执行线程任务处理
  • 线程池中的线程数量大于corePoolSize时,如果线程池中的空闲线程持续时间超过keepAliveTime, 就会从线程池中移除,这样就可以动态调整线程池中线程的数量了
  • 缓冲队列中的几种排队策略:
    • 直接提交:
      • 采用SynchronousQueue, 不保持任务而是将任务直接提交给线程处理
      • 如果线程池中的线程都在工作,不存在用于立即执行任务的线程,则试图将任务加入到缓冲队列就会失败,此时会构造一个新的线程来处理添加的任务,并将线程加入到线程池中
      • 直接提交要求是无界的,即最大线程数量maximumPoolSize的值为Integer.MAX_VALUE. 这样才能避免拒绝新提交的任务
      • newCachedThreadPool采用的就是直接提交策略
    • 无界队列:
      • 采用LinkedBlockingQueue, 理论上可以对无限多的任务进行排序
      • 使用无界队列可以在线程池中所有corePoolSize个线程都工作的情况下将任务添加到缓冲队列中
      • 这样不用创建新的线程来处理添加的任务,线程池中线程的数量不会超过corePoolSize. 此时最大线程数量maximumPoolSize的值为Integer.MAX_VALUE, 是无界的
      • 当每个任务完全独立于其余的任务,各个任务之间的执行互不影响时,适合使用无界队列
      • newFixedThreadPool采用的就是无界队列策略
    • 有界队列:
      • 采用ArrayBlockingQueue, 并制定队列的最大长度
      • 有界队列使用有限的maxmumPoolSize, 有助于防止资源耗尽
      • 有界队列较难调整和控制,需要设定合理的参数,对队列的大小和线程池最大数量的大小相互进行折衷

以上是关于Java中的并发编程Executor的主要内容,如果未能解决你的问题,请参考以下文章

Java中的并发编程Executor

Java中的并发编程Executor

转:Java并发编程之十九:并发新特性—Executor框架与线程池(含代码)

Java并发多线程编程——Executor接口

java并发编程框架 Executor ExecutorService invokeall

Java高并发编程