线程池(ExecutorsFutureCountDownLatchCyclicBarrier)

Posted 天河一粟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池(ExecutorsFutureCountDownLatchCyclicBarrier)相关的知识,希望对你有一定的参考价值。

线程池

Executors、Future、CountDownLatch、CyclicBarrier

一、JDK创建

JDK5中提供的Executors工具类可以通过4个静态方法创建4种线程池

  1. 创建固定大小的线程池
    // 弊端:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,从而导致OOM
    final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
    
  2. 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
    // 弊端:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,从而导致OOM
    final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    
  3. 创建一个可缓存(无限大)的线程池,适合处理执行时间短的任务
    // 弊端:允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
    final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    
  4. 创建可延时或定时执行的线程池
    // 弊端:允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM
    final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
    
  5. 推荐下面方式
    /**
     * - 这种情况下,一旦提交的线程数超过当前可用线程数时,就会抛出java.util.concurrent.RejectedExecutionException,
     * - 这是因为当前线程池使用的队列是有边界队列,队列已经满了便无法继续处理新的请求。但是异常(Exception)总比发生错误(Error)要好。
     * corePoolSize–池中要保留的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut
     * maximumPoolSize–池中允许的最大线程数,注意如果使用无界的阻塞队列,该参数没有什么效果。
     * keepAliveTime–当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间。
     * unit–keepAliveTime参数的时间单位,可选单位有DAYS、HOURS、MINUTES、毫秒、微秒、纳秒。
     * workQueue–用于在执行任务之前保留任务的队列。此队列将仅包含execute方法提交的Runnable任务。
     *      ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按FIFO(先进先出)原则对任务进行排序。使用该队列,线程池中能创建的最大线程数为maximumPoolSize。
     *      LinkedBlockingQueue: 基于链表结构的无界阻塞队列,按FIFO(先进先出)原则对任务进行排序,吞吐量高于ArrayBlockingQueue。使用该队列,线程池中能创建的最大线程数为corePoolSize。静态工厂方法Executor.newFixedThreadPool()使用了这个队列。
     *      SynchronousQueue: 一个不存储元素的阻塞队列。添加任务的操作必须等到另一个线程的移除操作,否则添加操作一直处于阻塞状态。静态工厂方法Executor.newCachedThreadPool()使用了这个队列。
     *      PriorityBlokingQueue: 一个支持优先级的无界阻塞队列。使用该队列,线程池中能创建的最大线程数为corePoolSize。
     * threadFactory–执行器创建新线程时使用的工厂
     * handler–由于达到线程边界和队列容量而阻止执行时要使用的处理程序
     *      AbortPolicy: 无法处理新任务时,直接抛出异常,这是默认策略。
     *      CallerRunsPolicy:用调用者所在的线程来执行任务。
     *      DiscardOldestPolicy:丢弃阻塞队列中最靠前的一个任务,并执行当前任务。
     *      DiscardPolicy: 直接丢弃任务。
     */
     private static ThreadFactory threadFactory = new ThreadFactoryBuilder()
             .setNameFormat("demo-pool-%d").build();
     private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
             60L, TimeUnit.MILLISECONDS,
             new ArrayBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    

二、Future的get()

Future接收线程执行结果;get()具有阻塞作用,等待执行结束

 public void m1() throws Exception 
     // 1、用于封装所有启动的异步线程
     List<Future<Object>> futureList = new ArrayList<>();

     // 2、
     IntStream.range(0, 5).forEach(v -> 
         final Future<Object> future = pool.submit(() -> 
             Thread.sleep((long) (Math.random()*5000));
             System.out.println(Thread.currentThread().getName());
             return true;
         );
         futureList.add(future);
     );

     // 4、使用Future的get()方法确定所有线程都执行完了
     for (Future<Object> future : futureList) 
         System.out.println(future.get()); // Future的get方法具有阻塞作用
     

     // 5、TODO 其它业务逻辑

     System.out.println(LocalDateTime.now().format(formatter) + "==>结束");
 

三、CountDownLatch减数计数器

CountDownLatch是一个倒数的计数器阀门,初始化时阀门关闭,指定计数的数量,当数量倒数减到0时阀门打开,被阻塞线程被唤醒。

 public void m2() throws Exception 
     System.out.println(LocalDateTime.now().format(formatter) + "==>开始");

     // 1、创建长度为10的计数器
     CountDownLatch latch = new CountDownLatch(10);

     // 2、启动10个线程
     IntStream.range(0, 10).forEach(v -> 
         pool.execute(() -> 
             try 
                 Thread.sleep((long) (Math.random()*5000));
              catch (InterruptedException e) 
                 e.printStackTrace();
             
             System.out.printf("%s %s%n", LocalDateTime.now().format(formatter), Thread.currentThread().getName());
             // 减少锁存器的计数,如果计数为零,则释放所有等待的线程。
             latch.countDown();
         );
     );

     // 3、导致当前线程等待,直到锁存器倒计时到零,除非线程被中断
     latch.await();

     // 4、TODO 其它业务逻辑

     System.out.println(LocalDateTime.now().format(formatter) + "==>结束");
 

四、CyclicBarrier循环栅栏

CyclicBarrier是一个可循环的屏障,它允许多个线程在执行完相应的操作后彼此等待共同到达一个point,等所有线程都到达后再继续执行(例如王者,需要等待所有人准备好才能开始)。
CyclicBarrier也可以像CountDownLatch一样适用于多个子任务并发执行,当所有子任务都执行完后再继续接下来的工作。

 public void m4() throws Exception 
 	System.out.println(LocalDateTime.now().format(formatter) + "==>开始");
 
 	final AbstractQueue<String> queue = new ArrayBlockingQueue<>(5);
 	queue.add("亚瑟");
 	queue.add("张飞");
 	queue.add("关羽");
 	queue.add("刘备");
 	queue.add("赵云");
 
 	// 1、创建长度为10的计数器
 	CyclicBarrier cyclic = new CyclicBarrier(5);
 
 	// 2、启动5个线程
 	IntStream.range(0, 5).forEach(v -> 
 		pool.execute(() -> 
 			String msg = String.format(" %s %s", Thread.currentThread().getName(), queue.poll());
 			try 
 				System.out.println(LocalDateTime.now().format(formatter) + msg + "进入游戏...");
 				Thread.sleep((long) (Math.random()*5000));
 				System.out.println(LocalDateTime.now().format(formatter) + msg + "准备就绪");
 				cyclic.await();
 			catch (Exception e) 
 				e.printStackTrace();
 			
 			System.out.println(LocalDateTime.now().format(formatter) + msg + "开始游戏...");
 			try 
 				Thread.sleep((long) (Math.random()*5000));
 				System.out.println(LocalDateTime.now().format(formatter) + msg + "结算完成");
 				cyclic.await();
 			catch (Exception e) 
 				e.printStackTrace();
 			
 			System.out.println(LocalDateTime.now().format(formatter) + msg + "退出游戏...");
 		);
 	);
 
 	// 4、TODO 其它业务逻辑
 
 	Thread.sleep(10000L);
 	System.out.println(LocalDateTime.now().format(formatter) + "==>结束");
 

五、CyclicBarrier和CountDownLatch区别

  • CountDownLatch的await()线程会等待计数器减为0,而执行CyclicBarrier的await()方法会使线程进入阻塞等待其他线程到达障点。
  • CountDownLatch计数器不能重置,CyclicBarrier可以重置循环利用。
  • CountDownLatch是基于AQS的共享模式实现的,CyclicBarrier是基于ReentrantLock和Condition实现的。
  • CountDownLatch不会让子线程进入阻塞,CyclicBarrier会使所有子线程进入阻塞。

六、自定义线程池

  • 核心流程:
  1. 线程池中有N个工作线程
  2. 把任务提交给线程池运行
  3. 如果线程池已满,把任务放入队列
  4. 当有空闲线程时,获取队列中任务来执行
  5. 如果线程池和队列都满了,需要考虑拒绝策略
    • AbortPolicy :直接抛出异常,默认使用此策略
    • CallerRunsPolicy:用调用者所在的线程来执行任务
    • DiscardOldestPolicy:丢弃阻塞队列里最老的任务,也就是队列里靠前的任务
    • DiscardPolicy :当前任务直接丢弃

以上是关于线程池(ExecutorsFutureCountDownLatchCyclicBarrier)的主要内容,如果未能解决你的问题,请参考以下文章

什么叫线程池?线程池如何使用?

多线程(六):线程池

多线程(六):线程池

十五、线程池(六)自动创建线程池的弊端

java 如何获得线程池中正在执行的线程数?

线程池参数与线程池调优