JavaFX 多线程之 ThreadPoolExecutor 线程池

Posted Calvin Chan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JavaFX 多线程之 ThreadPoolExecutor 线程池相关的知识,希望对你有一定的参考价值。

一、开发环境

IntelliJ IDEA 2020.2.3 + JDK 1.8 + JavaFX Scene Builder 2.0

二、ThreadPoolExecutor 介绍

        ThreadPoolExecutor 是用来处理异步任务的一个接口,可以将其理解成为一个线程池和一个任务队列,提交到 ExecutorService 对象的任务会被放入任务队列或者直接被线程池中的线程执行。

ThreadPoolExecutor 对线程池和队列的使用方式如下:

  1. 从线程池中获取可用线程执行任务,如果没有可用线程则使用 ThreadFactory 创建新的线程,直到线程数达到 corePoolSize 限制
  2. 线程池线程数达到 corePoolSize 以后,新的任务将被放入队列,直到队列不能再容纳更多的任务
  3. 当队列不能再容纳更多的任务以后,会创建新的线程,直到线程数达到 maxinumPoolSize 限制
  4. 线程数达到 maxinumPoolSize 限制以后新任务会被拒绝执行,调用 RejectedExecutionHandler 进行处理

三、ThreadPoolExecutor 实现

1、JavaFX 界面

在这里插入图片描述

2、线程池实例初始化 MyThreadPoolExecutor

MyThreadPoolExecutor 是对线程池实例的初始化

import java.util.concurrent.*;

/**
 * 线程池
 * @author wxhntmy
 */
public class MyThreadPoolExecutor {

    /**
     * 核心线程池大小
     */
    int corePoolSize = 2;
    /**
     * 最大线程池大小
     */
    int maximumPoolSize = 4;
    /**
     * 线程最大空闲时间
     */
    long keepAliveTime = 10;
    /**
     * 时间单位
     */
    TimeUnit unit = TimeUnit.SECONDS;
    /**
     * 线程等待队列
     */
    BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(2);
    /**
     * 线程创建工厂
     */
    ThreadFactory threadFactory = new NameTreadFactory();
    /**
     * 拒绝策略
     */
    RejectedExecutionHandler handler = new MyIgnorePolicy();

    /**
     * 构造函数:初始化线程池
     * @param corePoolSize 线程池大小
     * @param maximumPoolSize 最大线程池大小
     * @param keepAliveTime 线程最大空闲时间
     */
    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = keepAliveTime;
    }

    /**
     * 构造函数
     */
    public MyThreadPoolExecutor() {

    }

    /**
     * 默认线程池
     * @return 线程池
     */
    public ExecutorService createNewThreadPool(){
        return new ThreadPoolExecutor(this.corePoolSize, this.maximumPoolSize, this.keepAliveTime, this.unit,
                this.workQueue, this.threadFactory, this.handler);
    }
    
    /**
     * newFixedThreadPool:数量固定大小,该线程池重用在共享的无边界队列上运行的固定数量的线程。
     * 在任何时候,最多 corePoolSize(构造函数的参数,核心线程数与最大线程数相等)个线程都是活动的处理任务。
     * 如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到某个线程可用为止。
     * 如果在关闭之前执行过程中由于执行失败导致任何线程终止,则在执行后续任务时将使用新线程代替。
     * 池中的线程将一直存在,直到明确将其关闭。
     * @param corePoolSize 核心线程数
     * @return 线程池
     */
    public ExecutorService newFixedThreadPool(int corePoolSize){
        return new ThreadPoolExecutor(corePoolSize, corePoolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), this.threadFactory, this.handler);
    }

    /**
     * newCachedThreadPool:数量无上限,该线程池会根据需要创建,但是优先使用之前构造的线程。
     * 这些池通常将提高执行许多短期异步任务的程序的性能,如果没有可用的现有线程,则将创建一个新线程并将其添加到池中。
     * 当60S内未使用的线程将被终止并从缓存中删除。 因此,保持空闲时间足够长的池不会消耗任何资源。
     * @return 线程池
     */
    public ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), this.threadFactory, this.handler);
    }

    /**
     * newScheduledThreadPool:该线程池可以安排命令在给定的延迟后运行或定期执行。
     * @param corePoolSize 线程池核心线程大小
     * @return 线程池
     */
    public ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize, this.threadFactory, this.handler);
    }
}

3、拒绝策略 MyIgnorePolicy

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 拒绝策略
 * @author wxhntmy
 */
public class MyIgnorePolicy implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        doLog(r, e);
    }

    private void doLog(Runnable r, ThreadPoolExecutor e) {
        // 可做日志记录等
        System.err.println( r.toString() + " rejected");
    }
}

4、线程创建工厂 NameTreadFactory

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程创建工厂
 * @author wxhntmy
 */
public class NameTreadFactory implements ThreadFactory {

    private final AtomicInteger mThreadNum = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
        System.out.println(t.getName() + " has been created");
        return t;
    }
}

四、Task 实现

1、创建 ThreadPoolExecutor

/**
 * 线程池
*/
private MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor();

/**
 * 线程池实例
*/
ExecutorService executorService = myThreadPoolExecutor.createNewThreadPool();

2、通过实现 Runnable 接口创建 Task

通过实现 Runnable 接口创建

/**
 * 任务/线程
 * @author wxhntmy
 */
public class MyTask implements Runnable {

    /**
     * 任务名称
     */
    private String name;

    /**
     * 构造函数
     * @param name 任务名称
     */
    public MyTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println(this.toString() + " is running!");
        try {
            //让任务执行慢点
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.toString() + " is end!");
    }

    /**
     * 获取任务名称
     * @return 任务名称
     */
    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "MyTask [ name = " + name + " ]";
    }
}

提交运行 task

MyTask myTask = new MyTask("new thread");
//execute(Runnable)
//这个方法接收一个Runnable实例,并且异步的执行
executorService.execute(myTask);

运行结果
在这里插入图片描述

3、通过实现 Callable 接口创建 Task

import java.util.concurrent.Callable;

/**
 * 带返回结果的任务,返回一个Callable
 * @author wxhntmy
 */
public class ReturnResultTaskCallable implements Callable<Object> {

    /**
     * 任务名称
     */
    private String name;

    /**
     * 构造函数
     * @param name 任务名称
     */
    public ReturnResultTaskCallable(String name) {
        this.name = name;
    }

    @Override
    public Object call() throws Exception {
        System.out.println(this.toString() + " is running!");

        System.out.println("Asynchronous Callable");

        try {
            //让任务执行慢点
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.toString() + " is end!");

        return "Callable Result";
    }

    /**
     * 获取任务名称
     * @return 任务名称
     */
    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "MyCallable [ name = " + name + " ]";
    }
}

提交运行 task

ReturnResultTaskCallable returnResultTaskCallable = new ReturnResultTaskCallable("return result callable");
//submit(Callable)会返回一个Future对象,submit(Callable)接收的是一个Callable的实现,
//Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。
Future<Object> future = executorService.submit(returnResultTaskCallable);
//如果任务执行完成,future.get()方法会返回Callable任务的执行结果。
//注意,future.get()方法会产生阻塞。
try {
    System.out.println("future.get(): " + future.get());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

运行结果
在这里插入图片描述

4、带对象锁的 Task

对象

/**
 * 对象锁
 * @author wxhntmy
 */
public class ObjectLock {
    private int counter;

    public int getCounter() {
        return counter;
    }

    public void setCounter(int counter) {
        this.counter = counter;
    }
}

synchronized 修饰一个代码块时,一个线程访问一个对象中的 synchronized(this) 同步代码块时,其他试图访问该对象的线程将被阻塞。

Task 实现

/**
 * 对象锁任务
 *
 * @author wxhntmy
 */
public class ObjectLockTask implements Runnable {

    /**
     * 任务名称
     */
    private String name;

    /**
     * 共享对象
     */
    private ObjectLock objectLock;

    /**
     * 构造函数
     *
     * @param name       任务名称
     * @param objectLock 线程共享对象
     */
    public ObjectLockTask(String name, ObjectLock objectLock) {
        this.name = name;
        this.objectLock = objectLock;
    }

    @Override
    public void run() {
        //System.out.println(this.toString() + " is running!");

        synchronized (objectLock) {
            for (int i = 1; i < 6; i++) {
                objectLock.setCounter(objectLock.getCounter() + i);
                System.out.println(this.getName() + " counter: " + objectLock.getCounter());
            }
        }

        //System.out.println(this.toString() + " is end!");
    }

    /**
     * 获取任务名称
     *
     * @return 任务名称
     */
    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "MyTask [ name = " + name + " ]";
    }
}

提交运行 task

ObjectLock objectLock = new ObjectLock();
objectLock.setCounter(0);

for (int i = 1; i < 9; i++) {
    ObjectLockTask objectLockTask = new ObjectLockTask("object lock task " + i, objectLock);
    executorService.execute(objectLockTask);
}

运行结果

my-thread-1 has been created
my-thread-2 has been created
my-thread-3 has been created
object lock task 1 counter: 1
object lock task 1 counter: 3
my-thread-4 has been created
object lock task 1 counter: 6
object lock task 1 counter: 10
object lock task 1 counter: 15
object lock task 5 counter: 16
object lock task 5 counter: 18
object lock task 5 counter: 21
object lock task 5 counter: 25
object lock task 5 counter: 30
object lock task 3 counter: 31
object lock task 3 counter: 33
object lock task 3 counter: 36
object lock task 3 counter: 40
object lock task 3 counter: 45
object lock task 2 counter: 46
object lock task 2 counter: 48
object lock task 2 counter: 51
object lock task 2 counter: 55
object lock task 2 counter: 60
object lock task 6 counter: 61
object lock task 6 counter: 63
object lock task 6 counter: 66
object lock task 6 counter: 70
object lock task 6 counter: 75
object lock task 4 counter: 76
object lock task 4 counter: 78
object lock task 4 counter: 81
object lock task 4 counter: 85
object lock task 4 counter: 90
MyTask [ name = object lock task 7 ] rejected
MyTask [ name = object lock task 8 ] rejected

5、在线程里更新 UI

对象

/**
 * 共享对象
 * @author wxhntmy
 */
public class SharedObject {
    /**
     * Label
     */
    private Label UIDisplayLabel;

    public Label getUIDisplayLabel() {
        return UIDisplayLabel;
    }

    public void setUIDisplayLabel(Label UIDisplayLabel) {
        this.UIDisplayLabel = UIDisplayLabel;
    }
}

Task 实现

import javafx.application.Platform;
import model.SharedObject;

import java.util.Random;

/**
 * 更新UI任务
 * @author wxhntmy
 */
public class RefreshUITask implements Runnable {

    /**
     * 任务名称
     */
    private String name;

    /**
     * 共享对象
     */
    private SharedObject sharedObject;

    /**
     * 构造函数
     *
     * @param name 任务名称
     * @param sharedObject 线程共享对象
     */
    public RefreshUITask(String name, SharedObject sharedObject) {
        this.name = name;
        this.sharedObject = sharedObject;
    }

    @Override
    public void run() {
        System.out.println(this.toString() + " is running!");
        Random random = new Random();
        Platform.runLater(() -> {
            sharedObject.getUIDisplayLabel().setText("更新UI(显示随机数):" + random.nextInt(1000));
        });

        System.out.println(this.toString() + " is end!");
    }

    /**
     * 获取任务名称
     *
     * @return 任务名称
     */
    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "MyTask [ name = " + name + " ]";
    }
}

提交运行 task

SharedObject sharedObject = new SharedObject();
sharedObject.setUIDisplayLabel(UIDisplayLabel);
//传递的参数是按引用进行传递,传递的是引用的地址,也就是变量所对应的内存空间的地址。
RefreshUITask refreshUITask = new RefreshUITask("refresh UI Task", sharedObject);
//execute(Runnable)
//这个方法接收一个Runnable实例,并且异步的执行
executorService.execute(refreshUITask);

运行结果
在这里插入图片描述
在这里插入图片描述

五、示例代码

JavaFX多线程之 ThreadPoolExecutor 线程池

以上是关于JavaFX 多线程之 ThreadPoolExecutor 线程池的主要内容,如果未能解决你的问题,请参考以下文章

JavaFX 中的多线程会挂起 UI

是否可以将多线程JavaFX AudioClip声音的混合结果记录到磁盘上?

Python-线程池进程池,协程

JavaFX实战:模拟电子琴弹奏效果,鼠标弹奏一曲piano送给大家

JavaFX实战:模拟电子琴弹奏效果,鼠标弹奏一曲piano送给大家

JavaFX实战:模拟电子琴弹奏效果,鼠标弹奏一曲piano送给大家