借助JDK8和RxJava如何让你的业务代码跑的更快

Posted Sumslack团队

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了借助JDK8和RxJava如何让你的业务代码跑的更快相关的知识,希望对你有一定的参考价值。

~ 篇篇值得收藏的原创技术好文 ~

背景

微服务流行后,在我们项目开发过程中,一个服务经常会调用N个微服务,调用每个微服务可能需要几百毫秒,试想,一个复杂的业务如果要调用上百的微服务,如果各个服务同步执行,可能就需要花费好几秒,试想:这些服务为什么不能并行运行呢?

一个复杂的计算任务,为什么不能分解成更小的任务单位,让他们并行运行呢?

本文通过以上两个业务场景,比较各个实现方案的差异,在讲解之前,我们先来了解下本文提到的RxJava

案例

从一段最简单的服务开始:该服务需调用3个微服务,每个微服务费时250ms,三个微服务都获取数据后返回给前端(该微服务三个服务分别是商品详情,商品评论和推荐商品列表),如果按顺序执行,那么代码是这样的:

 1public static void main(String[] args) throws Exception {
2    long c = System.currentTimeMillis();
3    System.out.println("顺序执行:");
4    System.out.println(service("商品详情微服务")+service("商品评论微服务")+service("推荐商品微服务"));
5    spendTime(c);
6}
7//模拟某个服务
8private static String service(String srvName){
9    try {
10        Thread.sleep(250);
11    } catch (InterruptedException e) {
12        e.printStackTrace();
13    }
14    return srvName+"\r\n";
15}
16private static void spendTime(long preTime{
17    System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
18}

这段代码毫无疑问,打印输出:

花费:781 毫秒

改造一下,使用JDK8的CompletableFuture,3个微服务独立线程运行,都完成后通知主线程打印,代码如下:

 1public static void main(String[] args) throws Exception {
2        final long cc = System.currentTimeMillis();    
3    CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> service("商品详情微服务"));
4    CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> service("商品评论微服务"));
5    CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> service("推荐商品微服务"));
6    s1.thenCombine(s2, (i,j)->{
7        return i+j;
8    }).thenCombine(s3, (i,j)->{
9        System.out.println("使用JDK8的并行编程:");
10        System.out.println(i+j);
11        spendTime(cc);
12        return i+j;
13    });
14}

以上代码的执行结果取决于3个微服务中最长时间的那个服务,相比原先速度有明显提高:

花费:311 毫秒

那么以上的代码使用RxJava怎么来写呢?我们可以flatMap将服务分拆到各自独立线程中去执行,代码如下:

 1private static String[] ss = {"商品详情微服务","商品评论微服务","推荐商品微服务"};
2public static void main(String[] args) throws Exception {
3    Observable.range(0,3)
4    .flatMap(new Function<IntegerObservableSource<String>>() {
5        @Override
6        public ObservableSource<String> apply(Integer t) throws Exception {
7            return Observable.just(t)
8        .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
9                .map(new Function<IntegerString>() {
10                    @Override
11                    public String apply(Integer t) throws Exception {
12                        return service(ss[t]);
13                    }
14                });
15            }
16        })
17        .reduce((s1,s2)->s1+s2)
18        .subscribe(s -> {
19            System.out.println("Observable:\r\n" + s);
20            spendTime(cc2);
21        });
22}

花费:455 毫秒

RxJava模拟的针对每个数据项的并发操作调用时间上要比直接使用JDK8的API慢得多

第二个业务场景是将复杂的计算进行拆分子计算任务,然后将每个任务计算合并成最终计算结果,以下直接给出所有源码,我们来看看几种计算方式在耗时上的不同,复杂计算任务是:对1到210000000开根号求总和

  1package com.sumslack.rxjava;
 2
 3import java.util.Arrays;
 4import java.util.concurrent.CompletableFuture;
 5import java.util.concurrent.ExecutorService;
 6import java.util.concurrent.Executors;
 7
 8import io.reactivex.Observable;
 9import io.reactivex.ObservableSource;
10import io.reactivex.functions.BiFunction;
11import io.reactivex.functions.Function;
12import io.reactivex.schedulers.Schedulers;
13
14public class TestComputer {
15    private static final int MAX_I = 210000000;
16
17    private static void spendTime(long preTime) {
18        System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
19    }
20
21    private static void spendTime(long preTime,String str) {
22        System.out.println("[" + str + "] 花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
23    }
24    private static ExecutorService eService = Executors.newCachedThreadPool();
25    public static void main(String[] args) throws Exception{
26
27        int[] ss = new int[MAX_I];
28        for(int i=1;i<=MAX_I;i++) {
29            ss[i-1] = i;
30        }
31
32
33        long c = System.currentTimeMillis();
34        System.out.println(xx(0,MAX_I));
35        spendTime(c,"顺序执行");
36
37        final long cc5 = System.currentTimeMillis();
38        Observable.range(1, MAX_I).map(new Function<Integer, Double>() {
39            @Override
40            public Double apply(Integer t) throws Exception {
41                return Math.sqrt(t);
42            }
43        }).reduce((i,j)->i+j)
44        .subscribeOn(Schedulers.computation())
45        .subscribe(s -> {
46            spendTime(cc5,"Observable直接算");
47        });
48        final long cc = System.currentTimeMillis();
49        CompletableFuture<Double> cf1 = CompletableFuture.supplyAsync(() -> {
50            return xx(0,MAX_I/2);
51        });
52        CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
53            return xx(MAX_I/2,MAX_I);
54        });
55        cf1.thenCombine(cf2,  (i,j)->{
56            System.out.println(""+(i+j));
57            spendTime(cc,"CompletableFuture");
58            return i+j;
59        });
60
61        //也可以用:CompletableFuture.allOf(cf1,cf2).join();
62        c = System.currentTimeMillis();
63        Double dd = Arrays.stream(ss).mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
64        System.out.println(dd);
65        spendTime(cc,"stream");
66
67        c = System.currentTimeMillis();
68        Double dd2 = Arrays.stream(ss).parallel().mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
69        System.out.println(dd2);
70        spendTime(cc,"parallel stream");
71
72        final long cc2 = System.currentTimeMillis();
73        Observable.fromArray(0,1,2)
74        .flatMap(new io.reactivex.functions.Function<Integer,ObservableSource<Double>>(){
75            @Override
76            public ObservableSource<Double> apply(Integer t) throws Exception {
77                if(t%3==0) {
78                    return Observable.just(t)
79                        .subscribeOn(Schedulers.computation())
80                        .map(new Function<Integer, Double>() {
81                            @Override
82                            public Double apply(Integer t) throws Exception {
83                                return xx(0,MAX_I/3);
84                            }
85                        });
86                }else if(t%3==1) {
87                    return Observable.just(t)
88                            .subscribeOn(Schedulers.computation())
89                            .map(new Function<Integer, Double>() {
90                                @Override
91                                public Double apply(Integer t) throws Exception {
92                                    return xx(MAX_I/3,MAX_I*2/3);
93                                }
94                            });
95                }else {
96                    return Observable.just(t)
97                            .subscribeOn(Schedulers.computation())
98                            .map(new Function<Integer, Double>() {
99                                @Override
100                                public Double apply(Integer t) throws Exception {
101                                    return xx(MAX_I*2/3,MAX_I);
102                                }
103                            });
104                }
105            }
106        })
107        .reduce(new BiFunction<Double, Double, Double>() {
108            @Override
109            public Double apply(Double t1, Double t2) throws Exception {
110                return t1+t2;
111            }
112        })
113        .subscribe( s->{
114            System.out.println(s);
115            spendTime(cc2,"Observable");
116        });
117        Thread.sleep(100000);
118    }
119
120    private static double xx(int start,int end) {
121        double sum = 1;
122        for(int i=start;i<end;i++) {
123            sum += Math.sqrt(i+1);
124        }
125        return sum;
126    }
127}

以下是费时结果:

[顺序执行] 花费:1086 毫秒
[CompletableFuture] 花费:537 毫秒
[stream] 花费:1028 毫秒
[parallel stream] 花费:1305 毫秒
[Observable] 花费:461 毫秒
[Observable直接算] 花费:4265 毫秒

这里使用 RxJava 进行计算任务分解求和是最快的,因为JDK8并发编程我们分解的是两个计算任务,而RxJava分解成3个所致!

关于RxJava

RxJavaReactive ExtensionsJava实现,通过使用Obserable/Flowable序列来构建异步和基于事件的程序的库,RxJava实现和扩展了观察者模式。

RxJava基于响应式编程,是一种面向数据流和变化传播的编程范式。传统编程方式代码都是顺序执行的,而响应式编程是基于异步编程的,借助于CPU多核能力,提高运行效率,降低延迟和阻塞,基于数据流模型,如一个函数可作用与数据流中的每项,可变化传播。在响应式编程中,函数成为其第一等公民,同原型类型一样,函数可作用与参数,也可作为返回值。

RxJava基于函数式编程,传统面向对象是通过抽象出对象关系来解决问题,函数式编程是通过函数的组合来解决问题。

概念

  • Observable:被订阅者,比如在安卓开发中,可能是某个数据源,数据源的变化要通知到UI,那么UI就是Observer,被订阅者有冷热之分,热Observable无论有没有订阅者订阅,事件流始终发送,而冷Observable则只有订阅者订阅事件流才开始发送数据,它们之间是可以通过API相互转化的,比如使用publish可以冷->热,RefCount可以热->冷;

  • Observer:订阅者;

RxJava编程

  • 被订阅者:用的做多的是Observable,如果要支持背压则使用Flowable,还可以使用Single(只要OnSuccess和onError,没有onComplete),Completable(创建后不发射任何数据,只有onComplete和onError)和Maybe(只发送0或1个数据);

  • 生命周期监听:Observable创建后可使用doXXX监听你说需要的生命周期回调;

  • 流的创建:create(使用一个函数从头创建),just(指定值创建,最多10个),fromXXX(基于X类创建),repeat(特定数据重复N次创建),defer(直到有订阅者订阅时才创建),interval(每隔一段时间创建一个数据发送),timer(延迟一段时间后发送数据);

  • RxJava线程模型: 内置多个线程控制器,包括single(定长为1的线程池),newThread(启动新线程执行),computation(大小为CPU核数线程池,一般用于密集型计算),io(适用IO操作),trampoline(直接在当前线程运行)和Schedulers.from(自定义);

  • 变化操作符:map(数据转型),flatMap(数据转某个Observable后合并发送),scan(每个数据应用一个函数,然后按顺序发送),groupBy(按Key分组拆分成多个Observable),buffer(打包发送),window,cast(强制转换类型);

  • 过滤操作:filter(按条件过滤),takeLast(只发送最后N个数据),last(只发送最后一个数据),lastOrDefault(只发送最后一个数据,为Null发送默认值),takeLastBuffer(将最后N个数据当做单个数据发送),skip(跳过N个发送),skipLast(跳过最后N个),take(只发送开始的N个数据),first,takeFirst(只发送满足条件的第一个数据),elementAt(只发送第N个数据),timeout(指定事件内没发送数据,就发送异常),distinct(去重),ofType(只发送特定类型的数据),ignoreElements(丢失所有正常数据,只发送错误或完成通知),sample(一段时间内,只处理最后一个数据),throttleFirst(一段时间内,只处理第一个数据),debounce(发送一个数据,开始计时,到了规定时间没有再发送数据,则开始处理数据);

  • 条件操作和布尔操作符:all(发送的数据是否都满足条件),contains(发送的数据是否包含某数据),amb(多个被订阅者数据发送只发送首次被订阅的那个数据流),defaultIfEmpty(如果原始被订阅者没有值,则发送一个默认值),sequenceEquals(判定两个数据流是否一样,返回true或false),skipUtil(直到符合条件才发送),skipWhile(直到条件不符合才开始发送),takeUntil(满足条件后不发送)和takeWhile(条件满足的一直发送);

  • 合并和连接操作符:merge(将多个被订阅数据流合并),zip(将多个数据流结合发送,返回数据流的数据个数是最少的那个),combineLastest(类似zip,任意被订阅者开始发送数据时即发送,而zip要每个被订阅者开始发送数据才发送),join(两个被订阅者结合合并,总数据项是M*N项),startWith(在数据序列开头插入指定项),connect,灵活控制发送数据规则可使用push,refCount,replay(保证所有订阅者收到相同数据);

  • 背压:被订阅者发送数据过快以至于订阅者来不及处理的情况;

总结

对于复杂计算,你可以将计算任务分解成N个子计算任务,交给多个线程处理并将结果合并后取得最终结果,对于服务业务的调用,你应该清楚,哪些子任务可以并行运行,哪些需要顺序执行,使用RxJava在代码上可能更加直观,也可以使用JDK8的CompletableFuture,其实JDK8的很多API参考了RxJava的实现,两者在写法上非常的类似,响应式编程相比传统代码的顺序执行在思路上有很大的不同,理解上也有一定的难度,希望通过本文让您全面了解函数式编程的实现思路。


你可以继续阅读:


|| |  |  |  |  |  |  | 



以上是关于借助JDK8和RxJava如何让你的业务代码跑的更快的主要内容,如果未能解决你的问题,请参考以下文章

原创专栏RxJava源代码剖析

RxJava

一文掌握stream,让你的代码提高一个境界

FireFox浏览器的下载和安装借助RamDisk让你的FireFox飞起来

程序员的面试现场:如何让你的回答更到位?

JDK8新特性:使用Optional:解决NPE问题的更干净的写法