java 列表输入到Completable future join结果

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 列表输入到Completable future join结果相关的知识,希望对你有一定的参考价值。

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;


public class CompletableFutureTest {

    public static void main(String[] args) {
        List<Shop> shops = Arrays.asList(
                new Shop("New Save"),
                new Shop("Pak n Save"),
                new Shop("New World"),
                new Shop("PB"),
                new Shop("Norman"),
                new Shop("Countdown")
        );

        long start = System.nanoTime();
        System.out.println(findPricesAsyn("IPhone7", shops));
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in completable future " + duration + " msecs");


        start = System.nanoTime();
        System.out.println(findPricesSync("IPhone6", shops));
        duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Done in parallel stream " + duration + " msecs");

    }


    /**
     * Note that you use two separate stream pipelines,
     * instead of putting the two map operations one after the other in the same stream-processing pipeline—and for a very good reason.
     * Given the lazy nature of intermediate stream operations,
     * if you had processed the stream in a single pipeline, you would have succeeded only in executing all the requests to different shops synchronously and sequentially.
     * This is because the creation of each CompletableFuture to interrogate a given shop would start only when the computation of the previous one had completed,
     * letting the join method return the result of that computation.
     *
     * @param product
     * @param shops
     * @return
     */
    public static List<String> findPricesAsyn(String product, List<Shop> shops) {

//      more flexible than parallel as it allows you to specify a different Executor to submit their tasks to
        List<CompletableFuture<String>> priceFutures = shops.stream()
                .map(shop -> CompletableFuture.supplyAsync(
                        () -> shop.getPrice(product)))
                .map(future -> future.thenApply(Quote::parse))
                .map(future -> future.thenCompose(quote ->
                        CompletableFuture.supplyAsync(() -> //create another future to be composed...
                                Discount.applyDiscount(quote))))
                .collect(Collectors.toList());

        return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }


    public static List<String> findPricesSync(String product, List<Shop> shops) {
        return shops.stream()
                .map(shop -> shop.getPrice(product))
                .map(Quote::parse)
                .map(Discount::applyDiscount)
                .collect(Collectors.toList());
    }


//    public static List<String> findPricesParall(String product, List<Shop> shops) {
//        return shops.parallelStream()
//                .map(shop -> shop.getPrice(product))
//                .collect(Collectors.toList());
//    }

}


class Shop {

    private String name;

    Random random = new Random();

    public Shop(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private double calculatePrice(String product) {
        delay();
        return random.nextDouble() * product.charAt(0) + product.charAt(1);
    }

    public String getPrice(String product) {
        double price = calculatePrice(product);
        Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
        return String.format("%s:%.2f:%s", name, price, code);
    }

    public Future<Double> getPriceAsync(String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product));
    }
}

class Quote {
    private final String shopName;
    private double price;
    private final Discount.Code discountCode;


    public Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    public static Quote parse(String s) {
        String[] split = s.split(":");
        String shopName = split[0];
        double price = Double.parseDouble(split[1]);
        Discount.Code discountCode = Discount.Code.valueOf(split[2]);
        return new Quote(shopName, price, discountCode);
    }

    public String getShopName() {
        return shopName;
    }

    public double getPrice() {
        return price;
    }

    public Discount.Code getDiscountCode() {
        return discountCode;
    }
}

class Discount {
    public enum Code {
        A(10), B(20), C(30);
        int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }

        int getPercentage() {
            return percentage;
        }

    }

    public static String applyDiscount(Quote quote) {
        return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
    }

    public static void delay() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static double apply(double price, Code discountCode) {
        delay();
        return price * (100 - discountCode.getPercentage() / 100);
    }
}

以上是关于java 列表输入到Completable future join结果的主要内容,如果未能解决你的问题,请参考以下文章

Java 8 Completable Futures 所有不同的数据类型

RxSwift: 链 Completable 到 Observable

如何在 RxJava2 中链接两个 Completable

RXJAVA-Completable

RXJAVA-Completable

等待 Completable 未来线程完成的推荐方法是啥