Callable,阻塞队列,线程池问题

Posted luhuajun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Callable,阻塞队列,线程池问题相关的知识,希望对你有一定的参考价值。

一.说说Java创建多线程的方法

  1. 通过继承Thread类实现run方法

     2. 通过实现Runnable接口

  3. 通过实现Callable接口

  4. 通过线程池获取

 

二. 可以写一个Callable的案例吗?如何调用Callable接口

/*是一个带返回值的多线程类,如果需要有线程返回的结果,就需要使用此类*/
class MyThread implements Callable<Integer> {

    @Override
    public Integer call()  {
        return 1000;
    }
}
public static void main(String[] args) throws Exception{

        /*Thread 构造方法传入 Runnable , FutureTask 构造方法传入 Callable , 
        FutureTask 继承于 RunnableFuture 继承于 Runnable
        所以 Thread通过传入FutureTask即Runnable的实现类来启动Callable线程,
        透传思想,传接口,不传具体的实现类,可以保证灵活性,(适配器模式)
       */
        FutureTask<Integer> ft = new FutureTask(new MyThread());// 一个是main线程,一个是t1线程
        new Thread(ft,"t1").start();


        int res1 = 1000;
        // 获取t1线程中的返回值,建议放在最后
        // 如果没有计算完成就取值,会导致整个程序阻塞,直到t1执行完成
        int res2 = ft.get();
        //自旋锁,如果t1线程没有计算完成,就等待
        while (!ft.isDone()){

        }
        System.out.println(Thread.currentThread().getName() + "	"+(res1+res2));
    }

三. 请你谈谈对阻塞队列的理解,为什么要是用阻塞队列,它有哪些具体的实现,各有什么特点?

在多线程的环境下,所谓阻塞,在某些情况下会挂起线程,一旦条件满足,被挂起的线程又被唤醒.我们不需要关心

什么时候需要阻塞线程,什么时候需要唤醒线程,一切都有BlockingQueue自动调度实现

 它有7个实现类
* 1.ArrayBlockingQueue 由数组构成的有序阻塞队列
* 2.LinkedBlockingQueue 由链表构成的有序阻塞队列
* 3.PriorityBlockingQueue 支持优先级排序的无界阻塞队列
* 4.DelayQueue 使用优先级队列的延迟无阻塞队列
* 5.SynchronousQueue 不存储元素的阻塞队列,也即单元素队列
* 6.LinkedTransferQueue 由链表组成的无界阻塞队列
* 7.LinkedBlockingQueue 由链表构成的双向阻塞队列

1,2,5用的最多

BlockingQueue的使用方法

1.会抛出异常

public static void show01(int n){
        /*队列中的数据错误,会抛出异常*/
        /*add     队列满了,再继续插入会抛出java.lang.IllegalStateException: Queue full异常*/
        /*remove  从队列中移除一个元素,并返回该元素,如果队列为空,就抛出java.util.NoSuchElementException异常*/
        /*element 返回队列的首个元素,如果队列为空,抛出java.util.NoSuchElementException异常*/
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(n);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
//        System.out.println(blockingQueue.add("x"));
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.element());//
    }

2.不会抛出异常

public static void show02(int n){
        /*队列数据错误,不会抛出异常*/
        /*offer 添加元素,如果队列满了,再继续插入元素,会返回false*/
        /*poll 获取元素,如果队列为空,再继续获取元素,会返回null*/
        /*peek 获取队列的首元素,如果队列为空,返回null*/
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(n);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
//        System.out.println(blockingQueue.offer("x"));
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
//        System.out.println(blockingQueue.poll());

        System.out.println(blockingQueue.peek());
    }

3.队列阻塞

public static void show03(int n){
        /*队列数据异常,不会抛出异常,没有返回值*/
        /*put 如果队列已满,在插入元素,队列会阻塞*/
        /*take 如果队列为空,在获取元素,队列会阻塞*/
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(n);
        try {
            blockingQueue.put("a");
            blockingQueue.put("b");
            blockingQueue.put("c");
          // blockingQueue.put("x");
            System.out.println("-------------------");
            blockingQueue.take();
            blockingQueue.take();
            blockingQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

4.队列超时 (实际用的较多)

public static void show04(int n){
        /*队列数据异常,不会抛出异常,返回true/false值*/
        /*offer 传入值,等待时间,等待单位,往队列中存放元素,当队列中的元素已满,继续等待,如果超过给定时间,那么就会丢弃元素*/
        /*poll 等待时间,等待单位,从队列中获取元素,当队列中的元素为空,继续等待,如果超过给定时间,那么就不会获取*/
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(n);
        try {
            System.out.println(blockingQueue.offer("a", 2L, TimeUnit.SECONDS));
            System.out.println(blockingQueue.offer("b", 2L, TimeUnit.SECONDS));
            System.out.println(blockingQueue.offer("c", 2L, TimeUnit.SECONDS));
            System.out.println(blockingQueue.offer("x", 2L, TimeUnit.SECONDS));

            blockingQueue.poll(2L,TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
 public static void main(String[] args) {
        show01(3); //    异常
        show02(3); //   没有异常
        show03(3); //   阻塞
        show04(3); // 超时等待
    }

四.请你谈谈对线程池的理解,为什么要使用线程池,如何使用线程池?

介绍:线程池的主要是控制运行的线程的数量,如果处理过程中将任务加入队列,然后在线程创建的时候启动这些任务.如果线程数量超过了最大数量的线程将排队等候,等待其他的线程执行完毕,再从队列中取出任务来执行.

优点:

  1.降低资源消耗,通过复用已创建的线程降低线程创建和销毁造成的消耗

  2.提高响应速度,当任务达到时,任务可以不需要等待线程创建就可以立刻执行

  3.提高线程的可管理性,线程属于稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性

     使用线程池可以进行统一的分配,调优和监控

一句话总结:线程复用,控制最大并发数,管理线程

 public static void show01(ExecutorService es){
        // 创建1个线程池,里面有5个固定的线程
        /*1. 创建一个定长的线程池,可控制线程的最大并发数,超出的线程会在队列中等待
        * 2. newFixedThreadPool创建的线程池corePoolSize和maxPoolSize的值事相等的,使用的是LinkedBlockingQueue
        * */

        es = Executors.newFixedThreadPool(5);

        try{
            // 有10个任务等待被执行
            for (int i = 1; i <= 10; i++) {
                es.execute(()->{
                    System.out.println(Thread.currentThread().getName() +"	 执行任务");
                });
            }
        }catch (Exception e){
            e.getStackTrace();
        }finally {
            es.shutdown();//关闭线程池
        }
    }
 public static void show02(ExecutorService es){
        // 创建1个线程池,里面有1个固定的线程
        /*
        * 1.创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有的任务按照指定的顺序执行
        * 2.newSingleThreadExecutor将corePoolSize和maxPoolSize的值都设置为1,使用的是LinkedBlockingQueue
        * */
        es = Executors.newSingleThreadExecutor();

        try{
            for (int i = 1; i <= 10; i++) {
                es.execute(()->{
                    System.out.println(Thread.currentThread().getName() +"	 执行任务");
                });
            }
        }catch (Exception e){
            e.getStackTrace();
        }finally {
            es.shutdown();//关闭线程池
        }
    }
public static void show03(ExecutorService es){
        // 创建1个线程池,里面无固定个线程
        /*
        * 1.创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则创建新线程
        * 2.newCachedThreadPool将corePoolSize的值设置为0,maxPoolSize的值设置为Integer.MAX_VALUE,使用
        *  的是synchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程
        *
        * */
        es = Executors.newCachedThreadPool();
        try{
            for (int i = 1; i <= 10; i++) {
                es.execute(()->{
                    System.out.println(Thread.currentThread().getName() +"	 执行任务");
                });

            }
        }catch (Exception e){
            e.getStackTrace();
        }finally {
            es.shutdown();//关闭线程池
        }
    }
public static void main(String[] args) {
        ExecutorService es = null;
        show01(es);
        show02(es);
        show03(es);
    }

 五. 请你谈一谈线程池的七大/五大参数分别代表什么意思

int corePoolSize                      线程池中常驻核心线程数
    * 1.在创建了线程池后,当有请求任务来了之后,就会安排线程池中的线程去执行请求的任务,类似于银行当值的窗口
    * 2.当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
int maximumPoolSize                   线程池同时能够容纳同时执行的最大线程数,此值必须大于等于1
long keepAliveTime                    多余的空闲线程的存活时间
      * 当前线程池数量超过corePoolSize时,空闲时间达到keepAliveTime值时,多余线程会被销毁直到只剩下corePoolSize个线程为止
      * 默认情况下,只有当线程池中的线程数大于corePoolSize时候keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize
      *
TimeUnit unit                         keepAliveTime的单位
BlockingQueue<Runnable> workQueue     任务队列,被提交但尚未被执行的任务
ThreadFactory threadFactory           表示生成线程池中工作线程的线程工厂,用于创建线程,一般用默认的值即可
RejectedExecutionHandler handler      拒绝策略,表示当队列满了并且工作线程大于等于线程池的最大线程数(maximumPoolSize)如何来拒绝请求执行的Runnable策略

六.请你谈一谈线程池的底层工作原理

技术图片

  1.在创建了线程池后,等待提交过来的任务请求

  2.当调用execute方法添加一个任务请求时,线程池会做出如下判断:

    2.1 如果正在运行的线程数量小于corePoolSize,那么会马上创建这个线程并执行这个任务

    2.2 如果此时正在运行的线程数量大于或等于corePoolSize,那么会将这个这个任务放进阻塞队列中等待

    2.3 如果此时阻塞队列满了且正在运行的线程数量小于maximumPoolSize,那么会创建非核心线程来立即执行这些任务

    2.4 如果此时阻塞队列满了且正在运行的线程数大于或等于maximumPoolSize.那么线程池会启用饱和拒绝策略来拒绝新的请求

  3.当一个线程完成任务时,会从阻塞队列中取出下一个任务来执行

  4.此时当一个线程空闲并超过一定的时间keepAliveTime,此时线程池会判断,如果当前运行的线程数量大于corePoolSize,那么这些线程将会被终止.

     所以线程池的所有任务完成后,最终会收缩到corePoolSize的大小.

 举列子:银行办理业务,周六招商银行,一共有5个柜面,只开放了2个柜台办理业务,此时2就代表corePoolSize,5个柜台代表了maximumPoolSize,只有2个员工在干活,

    其他的3个柜台都是空闲的当有人来进去办业务,如果来的人少于2人可以立即办理业务,无需等待,当来了5个人办理业务,前面的1,2号没有办完,此时3,4,5号就

    会进入候客区等待.也就是线程池中的阻塞队列中等待被执行.如果此时来银行办理业务的任实在太多,来了7,8,9,候客区也满了(BlockingQueue满了),此时银行

    行长就会开放剩余的柜台来处理业务(非核心线程打开,此时线程数达到最大值),打电话给另外3个柜员来加班,此时柜台全部开放,也满足不了新的客户需求,那么

    最后银行行长就会暂时关闭这个网点(拒绝策略拒绝新的请求),先来办理行内的业务.当解决了一个任务时,就会叫下一个号来办理业务(从阻塞队列中获取待

    解决的任务).如果过了一个小时(keepAliveTime保留活跃时间),银行的流量下降了,此时,加班的3个人可以先回去(本来也不该他们上班),银行重新回归到初始状

    态.(这么说,小伙伴们懂了吗~)

七. 请你谈一谈线程池的拒绝策略

  此时线程池中的阻塞队列已经被塞满,再也放不进新的任务,同时线程数也达到了maximumPoolSize,无法为新的请求服务,此时JDK有4种拒绝策略

1.AbortPolicy(默认)    直接抛出RejectedExecutionException异常阻止系统正常运行
2.CallersRunsPolicy    "调用者模式"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量
3.DiscardOldestPolicy  抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务
4.DiscardPolicy        直接丢弃任务,不予处理也不抛出异常.如果允许任务丢失,这是最好的一种方案

  那此时可以说下在工作中用了哪一种线程池?(此处有坑)

  JDK提供的线程池一律不能说,标准答案: 根据业务的需要,自定义ThreadPoolExectuor来创建线程池.

public static void show04(ExecutorService es){
       // 核心线程数为2,最大线程数为5,非核心线程的最大运行时间是1秒
       // 阻塞队列中一共有3个等待的任务,使用默认的线程生成策略,同时开启
       // 调用者模式的拒绝策略
       es = new ThreadPoolExecutor(
                2,5,1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try{
            for (int i = 1; i <= 10; i++) { // 开启10个请求,最大线程数量是8,
                es.execute(()->{
                    System.out.println(Thread.currentThread().getName()+"	办理业务");
                });
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            es.shutdown();
        }
    }

运行结果:

AbortPolicy,直接抛出异常

技术图片

 

 

CallersRunsPolicy 不会抛出异常,返回给方法的调用者,也就是main线程

技术图片

 

 

DiscardOldestPolicy  抛弃队列中等待最久的任务

技术图片

 

 

DiscardPolicy 直接抛弃队列中多余的任务,如果业务允许,此拒绝策略效率最高

技术图片

 

 

八. 请你谈一谈,如何在实际的生产业务中确定线程池中参数的配置

此时要根据实际做的系统进行分类

  1.CPU密集型

/*指的是该任务需要大量的运算,没有阻塞,CPU一直全速运行,CPU密集任务只有真正的多核心CPU才能得到加速(通过多线程)而在单核CPU上
无论开几个模拟多线程的任务都不可能得到加速,因为CPU的运算能力有限. 1.CPU密集型的任务尽可能少配线程数量 2.公式:线程数 = CPU核心数 + 1个线程的线程池(8核就开启8个线程,12核开启12个线程,尽量减少切换) 3.System.out.println(Runtime.getRuntime().availableProcessors());//查看CPU的核心数
*/

  2.I/O密集型

/* 1.I/O密集型并不是在一直执行任务,应该配置尽可能多的线程,线程数 = CPU核心数 * 2
   2.I/O密集型即该任务需要大量的I/O操作,业务系统中大量的阻塞(从数据库,缓存获取数据,修改数据...)
      所以在I/O密集型的场景中,使用多线程可以大大加速程序运行,即使在单核CPU上,这种加速也可以利用了被浪费掉的阻塞时间
      因为大部分线程都会阻塞,需要配置大量的线程数 
      线程数 = CPU核心数 / 1-阻塞系数 阻塞系数在0.8~0.9之间
      (例如 8核心CPU 8 / 1 - 0.9 = 80,如果是I/O密集型的操作,需要配置80个线程数) 
*/

 

以上是关于Callable,阻塞队列,线程池问题的主要内容,如果未能解决你的问题,请参考以下文章

java:并发编程-Callable与Future模式

阻塞队列和线程池

阻塞队列和线程池原理

阻塞队列和线程池原理

4种线程池和7种并发队列

Java线程池(Callable+Future模式)