public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class FutureDemo { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); Future<Integer> result = executor.submit(new Callable<Integer>() { public Integer call() throws Exception { Util.delay(); return new Random().nextInt(); } }); doSomeThingElse(); executor.shutdown(); try { try { System.out.println("result:" + result.get(2,TimeUnit.SECONDS)); } catch (TimeoutException e) { // TODO Auto-generated catch block e.printStackTrace(); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private static void doSomeThingElse() { System.out.println("Do Some Thing Else." ); } }
- 难异步合并
- 等待 Future 集合中的所有任务都完成。
- 仅等待 Future 集合中最快结束的任务完成,并返回它的结果。
一是我们没有好的方法去获取一个完成的任务;二是 Future.get 是阻塞方法,使用不当会造成线程的浪费。解决第一个问题可以用 CompletionService 解决,CompletionService 提供了一个 take() 阻塞方法,用以依次获取所有已完成的任务。对于第二个问题,可以用 Google Guava 库所提供的 ListeningExecutorService 和 ListenableFuture 来解决。这些都会在后面的介绍。
import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; public class Shop { private final String name; private final Random random; public Shop(String name) { this.name = name; random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2)); } public double getPrice(String product) { return calculatePrice(product); } private double calculatePrice(String product) { delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } public Future<Double> getPriceAsync(String product) { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread( () -> { double price = calculatePrice(product); futurePrice.complete(price); }).start(); return futurePrice; } public String getName() { return name; } }
delay()阻塞,延迟一秒,注释的代码是随机延迟0.5 - 2.5秒之间
public static void delay() { int delay = 1000; //int delay = 500 + RANDOM.nextInt(2000); try { Thread.sleep(delay); } catch (InterruptedException e) { throw new RuntimeException(e); } }
public class ShopSyncMain { public static void main(String[] args) { Shop shop = new Shop("BestShop"); long start = System.nanoTime(); double price = shop.getPrice("my favorite product"); System.out.printf("Price is %.2f%n", price); long invocationTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Invocation returned after " + invocationTime + " msecs"); // Do some more tasks doSomethingElse(); long retrievalTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Price returned after " + retrievalTime + " msecs"); } private static void doSomethingElse() { System.out.println("Doing something else..."); } } //结果: Price is 123.26 Invocation returned after 1069 msecs Doing something else... Price returned after 1069 msecs
public class ShopMain { public static void main(String[] args) { Shop shop = new Shop("BestShop"); long start = System.nanoTime(); Future<Double> futurePrice = shop.getPriceAsync("my favorite product"); long invocationTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Invocation returned after " + invocationTime + " msecs"); // Do some more tasks, like querying other shops doSomethingElse(); // while the price of the product is being calculated try { double price = futurePrice.get(); System.out.printf("Price is %.2f%n", price); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } long retrievalTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Price returned after " + retrievalTime + " msecs"); } private static void doSomethingElse() { System.out.println("Doing something else..."); } }
Invocation returned after 54 msecs
Doing something else...
Price is 123.26
Price returned after 1102 msecs
new Thread方式新建线程可能会有个糟糕的情况:用于提示错误的异常被限制在当前线程内,最终会杀死线程,所以get是无法返回期望值,client调用方会被阻塞。
public Future<Double> getPrice(String product) { new Thread(() -> { try { double price = calculatePrice(product); futurePrice.complete(price); } catch (Exception ex) { futurePrice.completeExceptionally(ex); } }).start();
public Future<Double> getPrice(String product) { return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll")/*, new Shop("ShopEasy")*/);
//顺序同步方式 public List<String> findPricesSequential(String product) { return shops.stream() .map(shop -> shop.getName() + " price is " + shop.getPrice(product)) .collect(Collectors.toList()); } //并行流方式 public List<String> findPricesParallel(String product) { return shops.parallelStream() .map(shop -> shop.getName() + " price is " + shop.getPrice(product)) .collect(Collectors.toList()); } //工厂方法异步 public List<String> findPricesFuture(String product) { List<CompletableFuture<String>> priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor)) .collect(Collectors.toList()); List<String> prices = priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); return prices; } private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } });
sequential done in 4130 msecs
parallel done in 1091 msecs
composed CompletableFuture done in 1010 msecs
sequential done in 5032 msecs
parallel done in 2009 msecs
CompletableFuture异步方式能保持1秒的秘密在于线程池,我们定义了shops.size()大小的线程池,并且使用了守护线程。java提供了俩类的线程:用户线程和守护线程(user thread and Daemon thread)。用户线程是高优先级的线程。JVM虚拟机在结束一个用户线程之前,会先等待该用户线程完成它的task。
public class Discount { public enum Code { NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20); private final int percentage; Code(int percentage) { this.percentage = percentage; } } public static String applyDiscount(Quote quote) { return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode()); } private static double apply(double price, Code code) { delay(); return format(price * (100 - code.percentage) / 100); } } public class Quote { private final String shopName; private final double price; private final Discount.Code discountCode; public Quote(String shopName, double price, Discount.Code discountCode) { this.shopName = shopName; this.price = price; this.discountCode = discountCode; } public static Quote parse(String s) { String[] split = s.split(":"); String shopName = split[0]; double price = Double.parseDouble(split[1]); Discount.Code discountCode = Discount.Code.valueOf(split[2]); return new Quote(shopName, price, discountCode); } public String getShopName() { return shopName; } public double getPrice() { return price; } public Discount.Code getDiscountCode() { return discountCode; } }
比如,SILVER(5)代表95折, GOLD(10)代表90折,PLATINUM(15)代表85折。、
public class Shop { private final String name; private final Random random; public Shop(String name) { this.name = name; random = new Random(name.charAt(0) * name.charAt(1) * name.charAt(2)); } public String getPrice(String product) { double price = calculatePrice(product); Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)]; return name + ":" + price + ":" + code; } public double calculatePrice(String product) { delay(); return format(random.nextDouble() * product.charAt(0) + product.charAt(1)); } public String getName() { return name; } }
public class BestPriceFinder { private final List<Shop> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll"), new Shop("ShopEasy")); private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); //同步流 public List<String> findPricesSequential(String product) { return shops.stream() .map(shop -> shop.getPrice(product)) .map(Quote::parse) .map(Discount::applyDiscount) .collect(Collectors.toList()); } //并行流 public List<String> findPricesParallel(String product) { return shops.parallelStream() .map(shop -> shop.getPrice(product)) .map(Quote::parse) .map(Discount::applyDiscount) .collect(Collectors.toList()); } //CompletableFuture异步 public List<String> findPricesFuture(String product) { List<CompletableFuture<String>> priceFutures = findPricesStream(product) .collect(Collectors.<CompletableFuture<String>>toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); } public Stream<CompletableFuture<String>> findPricesStream(String product) { return shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor))); } public void printPricesStream(String product) { long start = System.nanoTime(); CompletableFuture[] futures = findPricesStream(product) .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)"))) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join(); System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs"); } } //结果 sequential done in 10176 msecs parallel done in 4010 msecs composed CompletableFuture done in 2016 msecs
- 同步的10多秒是在5个list元素时获得的,每个一次询价,一次打折延迟,共2秒。
- 并行流一轮需要2秒多(第一轮询价4个+1个共两个轮次,2秒,第二轮打折4个+1个)。
- 异步方式第一轮和第二轮都是1秒多,所以2秒多。
- 询价任务(耗时1秒)
- 解析字符换成Quote实例
- 计算折扣
- .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
- .map(future -> future.thenApply(Quote::parse))
- .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
thenapply()是接受一个Function<? super T,? extends U> fn参数用来转换CompletableFuture,相当于流的map操作,返回的是非CompletableFuture类型,它的功能相当于将CompletableFuture<T>转换成CompletableFuture<U>.
第三行也是异步处理,也涉及线程池,获得折扣后的价格。thenCompose()在异步操作完成的时候对异步操作的结果进行一些操作,Function<? super T, ? extends CompletionStage<U>> fn参数,并且仍然返回CompletableFuture类型,相当于flatMap,用来连接两个CompletableFuture
public class ExchangeService { public enum Money { USD(1.0), EUR(1.35387), GBP(1.69715), CAD(.92106), MXN(.07683); private final double rate; Money(double rate) { this.rate = rate; } } public static double getRate(Money source, Money destination) { return getRateWithDelay(source, destination); } private static double getRateWithDelay(Money source, Money destination) { delay(); return destination.rate / source.rate; } } public List<String> findPricesInUSD(String product) { List<CompletableFuture<Double>> priceFutures = new ArrayList<>(); for (Shop shop : shops) { // Start of Listing 10.20. // Only the type of futurePriceInUSD has been changed to // CompletableFuture so that it is compatible with the // CompletableFuture::join operation below. CompletableFuture<Double> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync( () -> ExchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate ); priceFutures.add(futurePriceInUSD); } // Drawback: The shop is not accessible anymore outside the loop, // so the getName() call below has been commented out. List<String> prices = priceFutures .stream() .map(CompletableFuture::join) .map(price -> /*shop.getName() +*/ " price is " + price) .collect(Collectors.toList()); return prices; } public List<String> findPricesInUSDJava7(String product) { ExecutorService executor = Executors.newCachedThreadPool(); List<Future<Double>> priceFutures = new ArrayList<>(); for (Shop shop : shops) { final Future<Double> futureRate = executor.submit(new Callable<Double>() { public Double call() { return ExchangeService.getRate(Money.EUR, Money.USD); } }); Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() { public Double call() { try { double priceInEUR = shop.getPrice(product); return priceInEUR * futureRate.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e.getMessage(), e); } } }); priceFutures.add(futurePriceInUSD); } List<String> prices = new ArrayList<>(); for (Future<Double> priceFuture : priceFutures) { try { prices.add(/*shop.getName() +*/ " price is " + priceFuture.get()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } } return prices; } public List<String> findPricesInUSD2(String product) { List<CompletableFuture<String>> priceFutures = new ArrayList<>(); for (Shop shop : shops) { // Here, an extra operation has been added so that the shop name // is retrieved within the loop. As a result, we now deal with // CompletableFuture<String> instances. CompletableFuture<String> futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync( () -> ExchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate ).thenApply(price -> shop.getName() + " price is " + price); priceFutures.add(futurePriceInUSD); } List<String> prices = priceFutures .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); return prices; } public List<String> findPricesInUSD3(String product) { // Here, the for loop has been replaced by a mapping function... Stream<CompletableFuture<String>> priceFuturesStream = shops .stream() .map(shop -> CompletableFuture .supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync(() -> ExchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate) .thenApply(price -> shop.getName() + " price is " + price)); // However, we should gather the CompletableFutures into a List so that the asynchronous // operations are triggered before being "joined." List<CompletableFuture<String>> priceFutures = priceFuturesStream.collect(Collectors.toList()); List<String> prices = priceFutures .stream() .map(CompletableFuture::join) .collect(Collectors.toList()); return prices; }