线程池(ExecutorsFutureCountDownLatchCyclicBarrier)
Posted 天河一粟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池(ExecutorsFutureCountDownLatchCyclicBarrier)相关的知识,希望对你有一定的参考价值。
线程池
Executors、Future、CountDownLatch、CyclicBarrier
一、JDK创建
JDK5中提供的Executors工具类可以通过4个静态方法创建4种线程池
- 创建固定大小的线程池
// 弊端:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,从而导致OOM final ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
- 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
// 弊端:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,从而导致OOM final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
- 创建一个可缓存(无限大)的线程池,适合处理执行时间短的任务
// 弊端:允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
- 创建可延时或定时执行的线程池
// 弊端:允许创建的线程数为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
- 推荐下面方式
/** * - 这种情况下,一旦提交的线程数超过当前可用线程数时,就会抛出java.util.concurrent.RejectedExecutionException, * - 这是因为当前线程池使用的队列是有边界队列,队列已经满了便无法继续处理新的请求。但是异常(Exception)总比发生错误(Error)要好。 * corePoolSize–池中要保留的线程数,即使它们处于空闲状态,除非设置了allowCoreThreadTimeOut * maximumPoolSize–池中允许的最大线程数,注意如果使用无界的阻塞队列,该参数没有什么效果。 * keepAliveTime–当线程数大于核心时,这是多余空闲线程在终止前等待新任务的最长时间。 * unit–keepAliveTime参数的时间单位,可选单位有DAYS、HOURS、MINUTES、毫秒、微秒、纳秒。 * workQueue–用于在执行任务之前保留任务的队列。此队列将仅包含execute方法提交的Runnable任务。 * ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按FIFO(先进先出)原则对任务进行排序。使用该队列,线程池中能创建的最大线程数为maximumPoolSize。 * LinkedBlockingQueue: 基于链表结构的无界阻塞队列,按FIFO(先进先出)原则对任务进行排序,吞吐量高于ArrayBlockingQueue。使用该队列,线程池中能创建的最大线程数为corePoolSize。静态工厂方法Executor.newFixedThreadPool()使用了这个队列。 * SynchronousQueue: 一个不存储元素的阻塞队列。添加任务的操作必须等到另一个线程的移除操作,否则添加操作一直处于阻塞状态。静态工厂方法Executor.newCachedThreadPool()使用了这个队列。 * PriorityBlokingQueue: 一个支持优先级的无界阻塞队列。使用该队列,线程池中能创建的最大线程数为corePoolSize。 * threadFactory–执行器创建新线程时使用的工厂 * handler–由于达到线程边界和队列容量而阻止执行时要使用的处理程序 * AbortPolicy: 无法处理新任务时,直接抛出异常,这是默认策略。 * CallerRunsPolicy:用调用者所在的线程来执行任务。 * DiscardOldestPolicy:丢弃阻塞队列中最靠前的一个任务,并执行当前任务。 * DiscardPolicy: 直接丢弃任务。 */ private static ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); private static ExecutorService pool = new ThreadPoolExecutor(5, 200, 60L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy());
二、Future的get()
Future接收线程执行结果;get()具有阻塞作用,等待执行结束
public void m1() throws Exception
// 1、用于封装所有启动的异步线程
List<Future<Object>> futureList = new ArrayList<>();
// 2、
IntStream.range(0, 5).forEach(v ->
final Future<Object> future = pool.submit(() ->
Thread.sleep((long) (Math.random()*5000));
System.out.println(Thread.currentThread().getName());
return true;
);
futureList.add(future);
);
// 4、使用Future的get()方法确定所有线程都执行完了
for (Future<Object> future : futureList)
System.out.println(future.get()); // Future的get方法具有阻塞作用
// 5、TODO 其它业务逻辑
System.out.println(LocalDateTime.now().format(formatter) + "==>结束");
三、CountDownLatch减数计数器
CountDownLatch是一个倒数的计数器阀门,初始化时阀门关闭,指定计数的数量,当数量倒数减到0时阀门打开,被阻塞线程被唤醒。
public void m2() throws Exception
System.out.println(LocalDateTime.now().format(formatter) + "==>开始");
// 1、创建长度为10的计数器
CountDownLatch latch = new CountDownLatch(10);
// 2、启动10个线程
IntStream.range(0, 10).forEach(v ->
pool.execute(() ->
try
Thread.sleep((long) (Math.random()*5000));
catch (InterruptedException e)
e.printStackTrace();
System.out.printf("%s %s%n", LocalDateTime.now().format(formatter), Thread.currentThread().getName());
// 减少锁存器的计数,如果计数为零,则释放所有等待的线程。
latch.countDown();
);
);
// 3、导致当前线程等待,直到锁存器倒计时到零,除非线程被中断
latch.await();
// 4、TODO 其它业务逻辑
System.out.println(LocalDateTime.now().format(formatter) + "==>结束");
四、CyclicBarrier循环栅栏
CyclicBarrier是一个可循环的屏障,它允许多个线程在执行完相应的操作后彼此等待共同到达一个point,等所有线程都到达后再继续执行(例如王者,需要等待所有人准备好才能开始)。
CyclicBarrier也可以像CountDownLatch一样适用于多个子任务并发执行,当所有子任务都执行完后再继续接下来的工作。
public void m4() throws Exception
System.out.println(LocalDateTime.now().format(formatter) + "==>开始");
final AbstractQueue<String> queue = new ArrayBlockingQueue<>(5);
queue.add("亚瑟");
queue.add("张飞");
queue.add("关羽");
queue.add("刘备");
queue.add("赵云");
// 1、创建长度为10的计数器
CyclicBarrier cyclic = new CyclicBarrier(5);
// 2、启动5个线程
IntStream.range(0, 5).forEach(v ->
pool.execute(() ->
String msg = String.format(" %s %s", Thread.currentThread().getName(), queue.poll());
try
System.out.println(LocalDateTime.now().format(formatter) + msg + "进入游戏...");
Thread.sleep((long) (Math.random()*5000));
System.out.println(LocalDateTime.now().format(formatter) + msg + "准备就绪");
cyclic.await();
catch (Exception e)
e.printStackTrace();
System.out.println(LocalDateTime.now().format(formatter) + msg + "开始游戏...");
try
Thread.sleep((long) (Math.random()*5000));
System.out.println(LocalDateTime.now().format(formatter) + msg + "结算完成");
cyclic.await();
catch (Exception e)
e.printStackTrace();
System.out.println(LocalDateTime.now().format(formatter) + msg + "退出游戏...");
);
);
// 4、TODO 其它业务逻辑
Thread.sleep(10000L);
System.out.println(LocalDateTime.now().format(formatter) + "==>结束");
五、CyclicBarrier和CountDownLatch区别
- CountDownLatch的await()线程会等待计数器减为0,而执行CyclicBarrier的await()方法会使线程进入阻塞等待其他线程到达障点。
- CountDownLatch计数器不能重置,CyclicBarrier可以重置循环利用。
- CountDownLatch是基于AQS的共享模式实现的,CyclicBarrier是基于ReentrantLock和Condition实现的。
- CountDownLatch不会让子线程进入阻塞,CyclicBarrier会使所有子线程进入阻塞。
六、自定义线程池
- 核心流程:
- 线程池中有N个工作线程
- 把任务提交给线程池运行
- 如果线程池已满,把任务放入队列
- 当有空闲线程时,获取队列中任务来执行
- 如果线程池和队列都满了,需要考虑拒绝策略
- AbortPolicy :直接抛出异常,默认使用此策略
- CallerRunsPolicy:用调用者所在的线程来执行任务
- DiscardOldestPolicy:丢弃阻塞队列里最老的任务,也就是队列里靠前的任务
- DiscardPolicy :当前任务直接丢弃
以上是关于线程池(ExecutorsFutureCountDownLatchCyclicBarrier)的主要内容,如果未能解决你的问题,请参考以下文章