JUC线程池扩展可回调的Future

Posted 热爱编程的大忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC线程池扩展可回调的Future相关的知识,希望对你有一定的参考价值。

JUC线程池扩展可回调的Future


引言

最近在看JUC线程池java.util.concurrent.ThreadPoolExecutor的源码实现,其中了解到java.util.concurrent.Future的实现原理。从目前java.util.concurrent.Future的实现来看,虽然实现了异步提交任务,但是任务结果的获取过程需要主动调用Future#get()或者Future#get(long timeout, TimeUnit unit),而前者是阻塞的,后者在异步任务执行时间不确定的情况下有可能需要进行轮询,这两种情况和异步调用的初衷有点相违背。于是笔者想结合目前了解到的Future实现原理的前提下扩展出支持(监听)回调的Future,思路上参考了Guava增强的ListenableFuture。本文编写的时候使用的JDK是JDK11,代码可以在JDK[8,12]版本上运行,其他版本可能不适合。


简单分析Future的实现原理

并发大师Doug Lea在设计JUC线程池的时候,提供了一个顶层执行器接口Executor:

public interface Executor 
    void execute(Runnable command);
    

实际上,这里定义的方法Executor#execute()是整套线程池体系最核心的接口,也就是ThreadPoolExecutor定义的核心线程、额外创建的线程(线程池最大线程容量 - 核心线程数)都是在这个接口提交任务的时候懒创建的,也就是说ExecutorService接口扩展的功能都是基于Executor#execute()的基础进行扩展。Executor#execute()方法只是单纯地把任务实例Runnable对象投放到线程池中分配合适的线程执行,但是由于方法返回值是void类型,我们是无法感知任务什么时候执行完毕。这个时候就需要对Runnable任务实例进行包装(下面是伪代码 + 伪逻辑):

// 下面这个Wrapper和Status类是笔者虚构出来
@RequiredArgsConstructor
class Wrapper implements Runnable

    private final Runnable target;
    private Status status = Status.of("初始化");

    @Override
    public void run()
        try
           target.run();
           status = Status.of("执行成功");
        catch(Throwable t)
           status = Status.of("执行异常"); 
        
    

我们只需要把new Wrapper(原始Runnable实例)投放到线程池执行,那么通过定义好的Status状态记录变量就能得知异步任务执行的状态,以及什么时候执行完毕(包括正常的执行完毕和异常的执行完毕)。这里仅仅解决了任务执行的状态获取,但是Executor#execute()方法法返回值是void类型的特点使得我们无法回调Runnable对象执行的结果。这个时候需要定义一个可以回调执行结果的接口,其实已经有现成的接口Callable:

@FunctionalInterface
public interface Callable<V> 
    V call() throws Exception;
    

这里遇到了一个问题:由于Executor#execute()只接收Runnable参数,我们需要把Callable接口适配到Runnable接口,这个时候,做一次简单的委托即可:

@RequiredArgsConstructor
class Wrapper implements Runnable

    private final Callable callable;
    private Status status = Status.of("初始化");
    @Getter
    private Object outcome;

    @Override
    public void run()
        try
           outcome = callable.call();
           status = Status.of("执行成功");
        catch(Throwable t)
           status = Status.of("执行异常"); 
           outcome = t;
        
    

这里把Callable实例直接委托给Wrapper,而Wrapper实现了Runnable接口,执行结果直接存放在定义好的Object类型的对象outcome中即可。当我们感知到执行状态已经结束,就可以从outcome中提取到执行结果。


Future的实现

上面一个小结仅仅对Future实现做一个相对合理的虚拟推演,实际上,RunnableFuture才是JUC中常用的复合接口,它同时实现了Runnable和Future:

public interface RunnableFuture<V> extends Runnable, Future<V>    
    void run();

上一节提到的虚构出来的Wrapper类,在JUC中类似的实现是java.util.concurrent.FutureTask,它就是Callable和Runnable的适配器,FutureTask实现了RunnableFuture接口:

public class FutureTask<V> implements RunnableFuture<V> 

    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;
    
    // 省略其他代码
   

注意到核心属性state表示执行状态,outcome承载执行结果。接着看提交Callable类型任务的方法ExecutorService#submit():

public interface ExecutorService extends Executor 

    // 省略其他接口方法

    <T> Future<T> submit(Callable<T> task);
    

当我们通过上述ExecutorService#submit()方法提交Callable类型任务的时候,实际上做了如下的步骤:

  • 检查入参task的存在性,如果为null抛出NullPointerException。
  • 把Callable类型的task包装为FutureTask实例。
  • 把新建的FutureTask实例放到线程池中执行,也就是调用Executor#execute(FutureTask实例)。
  • 返回FutureTask实例的接口实例RunnableFuture(实际上是返回子接口Future实例)。

如果我们需要获取结果,可以Future#get()或者Future#get(long timeout, TimeUnit unit)获取,调用这两个方法的时候参看FutureTask里面的方法实现,得知步骤如下:

  • 如果状态state小于等于COMPLETING(1),说明任务还在执行中,获取结果的请求线程会放入WaitNode类型的队列中进行阻塞。
  • 如果任务执行完毕,不管异常完毕还是正常完毕,除了会更新状态state和把结果赋值到outcome之外,还会唤醒所有阻塞获取结果的线程,然后调用钩子方法FutureTask#done()(具体见源码FutureTask#finishCompletion())。

其实分析了这么多,笔者想指出的结论就是:Callable类型任务提交到线程池中执行完毕(包括正常执行完毕和异常执行完毕)之后,都会回调钩子方法FutureTask#done()。这个就是我们扩展可监听Future的理论依据。


扩展可回调的Future

先做一次编码实现,再简单测试其功能。

编码实现

先定义一个Future接口的子接口ListenableFuture,用于添加可监听的回调:

public interface ListenableFuture<V> extends Future<V> 
    
    void addCallback(ListenableFutureCallback<V> callback, Executor executor);

ListenableFutureCallback是一个函数式回调接口:

@FunctionalInterface
public interface ListenableFutureCallback<V> 

    void callback(V value, Throwable throwable);

对于ListenableFutureCallback而言,回调的结果value和throwable是互斥的。正常执行完毕的情况下value将会是执行结果值,throwable为null;异常执行完毕的情况下,value将会是null,throwable将会是抛出的异常实例。如果更习惯于分开处理正常执行完毕的结果和异常执行完毕的结果,ListenableFutureCallback可以这样定义:

public interface ListenableFutureCallback<V> 

    void onSuccess(V value);

    void onError(Throwable throwable);

接着定义ListenableExecutorService接口继承ExecutorService接口:

public interface ListenableExecutorService extends ExecutorService 

    <T> ListenableFuture<T> listenableSubmit(Callable<T> callable);

    /**
     * 定义这个方法是因为有些时候由于任务执行时间非常短,有可能通过返回的ListenableFuture实例添加回调之前已经执行完毕,因此可以支持显式传入回调
     */
    <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor);

然后添加一个执行单元适配器ListenableFutureCallbackRunnable,承载每次回调触发的调用(实现Runnable接口,从而支持异步执行):

@RequiredArgsConstructor
public class ListenableFutureCallbackRunnable<V> implements Runnable 

    private final ListenableFutureCallback<V> callback;
    private final V value;
    private final Throwable throwable;

   @Override
    public void run() 
        if(throwable==null)
            callback.onSuccess(value);
        else 
            callback.onError(throwable);
        
    

接着需要定义一个FutureTask的子类ListenableFutureTask,核心逻辑是覆盖FutureTask#done()方法触发回调:

public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> 

    private final List<Execution<V>> executions = new ArrayList<>();

    public ListenableFutureTask(Callable<V> callable) 
        super(callable);
    

    public ListenableFutureTask(Runnable runnable, V result) 
        super(runnable, result);
    

    public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) 
        return new ListenableFutureTask<>(callable);
    

    @Override
    protected void done() 
        Iterator<Execution<V>> iterator = executions.iterator();
        Throwable throwable = null;
        V value = null;
        try 
            value = get();
         catch (Throwable t) 
            throwable = t;
        
        while (iterator.hasNext()) 
            Execution<V> execution = iterator.next();
            ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(),
                    value, throwable);
            // 异步回调
            if (null != execution.getExecutor()) 
                execution.getExecutor().execute(callbackRunnable);
             else 
                // 同步回调
                callbackRunnable.run();
            
        
    

    @Override
    public void addCallback(ListenableFutureCallback<V> callback, Executor executor) 
        Execution<V> execution = new Execution<>();
        execution.setCallback(callback);
        execution.setExecutor(executor);
        executions.add(execution);
    


最后一步就是编写线程池ListenableThreadPoolExecutor,继承自ThreadPoolExecutor并且实现ListenableExecutorService接口:

public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService 

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    

    public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
     BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    

    @Override
    public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) 
        if (null == callable) 
            throw new IllegalArgumentException("callable");
        
        ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
        execute(listenableFutureTask);
        return listenableFutureTask;
    

    @Override
    public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) 
        if (null == callable) 
            throw new IllegalArgumentException("callable");
        
        if (null == callbacks) 
            throw new IllegalArgumentException("callbacks");
        
        ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable);
        for (ListenableFutureCallback<T> callback : callbacks) 
            listenableFutureTask.addCallback(callback, executor);
        
        execute(listenableFutureTask);
        return listenableFutureTask;
    



测试

引入junit,编写测试类如下:

package com;

import com.futureTest.PauseUtil;
import com.listenerFuture.ListenableExecutorService;
import com.listenerFuture.ListenableFuture;
import com.listenerFuture.ListenableFutureCallback;
import com.listenerFuture.ListenableThreadPoolExecutor;
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author 大忽悠
 * @create 2022/10/12 17:37
 */
public class Main 
    private static ListenableExecutorService executorService;
    private static Executor executor;

    private static void init() 
        executorService = new ListenableThreadPoolExecutor(
                1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)
                , new ThreadFactory() 
            private AtomicInteger count = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) 
                Thread thread = new Thread(r);
                thread.setName(String.format("ListenableWorker-%d", count.getAndIncrement()));
                return thread;
            
        
        );

        executor= Executors.newFixedThreadPool(3);
    


   private static void printCurrentThreadName()
       System.out.println("当前线程名为: "+Thread.currentThread().getName());
   

    public static void main(String[] args) throws IOException 
           init();
        ListenableFuture<String> listenableFuture = executorService.listenableSubmit(() -> 
            PauseUtil.pauseRandomTime();
            printCurrentThreadName();
            return "我是大忽悠";
        );

        ListenableFuture<String> listenableFuture1 = executorService.listenableSubmit(() 你真的懂并发吗?谈谈对JUC线程池ThreadPoolExecutor的认识吧

Java - "JUC线程池" Callable与Future

Java多线程系列--“JUC线程池”06之 Callable和Future

Java多线程系列--“JUC线程池”06之 Callable和Future

JUC高级多线程_10:Future异步回调的具体介绍与使用

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(提交任务: submitexecute invokAllinvokeAny)