JDK8CompletableFuture 非同步處理
Posted sp42a
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JDK8CompletableFuture 非同步處理相关的知识,希望对你有一定的参考价值。
如果你要非同步(Asyncronous)讀取文字檔案,在檔案讀取完後做某些事,可以使用 ExecutorService 來 submit 一個 Runnable 物件,像是類似以下的流程:
public static Future readFileAsync(String file, Consumer<String> success, Consumer<IOException> fail, ExecutorService service)
return service.submit(() ->
try
success.accept(new String(Files.readAllBytes(Paths.get(file))));
catch (IOException ex)
fail.accept(ex);
);
這麼一來,你就可使用以下非同步的風格來讀取一個文字檔案:
readFileAsync(args[0],
content -> out.println(content), // 成功處理
ex -> ex.printStackTrace(), // 失敗處理
Executors.newFixedThreadPool(10)
);
out.println(content)
與 ex.printStackTrace()
會在與讀取檔案的同一個執行緒中進行,如果你想要在不同執行緒中進行,得再額外作些設計;另一方面,這種非同步操作使用的回呼(Callback)風格,在每次回呼中若又再度進行非同步操作及回呼,很容易寫出回呼地獄(Callback hell),造成可讀性不佳。例如若有個類似 readFileAsync 風格的非同步 processContentAsync 方法,用來再繼續處理 readFileAsync 讀取的檔案內容,那麼可能撰寫出以下的程式碼:
readFileAsync(args[0],
content -> processContentAsync(content,
processedContent -> out.println(processedContent) ,
ex -> ex.printStackTrace(), service),
ex -> ex.printStackTrace(), service);
實際上非同步處理的組合需求很多,為此,JDK8 新增了 java.util.concurrent.CompletableFuture
,你可以使用它來改寫 readFileAsync,例如:
package cc.openhome;
import java.io.IOException;
import static java.lang.System.out;
import java.nio.file.*;
import java.util.concurrent.*;
public class Async
public static CompletableFuture<String> readFileAsync(String file, ExecutorService service)
return CompletableFuture.supplyAsync(() ->
try
return new String(Files.readAllBytes(Paths.get(file)));
catch(IOException ex)
throw new RuntimeException(ex);
, service);
public static void main(String[] args) throws Exception
ExecutorService poolService = Executors.newFixedThreadPool(10);
readFileAsync(args[0], poolService).whenComplete((ok, ex) ->
if(ex == null)
out.println(ok);
else
ex.printStackTrace();
).join(); // join 是為了避免 main 執行緒在任務完成前就關閉 ExecutorService
poolService.shutdown();
CompletableFuture 的靜態方法 supplyAsync
接受 Supplier 實例,可指定非同步執行任務,它會傳回 CompletableFuture 實例,你可以呼叫 whenComplete 以 BiConsumer 實例指定任務完成如何處理,第一個參數是 Supplier 的傳回值,若有例外發生則會指定給第二個參數,想要在任務完成後繼續非同步地處理,則可以使用 whenCompleteAsync 方法。
如果第一個 CompletableFuture 任務完成後,想要繼續以非同步方式來處理結果,可以使用 thenApplyAsync。例如:
readFileAsync(args[0], poolService)
.thenApplyAsync(String::toUpperCase)
.whenComplete((ok, ex) ->
if(ex == null)
out.println(ok);
else
ex.printStackTrace();
);
CompletableFuture 實例的方法,基本上都會有同步與非同步兩個版本,可以用 Async 後置名稱來區分,例如,thenApplyAsync 的同步版本就是 thenApply 方法。
〈Optional 與 Stream 的 flatMap〉中談到,Optional 與 Stream 中各定義有 map 方法,可讓你指定 Optional 或 Stream 中的值 T 如何映射為值 U,然後傳回新的 Optional 或 Stream,CompletableFuture 的 thenApply(以及非同步的 thenApply 版本)其實就類似 Optional 或 Stream 的 map,可讓你指定前一個 CompletableFuture 處理後的結果 T 如何映射為值 U,然後傳回新的 CompletableFuture。
該份文件中也談到,Optional 與 Stream 中也各定義有 flatMap 方法,可讓你指定 Optional 或 Stream 中的值 T 與 Optional<U>
、Stream<U>
之間的關係,CompletableFuture 也有個 thenCompose(以及非同步的 thenComposeAsnyc 版本),作用就類似 flatMap,可以讓你指定前一個 CompletableFuture 處理後的結果 T 如何映射為值 CompleteableFuture<U>
,舉例來說,你想在 readFileAsync 傳回的 CompletableFuture<String>
處理完後,繼續組合 processContentAsync 方法傳回 CompletableFuture<String>
,就可以如下撰寫:
readFileAsync(args[0], poolService)
.thenCompose(content -> processContentAsync(content, poolService))
.whenComplete((ok, ex) ->
if (ex == null)
out.println(ok);
else
ex.printStackTrace();
);
CompletableFuture 上還有許多方法可以使用,詳情除了參考 API 文件之中,還可以看看〈Java 8: Definitive guide to CompletableFuture〉這篇文章,有 JDK8 之前,可以使用 guava-libraries 的 ListenableFuture,有興趣的話可以參考〈ListenableFuture 聽取未來需求〉,其他各技術生態中的類似產物,可以參考〈Composable Future API〉的介紹。
【JDK8】從 synchronized、Lock 到 StampedLock
之前在 StampedLock Idioms 中介紹了 JDK8 新的 StampedLock,剛好我書中也有幾個類似的例子,於是想說也整理在一起來個東施效顰好了。
首先來看個簡易的 ArrayList 實作好了:
package cc.openhome;
import java.util.Arrays;
public class ArrayList<E>
private Object[] elems;
private int next;
public ArrayList(int capacity)
elems = new Object[capacity];
public ArrayList()
this(16);
public void add(E e)
if(next == elems.length)
elems = Arrays.copyOf(elems, elems.length * 2);
elems[next++] = e;
public E get(int index)
return (E) elems[index];
public int size()
return next;
如果將這個 ArrayList 用在只有主執行緒的環境中時,它沒有什麼問題。如果有兩個以上的執行緒同時使用它會如何?例如:
package cc.openhome;
public class ArrayListDemo
public static void main(String[] args)
ArrayList list = new ArrayList();
new Thread(() ->
while (true)
list.add(1);
).start();
new Thread(() ->
while (true)
list.add(2);
).start();
在這個範例中建立了兩個執行緒,分別於 while
迴圈中對同一個 ArrayList 實例進行 add,如果你嘗試執行程式,「有可能」會發生 ArrayIndexOutOfBoundsException
例外:
Exception in thread "Thread-1" java.lang.ArrayIndexOutOfBoundsException: 64
at cc.openhome.ArrayList.add(ArrayList.java:21)
at cc.openhome.ArrayListDemo.lambda$main$1(ArrayListDemo.java:15)
at cc.openhome.ArrayListDemo$$Lambda$2/1072591677.run(Unknown Source)
at java.lang.Thread.run(Thread.java:744)
synchronized
學過多執行緒的都知道,這是機率問題,有可能發生,也有可能沒發生,就先因陣列長度過長,JVM 分配到的記憶體不夠,而發生 java.lang.OutOfMemoryError
。這是多個執行緒存取同一物件相同資源時所引發的競速情況(Race condition),也知道解決的方法之一可以在 add 等方法上加上 synchronized
關鍵字。例如:
package cc.openhome;
import java.util.Arrays;
public class SynchronizedArrayList<E>
private Object[] elems;
private int next;
public SynchronizedArrayList(int capacity)
elems = new Object[capacity];
public SynchronizedArrayList()
this(16);
public synchronized void add(E e)
if(next == elems.length)
elems = Arrays.copyOf(elems, elems.length * 2);
elems[next++] = e;
public synchronized E get(int index)
return (E) elems[index];
public synchronized int size()
return next;
這是學習 Java 多執行緒時一定會接觸到的基本概念,如果在方法上標示 synchronized
,則執行方法必須取得該實例的鎖定,這是避免競速問題的作法之一。不過直接使用 synchronized 有許多限制,未取得鎖定的執行緒會直接被阻斷,如果你希望的功能是執行緒可嘗試取得鎖定,無法取得鎖定時就先作其他事,直接使用 synchronized 必須透過一些設計才可完成這個需求。
ReentrantLock
java.util.concurrent.locks
套件中提供了 ReentrantLock,,可以達到 synchronized 的作用,也提供額外的功能,如果單純用來達到 synchronized 的作用,可以如下改寫方才的範例:
package cc.openhome;
import java.util.Arrays;
import java.util.concurrent.locks.*;
public class ReentrantLockArrayList<E>
private Lock lock = new ReentrantLock();
private Object[] elems;
private int next;
public ReentrantLockArrayList(int capacity)
elems = new Object[capacity];
public ReentrantLockArrayList()
this(16);
public void add(E elem)
lock.lock();
try
if (next == elems.length)
elems = Arrays.copyOf(elems, elems.length * 2);
elems[next++] = elem;
finally
lock.unlock();
public E get(int index)
lock.lock();
try
return (E) elems[index];
finally
lock.unlock();
public int size()
lock.lock();
try
return next;
finally
lock.unlock();
如果有兩個執行緒都想呼叫 get 與 size 方法,由於鎖定的關係,其中一個執行緒只能等待另一執行緒解除鎖定,無法兩個執行緒同時呼叫 get 與 size,然而這兩個方法都只是讀取物件狀態,並沒有變更物件狀態,如果只是讀取操作,可允許執行緒同時並行的話,那對讀取效率將會有所改善,你可以使用兩個 Lock 物件,透過設計來達到這項需求,不過 JDK 本身提供有 ReentrantReadWriteLock 可以使用。
ReentrantReadWriteLock
ReentrantReadWriteLock 的 readLock
方法會傳回 ReentrantReadWriteLock.ReadLock
實例,writeLock
方法會傳回 ReentrantReadWriteLock.WriteLock
實例。呼叫 ReadLock 的 lock
方法時,若沒有任何 WriteLock 呼叫過 lock
方法,也就是沒有任何寫入鎖定時,就可以取得讀取鎖定。呼叫 WriteLock 的 lock
方法時,若沒有任何 ReadLock 或 WriteLock 呼叫過 lock
方法,也就是沒有任何讀取或寫入鎖定時,才可以取得寫入鎖定。
例如可使用 ReadWriteLock 改寫先前的 ArrayList,改進讀取效率:
package cc.openhome;
import java.util.Arrays;
import java.util.concurrent.locks.*;
public class ReentrantReadWriteLockArrayList<E>
private ReadWriteLock lock = new ReentrantReadWriteLock();
private Object[] elems;
private int next;
public ReentrantReadWriteLockArrayList(int capacity)
elems = new Object[capacity];
public ReentrantReadWriteLockArrayList()
this(16);
public void add(E elem)
lock.writeLock().lock();
try
if (next == elems.length)
elems = Arrays.copyOf(elems, elems.length * 2);
elems[next++] = elem;
finally
lock.writeLock().unlock();
public E get(int index)
lock.readLock().lock();
try
return (E) elems[index];
finally
lock.readLock().unlock();
public int size()
lock.readLock().lock();
try
return next;
finally
lock.readLock().unlock();
如此設計之後,若執行緒都多是在呼叫 get 或 size 方法,就比較不會因等待鎖定而進入阻斷狀態,可以增加讀取效率。
StampedLock
ReentrantReadWriteLock 在沒有任何讀取或寫入鎖定時,才可以取得寫入鎖定,這可用於實現悲觀讀取(Pessimistic Reading),如果執行緒進行讀取時,經常可能有另一執行緒有寫入需求,為了維持資料一致,ReentrantReadWriteLock 的讀取鎖定就可派上用場。
然而,如果讀取執行緒很多,寫入執行緒甚少的情況下,使用 ReentrantReadWriteLock 可能會使得寫入執行緒遭受飢餓(Starvation)問題,也就是寫入執行緒可能遲遲無法競爭到鎖定,而一直處於等待狀態。
JDK8 新增了 StampedLock 類別,可支援樂觀讀取(Optimistic Reading)實作,也就是若讀取執行緒很多,寫入執行緒甚少的情況下,你可以樂觀地認為,寫入與讀取同時發生的機會甚少,因此不悲觀地使用完全的讀取鎖定,程式可以查看資料讀取之後,是否遭到寫入執行緒的變更,再採取後續的措施(重新讀取變更後的資料,或者是拋出例外)。
假設之前的 ArrayList 範例會用於讀取執行緒多而寫入執行緒少的情況,而你想要實作樂觀讀取,如何使用 StampedLock 類別來實現:
package cc.openhome;
import java.util.Arrays;
import java.util.concurrent.locks.*;
public class StampedLockArrayList<E>
private StampedLock lock = new StampedLock();
private Object[] elems;
private int next;
public StampedLockArrayList(int capacity)
elems = new Object[capacity];
public StampedLockArrayList()
this(16);
public void add(E elem)
long stamp = lock.writeLock();
try
if (next == elems.length)
elems = Arrays.copyOf(elems, elems.length * 2);
elems[next++] = elem;
finally
lock.unlockWrite(stamp);
public E get(int index)
long stamp = lock.tryOptimisticRead();
Object elem = elems[index];
if (!lock.validate(stamp))
stamp = lock.readLock();
try
elem = elems[index];
finally
lock.unlockRead(stamp);
return (E) elem;
public int size()
long stamp = lock.tryOptimisticRead();
int size = next;
if (!lock.validate(stamp)如果一个同步方法调用另一个非同步方法,非同步方法是不是有锁