java Future 阻塞
篇首语:本文由小常识网(小编为大家整理,主要介绍了java Future 阻塞相关的知识,希望对你有一定的参考价值。
但在方法中调用了Future 的get方法,这样是不是阻塞了整条线程
Future 的get不也产生阻塞吗?
Future 的get不也产生阻塞吗?
参考技术A 在任何并发性应用程序中,异步事件处理都至关重要。事件来源可能是不同的计算任务、I/O 操作或与外部系统的交互。无论来源是什么,应用程序代码都必须跟踪事件,协调为响应事件而采取的操作。Java 应用程序可采用两种基本的异步事件处理方法:该应用程序有一个协调线程等待事件,然后采取操作,或者事件可在完成时直接执行某项操作(通常采取执行应用程序所提供的代码的方式)。让线程等待事件的方法被称为阻塞 方法。让事件执行操作、线程无需显式等待事件的方法被称为非阻塞 方法。旧的 java.util.concurrent.Future 类提供了一种简单方式来处理预期的事件完成,但仅通过轮询或等待完成的方法。Java 8 中添加的 java.util.concurrent.CompletableFuture 类扩展了此功能,添加了大量合成或处理事件的方法。具体地讲,CompletableFuture 提供了在事件完成时执行应用程序代码的标准技术,包括各种组合任务(由 future 表示)的方式。这种组合技术使得编写非阻塞代码来处理事件变得很容易(或者至少比过去容易)。
Java Future源码分析
JDK future框架,提供了一种异步编程模式,基于线程池的。将任务runnable/callable提交到线程池executor,返回一个Future对象。通过future.get()获取执行结果,这里提交到线程池,后面的操作不会阻塞。future.get()获取结果会阻塞,其实也是用多线线程执行任务。
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @Author: <[email protected]> * @Description: jdk future demo * @Date: Created in : 2018/11/30 6:01 PM **/ public class TestJdkFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { testJdkFuture(); } public static void testJdkFuture() throws ExecutionException, InterruptedException { Callable<Integer> callable = () ->{ System.out.println("callable do some compute"); return 1; } ; Runnable runnable = () -> { System.out.println("runable do some compute"); }; ExecutorService executorService = Executors.newCachedThreadPool(); Future<Integer> future = executorService.submit(callable); Future runableFuture = executorService.submit(runnable); Object runableRes = runableFuture.get(); int res = future.get(); executorService.shutdown(); System.out.println("callable res: " + res); System.out.println("runnable res: " + runableRes); } }
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result =; ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
/** * Sets the result of this future to the given value unless * this future has already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon successful completion of the computation. * * @param v the value */ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
/** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next =; if (next == null) break; = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
/** * Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting */ V get() throws InterruptedException, ExecutionException;
/** * @throws CancellationException {@inheritDoc} */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
awaitDone(false, 0l)主要是阻塞线程,进入方法
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
/** * Returns result or throws exception for completed task. * * @param s completed state value */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
future框架 = 线程池 + futrue
以上是关于java Future 阻塞的主要内容,如果未能解决你的问题,请参考以下文章
面试专栏Guava - ListenableFuture,避免Future获取阻塞问题,增加回调
java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞
java并发编程之Future.get() 在线程池配置RejectedExecutionHandler为ThreadPoolExecutor.DiscardPolicy策略时一直阻塞