java如何实现一个Future

Posted 六月风花雪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java如何实现一个Future相关的知识,希望对你有一定的参考价值。

实现Futrue接口

public class MsgFuture<V> implements java.util.concurrent.Future<V> {
    ...
    ...
}    

  

Future的主要特性为Future.get()、

  get()
 get(long timeout, TimeUnit unit)

主要思路如下:
构造MsgFuture时,设置开始时间,这里是sendTime;设置timeout,默认get()方法的超时时间,我们的程序不可能会无限等待
默认的get()对应的值域是result,默认为一个NULL对象,标识没有返回数据

result的值需要其他线程在做完任务后将值写到Future对象中,这里暴露了一个方法setResult(object)
    /**
     * 设置结果值result,唤醒condition {@link #get(long, TimeUnit)}
     * @param result
     */
    public synchronized void setResult(Object result) {
        reentrantLock.lock();
        try {
            this.result = result;
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }

    }

  使用ReentrantLock来进行数据可见性控制

       condition.signalAll()可以唤醒condition.await的阻塞wait

       至于其他线程如何调用到setResult(object)方法,可以使用ConcurrentHashMap,key为msgId,值为MsgFuture对象,设置成一个全局的,或两个线程都可访问,其他线程根据msgId获取到MsgFuture,然后调用setResult(object)方法

 

    /**
     * 获取结果,如果到达timeout还未得到结果,则会抛出TimeoutException
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws TimeoutException
     */
    @SuppressWarnings("all")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long left = getLeftTime(timeout, unit);                      //根据timeout配置获取剩余的世界
        if(left < 0){
            //已经没有剩余时间
            if(isDone()){                                            //如果已经完成,直接放回结果
                return (V)this.result;
            }else{
                //timeout
                throw new TimeoutException("返回超时,后续的响应将会被丢弃abort");
            }
        }else{

            reentrantLock.lock();                                    //同步
            try {
                //获取锁后先判断是否已经完成,防止无意义的await
                if(isDone()){                                        //先判断是否已经完成 
                    return (V)this.result;                           //直接返回  
                }
                logger.debug("await "+left+" ms");
                condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);       //没有返回,阻塞等待,如果condition被唤醒,也会提前退出
            }finally {
                reentrantLock.unlock();
            }
            if(isDone()){                                            //被唤醒或超时时间已到,尝试判断是否完成
                return (V)this.result;                               //返回
            }

            throw new TimeoutException("未获取到结果");                //超时  
        }
    }

  

    public boolean isDone() {
        return this.result != NULL;
    }

  



全部代码
public class MsgFuture<V> implements java.util.concurrent.Future<V> {

    private final static Logger logger = LoggerFactory.getLogger(MsgFuture.class);

    /**
     * 全局的空对象,如果Future获取到值了,那么一定不是NULL
     */
    private final static Object NULL = new Object();
    /**
     * 主锁
     */
    private final ReentrantLock reentrantLock = new ReentrantLock();

    /**
     * 条件,利用它的condition.await(left, TimeUnit.MILLISECONDS)和notifyAll方法来实现阻塞、唤醒
     */
    private final Condition condition = reentrantLock.newCondition();

    private int timeout;

    private volatile Object result = NULL;

    private long sendTime;

    public MsgFuture(int timeout, long sendTime) {
        this.timeout = timeout;
        this.sendTime = sendTime;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    public boolean isCancelled() {
        return false;
    }

    public boolean isDone() {
        return this.result != NULL;
    }

    /**
     * 获取future结果
     * @return
     * @throws InterruptedException
     */
    public V get() throws InterruptedException {
        logger.debug("sendTime:{}",sendTime);
        try {
            return get(timeout, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            logger.error("获取future结果异常", e);
        }
        return null;
    }

    /**
     * 获取结果,如果到达timeout还未得到结果,则会抛出TimeoutException
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     * @throws TimeoutException
     */
    @SuppressWarnings("all")
    public V get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long left = getLeftTime(timeout, unit);
        if(left < 0){
            //已经没有剩余时间
            if(isDone()){
                return (V)this.result;
            }else{
                //timeout
                throw new TimeoutException("返回超时,后续的响应将会被丢弃abort");
            }
        }else{

            reentrantLock.lock();
            try {
                //获取锁后先判断是否已经完成,防止无意义的await
                if(isDone()){
                    return (V)this.result;
                }
                logger.debug("await "+left+" ms");
                condition.await(getLeftTime(timeout, unit), TimeUnit.MILLISECONDS);
            }finally {
                reentrantLock.unlock();
            }
            if(isDone()){
                return (V)this.result;
            }

            throw new TimeoutException("未获取到结果");
        }
    }

    /**
     * 设置结果值result,唤醒condition {@link #get(long, TimeUnit)}
     * @param result
     */
    public synchronized void setResult(Object result) {
        reentrantLock.lock();
        try {
            this.result = result;
            condition.signalAll();
        }finally {
            reentrantLock.unlock();
        }

    }

    /**
     * 计算剩余时间
     * @param timeout
     * @param unit
     * @return
     */
    private long getLeftTime(long timeout, TimeUnit unit){
        long now = System.currentTimeMillis();
        timeout = unit.toMillis(timeout); // 转为毫秒
        return timeout - (now - sendTime);
    }

    /*public static void main(String[] args) {
        MsgFuture msgFuture = new MsgFuture(2000,System.currentTimeMillis());

        //测试先唤醒、后get是否正常
        msgFuture.setResult("yoxi");

        try {
            System.out.println(msgFuture.get(2000,TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            logger.error("Interrupt异常", e);
        } catch (TimeoutException e) {
            logger.error("测试先唤醒,后get出错", e);
        }
    }*/
}

  





以上是关于java如何实现一个Future的主要内容,如果未能解决你的问题,请参考以下文章

Java深入学习:Future模式

Java是如何实现Future模式的?万字详解!

Java是如何实现Future模式的?万字详解!

Java程序执行超时——Future接口介绍

利用多线程实现Future模式

java中异步调用,Callable,Future,FutureTask,CompletableFuture如何使用