JUC并发编程 共享模式之工具 ThreadPoolExecutor 多线程设计模式 -- 异步模式之工作线程(定义饥饿 & 解决饥饿 & 线程池创建多少线程数目合适)
Posted Z && Y
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC并发编程 共享模式之工具 ThreadPoolExecutor 多线程设计模式 -- 异步模式之工作线程(定义饥饿 & 解决饥饿 & 线程池创建多少线程数目合适)相关的知识,希望对你有一定的参考价值。
1. 工作线程
1.1 定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现就是线程池,也体现了经典设计模式中的享元模式
例如:
海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message(为每一个任务新分配一个线程,由这个线程来执行处理。))
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如:
如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工。
1.2 饥饿(线程的数量不足)
1.2.1 概述
固定大小线程池会有饥饿现象
- 两个工人是同一个线程池中的两个线程
- 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作。
- 客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待
- 后厨做菜:没啥说的,做就是了
例如:
比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好。但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,这种现象称为饥饿(线程的数量不足)。
1.2.2 示例代码:
1.2.2.1 正常情况(线程数量够用)
package com.tian;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j(topic = "c.TestDeadLock")
public class TestStarvation {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
// 做饭的方法
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
// 工作人员线程池
ExecutorService workerPool = Executors.newFixedThreadPool(2);
// 2个客人 线程够用且配合良好
workerPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = workerPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
运行结果:
1.2.2.2 饥饿情况(线程数量不够用)
package com.tian;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j(topic = "c.TestDeadLock")
public class TestStarvation {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
// 做饭的方法
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
// 工作人员线程池
ExecutorService workerPool = Executors.newFixedThreadPool(2);
// 4个客人, 现在员工不够用 不会发生饥饿现象
workerPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = workerPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
workerPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = workerPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
运行结果:
补充: 这个现象不是死锁,而是饥饿
没有检测到死锁
1.2.2.3 解决饥饿
不同的任务类型,采用不同的线程池,例如:
package com.tian;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j(topic = "c.TestDeadLock")
public class TestStarvation {
static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");
static Random RANDOM = new Random();
static String cooking() {
return MENU.get(RANDOM.nextInt(MENU.size()));
}
public static void main(String[] args) {
// 服务员线程池
ExecutorService waiterPool = Executors.newFixedThreadPool(1);
// 厨师线程池
ExecutorService cookPool = Executors.newFixedThreadPool(1);
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
waiterPool.execute(() -> {
log.debug("处理点餐...");
Future<String> f = cookPool.submit(() -> {
log.debug("做菜");
return cooking();
});
try {
log.debug("上菜: {}", f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}
运行结果:
1.3 线程池创建多少线程数目合适
- 过小会导致程序不能充分地利用系统资源、容易导致饥饿
- 过大会导致更多的线程上下文切换,占用更多内存
1.3.1 CPU 密集型运算(大量的运算,例如数据分析)
通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
1.3.2 I/O 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下:
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
示例:
以上是关于JUC并发编程 共享模式之工具 ThreadPoolExecutor 多线程设计模式 -- 异步模式之工作线程(定义饥饿 & 解决饥饿 & 线程池创建多少线程数目合适)的主要内容,如果未能解决你的问题,请参考以下文章
JUC并发编程 共享模式之工具 JUC Semaphore(信号量) -- 介绍 & 使用
JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 正确处理线程池异常
JUC并发编程 共享模式之工具 JUC ConcurrentHashMap -- ConcurrentHashMap的错误使用和正确使用(示例:统计单词个数)
JUC并发编程 共享模式之工具 JUC Semaphore(信号量) -- Semaphore原理
JUC并发编程 共享模式之工具 JUC 线程安全的集合类 -- 线程安全的集合类概述
JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 线程池应用之定时任务(在每周周四执行定时任务)