JavaFX 多线程之 ThreadPoolExecutor 线程池
Posted Calvin Chan
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JavaFX 多线程之 ThreadPoolExecutor 线程池相关的知识,希望对你有一定的参考价值。
JavaFX 多线程之 ThreadPoolExecutor 线程池
一、开发环境
IntelliJ IDEA 2020.2.3 + JDK 1.8 + JavaFX Scene Builder 2.0
二、ThreadPoolExecutor 介绍
ThreadPoolExecutor 是用来处理异步任务的一个接口,可以将其理解成为一个线程池和一个任务队列,提交到 ExecutorService 对象的任务会被放入任务队列或者直接被线程池中的线程执行。
ThreadPoolExecutor 对线程池和队列的使用方式如下:
- 从线程池中获取可用线程执行任务,如果没有可用线程则使用 ThreadFactory 创建新的线程,直到线程数达到 corePoolSize 限制
- 线程池线程数达到 corePoolSize 以后,新的任务将被放入队列,直到队列不能再容纳更多的任务
- 当队列不能再容纳更多的任务以后,会创建新的线程,直到线程数达到 maxinumPoolSize 限制
- 线程数达到 maxinumPoolSize 限制以后新任务会被拒绝执行,调用 RejectedExecutionHandler 进行处理
三、ThreadPoolExecutor 实现
1、JavaFX 界面
2、线程池实例初始化 MyThreadPoolExecutor
MyThreadPoolExecutor 是对线程池实例的初始化
import java.util.concurrent.*;
/**
* 线程池
* @author wxhntmy
*/
public class MyThreadPoolExecutor {
/**
* 核心线程池大小
*/
int corePoolSize = 2;
/**
* 最大线程池大小
*/
int maximumPoolSize = 4;
/**
* 线程最大空闲时间
*/
long keepAliveTime = 10;
/**
* 时间单位
*/
TimeUnit unit = TimeUnit.SECONDS;
/**
* 线程等待队列
*/
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
/**
* 线程创建工厂
*/
ThreadFactory threadFactory = new NameTreadFactory();
/**
* 拒绝策略
*/
RejectedExecutionHandler handler = new MyIgnorePolicy();
/**
* 构造函数:初始化线程池
* @param corePoolSize 线程池大小
* @param maximumPoolSize 最大线程池大小
* @param keepAliveTime 线程最大空闲时间
*/
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTime = keepAliveTime;
}
/**
* 构造函数
*/
public MyThreadPoolExecutor() {
}
/**
* 默认线程池
* @return 线程池
*/
public ExecutorService createNewThreadPool(){
return new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, this.unit,
this.workQueue, this.threadFactory, this.handler);
}
/**
* newFixedThreadPool:数量固定大小,该线程池重用在共享的无边界队列上运行的固定数量的线程。
* 在任何时候,最多 corePoolSize(构造函数的参数,核心线程数与最大线程数相等)个线程都是活动的处理任务。
* 如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到某个线程可用为止。
* 如果在关闭之前执行过程中由于执行失败导致任何线程终止,则在执行后续任务时将使用新线程代替。
* 池中的线程将一直存在,直到明确将其关闭。
* @param corePoolSize 核心线程数
* @return 线程池
*/
public ExecutorService newFixedThreadPool(int corePoolSize){
return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), this.threadFactory, this.handler);
}
/**
* newCachedThreadPool:数量无上限,该线程池会根据需要创建,但是优先使用之前构造的线程。
* 这些池通常将提高执行许多短期异步任务的程序的性能,如果没有可用的现有线程,则将创建一个新线程并将其添加到池中。
* 当60S内未使用的线程将被终止并从缓存中删除。 因此,保持空闲时间足够长的池不会消耗任何资源。
* @return 线程池
*/
public ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), this.threadFactory, this.handler);
}
/**
* newScheduledThreadPool:该线程池可以安排命令在给定的延迟后运行或定期执行。
* @param corePoolSize 线程池核心线程大小
* @return 线程池
*/
public ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize, this.threadFactory, this.handler);
}
}
3、拒绝策略 MyIgnorePolicy
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 拒绝策略
* @author wxhntmy
*/
public class MyIgnorePolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
doLog(r, e);
}
private void doLog(Runnable r, ThreadPoolExecutor e) {
// 可做日志记录等
System.err.println( r.toString() + " rejected");
}
}
4、线程创建工厂 NameTreadFactory
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线程创建工厂
* @author wxhntmy
*/
public class NameTreadFactory implements ThreadFactory {
private final AtomicInteger mThreadNum = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
System.out.println(t.getName() + " has been created");
return t;
}
}
四、Task 实现
1、创建 ThreadPoolExecutor
/**
* 线程池
*/
private MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor();
/**
* 线程池实例
*/
ExecutorService executorService = myThreadPoolExecutor.createNewThreadPool();
2、通过实现 Runnable 接口创建 Task
通过实现 Runnable 接口创建
/**
* 任务/线程
* @author wxhntmy
*/
public class MyTask implements Runnable {
/**
* 任务名称
*/
private String name;
/**
* 构造函数
* @param name 任务名称
*/
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(this.toString() + " is running!");
try {
//让任务执行慢点
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.toString() + " is end!");
}
/**
* 获取任务名称
* @return 任务名称
*/
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [ name = " + name + " ]";
}
}
提交运行 task
MyTask myTask = new MyTask("new thread");
//execute(Runnable)
//这个方法接收一个Runnable实例,并且异步的执行
executorService.execute(myTask);
运行结果
3、通过实现 Callable 接口创建 Task
import java.util.concurrent.Callable;
/**
* 带返回结果的任务,返回一个Callable
* @author wxhntmy
*/
public class ReturnResultTaskCallable implements Callable<Object> {
/**
* 任务名称
*/
private String name;
/**
* 构造函数
* @param name 任务名称
*/
public ReturnResultTaskCallable(String name) {
this.name = name;
}
@Override
public Object call() throws Exception {
System.out.println(this.toString() + " is running!");
System.out.println("Asynchronous Callable");
try {
//让任务执行慢点
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(this.toString() + " is end!");
return "Callable Result";
}
/**
* 获取任务名称
* @return 任务名称
*/
public String getName() {
return name;
}
@Override
public String toString() {
return "MyCallable [ name = " + name + " ]";
}
}
提交运行 task
ReturnResultTaskCallable returnResultTaskCallable = new ReturnResultTaskCallable("return result callable");
//submit(Callable)会返回一个Future对象,submit(Callable)接收的是一个Callable的实现,
//Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。
Future<Object> future = executorService.submit(returnResultTaskCallable);
//如果任务执行完成,future.get()方法会返回Callable任务的执行结果。
//注意,future.get()方法会产生阻塞。
try {
System.out.println("future.get(): " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
运行结果
4、带对象锁的 Task
对象
/**
* 对象锁
* @author wxhntmy
*/
public class ObjectLock {
private int counter;
public int getCounter() {
return counter;
}
public void setCounter(int counter) {
this.counter = counter;
}
}
synchronized 修饰一个代码块时,一个线程访问一个对象中的 synchronized(this) 同步代码块时,其他试图访问该对象的线程将被阻塞。
Task 实现
/**
* 对象锁任务
*
* @author wxhntmy
*/
public class ObjectLockTask implements Runnable {
/**
* 任务名称
*/
private String name;
/**
* 共享对象
*/
private ObjectLock objectLock;
/**
* 构造函数
*
* @param name 任务名称
* @param objectLock 线程共享对象
*/
public ObjectLockTask(String name, ObjectLock objectLock) {
this.name = name;
this.objectLock = objectLock;
}
@Override
public void run() {
//System.out.println(this.toString() + " is running!");
synchronized (objectLock) {
for (int i = 1; i < 6; i++) {
objectLock.setCounter(objectLock.getCounter() + i);
System.out.println(this.getName() + " counter: " + objectLock.getCounter());
}
}
//System.out.println(this.toString() + " is end!");
}
/**
* 获取任务名称
*
* @return 任务名称
*/
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [ name = " + name + " ]";
}
}
提交运行 task
ObjectLock objectLock = new ObjectLock();
objectLock.setCounter(0);
for (int i = 1; i < 9; i++) {
ObjectLockTask objectLockTask = new ObjectLockTask("object lock task " + i, objectLock);
executorService.execute(objectLockTask);
}
运行结果
my-thread-1 has been created
my-thread-2 has been created
my-thread-3 has been created
object lock task 1 counter: 1
object lock task 1 counter: 3
my-thread-4 has been created
object lock task 1 counter: 6
object lock task 1 counter: 10
object lock task 1 counter: 15
object lock task 5 counter: 16
object lock task 5 counter: 18
object lock task 5 counter: 21
object lock task 5 counter: 25
object lock task 5 counter: 30
object lock task 3 counter: 31
object lock task 3 counter: 33
object lock task 3 counter: 36
object lock task 3 counter: 40
object lock task 3 counter: 45
object lock task 2 counter: 46
object lock task 2 counter: 48
object lock task 2 counter: 51
object lock task 2 counter: 55
object lock task 2 counter: 60
object lock task 6 counter: 61
object lock task 6 counter: 63
object lock task 6 counter: 66
object lock task 6 counter: 70
object lock task 6 counter: 75
object lock task 4 counter: 76
object lock task 4 counter: 78
object lock task 4 counter: 81
object lock task 4 counter: 85
object lock task 4 counter: 90
MyTask [ name = object lock task 7 ] rejected
MyTask [ name = object lock task 8 ] rejected
5、在线程里更新 UI
对象
/**
* 共享对象
* @author wxhntmy
*/
public class SharedObject {
/**
* Label
*/
private Label UIDisplayLabel;
public Label getUIDisplayLabel() {
return UIDisplayLabel;
}
public void setUIDisplayLabel(Label UIDisplayLabel) {
this.UIDisplayLabel = UIDisplayLabel;
}
}
Task 实现
import javafx.application.Platform;
import model.SharedObject;
import java.util.Random;
/**
* 更新UI任务
* @author wxhntmy
*/
public class RefreshUITask implements Runnable {
/**
* 任务名称
*/
private String name;
/**
* 共享对象
*/
private SharedObject sharedObject;
/**
* 构造函数
*
* @param name 任务名称
* @param sharedObject 线程共享对象
*/
public RefreshUITask(String name, SharedObject sharedObject) {
this.name = name;
this.sharedObject = sharedObject;
}
@Override
public void run() {
System.out.println(this.toString() + " is running!");
Random random = new Random();
Platform.runLater(() -> {
sharedObject.getUIDisplayLabel().setText("更新UI(显示随机数):" + random.nextInt(1000));
});
System.out.println(this.toString() + " is end!");
}
/**
* 获取任务名称
*
* @return 任务名称
*/
public String getName() {
return name;
}
@Override
public String toString() {
return "MyTask [ name = " + name + " ]";
}
}
提交运行 task
SharedObject sharedObject = new SharedObject();
sharedObject.setUIDisplayLabel(UIDisplayLabel);
//传递的参数是按引用进行传递,传递的是引用的地址,也就是变量所对应的内存空间的地址。
RefreshUITask refreshUITask = new RefreshUITask("refresh UI Task", sharedObject);
//execute(Runnable)
//这个方法接收一个Runnable实例,并且异步的执行
executorService.execute(refreshUITask);
运行结果
五、示例代码
以上是关于JavaFX 多线程之 ThreadPoolExecutor 线程池的主要内容,如果未能解决你的问题,请参考以下文章
是否可以将多线程JavaFX AudioClip声音的混合结果记录到磁盘上?
JavaFX实战:模拟电子琴弹奏效果,鼠标弹奏一曲piano送给大家