线程池

Posted bulrush

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池相关的知识,希望对你有一定的参考价值。

一:推荐使用 ThreadPoolExecutor 构造函数创建线程池

在《阿里巴巴 Java 开发手册》“并发处理”这一章节,明确指出线程资源必须通过线程池提供,不允许在应用中自行显示创建线程。

为什么呢?

使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源开销,解决资源不足的问题。如果不使用线程池,有可能会造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

另外《阿里巴巴 Java 开发手册》中强制线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 构造函数的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

Executors 返回线程池对象的弊端如下:

  • FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

二:ThreadPoolExecutor 使用示例

2.1:示例代码:Runnable+ThreadPoolExecutor

 1 package ThreadPool;
 2 
 3 import java.util.Date;
 4 
 5 /**
 6  * @ProjectName: smartdata
 7  * @Package: ThreadPool
 8  * @ClassName: MyRunnable
 9  * @Author: heluwei
10  * @Description:  这是一个简单的Runnable类,需要大约5秒钟来执行其任务。
11  * @Date: 2020/3/21 20:36
12  * @Version: 1.0
13  */
14 public class MyRunnable implements Runnable {
15 
16     private String command;
17 
18     public MyRunnable(String s) {
19         this.command = s;
20     }
21 
22     @Override
23     public void run() {
24         System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());
25         processCommand();
26         System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());
27     }
28 
29     private void processCommand() {
30         try {
31             Thread.sleep(5000);
32         } catch (InterruptedException e) {
33             e.printStackTrace();
34         }
35     }
36 
37     @Override
38     public String toString() {
39         return this.command;
40     }
41 }

编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

 1 package ThreadPool;
 2 
 3 import java.util.concurrent.ArrayBlockingQueue;
 4 import java.util.concurrent.ThreadPoolExecutor;
 5 import java.util.concurrent.TimeUnit;
 6 
 7 /**
 8  * @ProjectName: smartdata
 9  * @Package: ThreadPool
10  * @ClassName: ThreadPoolExecutorDemo
11  * @Author: heluwei
12  * @Description: ThreadPoolExecutor线程池
13  * @Date: 2020/3/21 20:35
14  * @Version: 1.0
15  */
16 public class ThreadPoolExecutorDemo {
17     private static final int CORE_POOL_SIZE = 5; //核心线程数为 5
18     private static final int MAX_POOL_SIZE = 10; //最大线程数 10
19     private static final int QUEUE_CAPACITY = 100; //
20     private static final Long KEEP_ALIVE_TIME = 1L; //当线程数大于核心线程数时,多余的空闲线程存活的最长时间
21 
22     public static void main(String... args) {
23         //使用阿里巴巴推荐的创建线程池的方式
24         //通过ThreadPoolExecutor构造函数自定义参数创建
25         ThreadPoolExecutor executor = new ThreadPoolExecutor(
26                 CORE_POOL_SIZE,
27                 MAX_POOL_SIZE,
28                 KEEP_ALIVE_TIME, //当线程数大于核心线程数时,多余的空闲线程存活的最长时间
29                 TimeUnit.SECONDS, //时间单位
30                 new ArrayBlockingQueue<>(QUEUE_CAPACITY), //任务队列,用来储存等待执行任务的队列
31                 new ThreadPoolExecutor.CallerRunsPolicy()); //饱和策略,简单点说就是后面排队的线程就在那儿等着。
32                 //被拒绝的任务在主线程中运行,所以主线程就被阻塞了,别的任务只能在被拒绝的任务执行完之后才会继续被提交到线程池执行
33 
34         for (int i = 0; i < 10; i++) {
35             //创建WorkerThread对象(WorkerThread类实现了Runnable 接口) 每一个Runable都是一个任务
36             Runnable worker = new MyRunnable("" + i);
37             //执行Runnable
38             executor.execute(worker);
39         }
40         //终止线程池
41         // void shutdown() 启动一次顺序关闭,执行以前提交的任务,但不接受新任务。若已经关闭,则调用没有其他作用。
42         executor.shutdown();
43         //boolean isTerminated()
44         //若关闭后所有任务都已完成,则返回true。注意除非首先调用shutdown或shutdownNow,否则isTerminated永不为true。
45         while (!executor.isTerminated()) {
46             //System.out.println("线程池还没有完全关闭!!!");
47         }
48         System.out.println("Finished all threads");
49     }
50 }

可以看到我们上面的代码指定了:

  1. corePoolSize: 核心线程数为 5。
  2. maximumPoolSize :最大线程数 10
  3. keepAliveTime : 等待时间为 1L。
  4. unit: 等待时间的单位为 TimeUnit.SECONDS。
  5. workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;
  6. handler:饱和策略为 CallerRunsPolicy

线程池每次会同时执行 5 个任务,这 5 个任务执行完之后,剩余的 5 个任务才会被执行。

我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务之行完成后,才会之行剩下的 5 个任务。

2.2:Callable+ThreadPoolExecutor示例代码

MyCallable.java

 1 package ThreadPool;
 2 
 3 import java.util.concurrent.Callable;
 4 
 5 /**
 6  * @ProjectName: smartdata
 7  * @Package: ThreadPool
 8  * @ClassName: MyCallable
 9  * @Author: heluwei
10  * @Description: JDK1.5后才有,相当于Runnable
11  * @Date: 2020/3/21 22:11
12  * @Version: 1.0
13  */
14 public class MyCallable implements Callable<String> {
15     @Override
16     public String call() throws Exception {
17         Thread.sleep(1000);
18         //返回执行当前 Callable 的线程名字
19         return Thread.currentThread().getName();
20     }
21 }

CallableDemo.java

package ThreadPool;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

/**
 * @ProjectName: smartdata
 * @Package: ThreadPool
 * @ClassName: CallableDemo
 * @Author: heluwei
 * @Description:
 * @Date: 2020/3/22 9:11
 * @Version: 1.0
 */
public class CallableDemo {

    private static final int CORE_POOL_SIZE = 5;
    private static final int MAX_POOL_SIZE = 10;
    private static final int QUEUE_CAPACITY = 100;
    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {
        //使用阿里巴巴推荐的创建线程池的方式
        //通过ThreadPoolExecutor构造函数自定义参数创建
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        List<Future<String>> futureList = new ArrayList<>();
        Callable<String> callable = new MyCallable();
        for (int i = 0; i < 10; i++) {
            //提交任务到线程池
            Future<String> future = executor.submit(callable);
            //将返回值 future 添加到 list,我们可以通过 future 获得 执行 Callable 得到的返回值
            futureList.add(future);
        }
        for (Future<String> fut : futureList) {
            try {
                System.out.println(new Date() + "::" + fut.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        //关闭线程池
        executor.shutdown();
    }
}

三:几种对比

3.1:Runnable vs Callable

Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是**Callable 接口**可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。

工具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

Runnable.java

@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}

 

Callable.java

@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

 

3.2:execute() vs submit()

  1. execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;
  2. submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

3.3:shutdown()VSshutdownNow()

  • shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

3.4: isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

以上是关于线程池的主要内容,如果未能解决你的问题,请参考以下文章

Java——线程池

Motan在服务provider端用于处理request的线程池

Java线程池详解

Java线程池详解

Java 线程池详解

线程池-实现一个取消选项