借助JDK8和RxJava如何让你的业务代码跑的更快
Posted Sumslack团队
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了借助JDK8和RxJava如何让你的业务代码跑的更快相关的知识,希望对你有一定的参考价值。
~ 篇篇值得收藏的原创技术好文 ~
背景
微服务流行后,在我们项目开发过程中,一个服务经常会调用N个微服务,调用每个微服务可能需要几百毫秒,试想,一个复杂的业务如果要调用上百的微服务,如果各个服务同步执行,可能就需要花费好几秒,试想:这些服务为什么不能并行运行呢?
一个复杂的计算任务,为什么不能分解成更小的任务单位,让他们并行运行呢?
本文通过以上两个业务场景,比较各个实现方案的差异,在讲解之前,我们先来了解下本文提到的RxJava
。
案例
从一段最简单的服务开始:该服务需调用3个微服务,每个微服务费时250ms,三个微服务都获取数据后返回给前端(该微服务三个服务分别是商品详情,商品评论和推荐商品列表),如果按顺序执行,那么代码是这样的:
1public static void main(String[] args) throws Exception {
2 long c = System.currentTimeMillis();
3 System.out.println("顺序执行:");
4 System.out.println(service("商品详情微服务")+service("商品评论微服务")+service("推荐商品微服务"));
5 spendTime(c);
6}
7//模拟某个服务
8private static String service(String srvName){
9 try {
10 Thread.sleep(250);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 return srvName+"\r\n";
15}
16private static void spendTime(long preTime) {
17 System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
18}
这段代码毫无疑问,打印输出:
花费:781 毫秒
改造一下,使用JDK8的CompletableFuture
,3个微服务独立线程运行,都完成后通知主线程打印,代码如下:
1public static void main(String[] args) throws Exception {
2 final long cc = System.currentTimeMillis();
3 CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> service("商品详情微服务"));
4 CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> service("商品评论微服务"));
5 CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> service("推荐商品微服务"));
6 s1.thenCombine(s2, (i,j)->{
7 return i+j;
8 }).thenCombine(s3, (i,j)->{
9 System.out.println("使用JDK8的并行编程:");
10 System.out.println(i+j);
11 spendTime(cc);
12 return i+j;
13 });
14}
以上代码的执行结果取决于3个微服务中最长时间的那个服务,相比原先速度有明显提高:
花费:311 毫秒
那么以上的代码使用RxJava怎么来写呢?我们可以flatMap
将服务分拆到各自独立线程中去执行,代码如下:
1private static String[] ss = {"商品详情微服务","商品评论微服务","推荐商品微服务"};
2public static void main(String[] args) throws Exception {
3 Observable.range(0,3)
4 .flatMap(new Function<Integer, ObservableSource<String>>() {
5 @Override
6 public ObservableSource<String> apply(Integer t) throws Exception {
7 return Observable.just(t)
8 .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
9 .map(new Function<Integer, String>() {
10 @Override
11 public String apply(Integer t) throws Exception {
12 return service(ss[t]);
13 }
14 });
15 }
16 })
17 .reduce((s1,s2)->s1+s2)
18 .subscribe(s -> {
19 System.out.println("Observable:\r\n" + s);
20 spendTime(cc2);
21 });
22}
花费:455 毫秒
RxJava模拟的针对每个数据项的并发操作调用时间上要比直接使用JDK8的API慢得多
第二个业务场景是将复杂的计算进行拆分子计算任务,然后将每个任务计算合并成最终计算结果,以下直接给出所有源码,我们来看看几种计算方式在耗时上的不同,复杂计算任务是:对1到210000000开根号求总和
1package com.sumslack.rxjava;
2
3import java.util.Arrays;
4import java.util.concurrent.CompletableFuture;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7
8import io.reactivex.Observable;
9import io.reactivex.ObservableSource;
10import io.reactivex.functions.BiFunction;
11import io.reactivex.functions.Function;
12import io.reactivex.schedulers.Schedulers;
13
14public class TestComputer {
15 private static final int MAX_I = 210000000;
16
17 private static void spendTime(long preTime) {
18 System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
19 }
20
21 private static void spendTime(long preTime,String str) {
22 System.out.println("[" + str + "] 花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
23 }
24 private static ExecutorService eService = Executors.newCachedThreadPool();
25 public static void main(String[] args) throws Exception{
26
27 int[] ss = new int[MAX_I];
28 for(int i=1;i<=MAX_I;i++) {
29 ss[i-1] = i;
30 }
31
32
33 long c = System.currentTimeMillis();
34 System.out.println(xx(0,MAX_I));
35 spendTime(c,"顺序执行");
36
37 final long cc5 = System.currentTimeMillis();
38 Observable.range(1, MAX_I).map(new Function<Integer, Double>() {
39 @Override
40 public Double apply(Integer t) throws Exception {
41 return Math.sqrt(t);
42 }
43 }).reduce((i,j)->i+j)
44 .subscribeOn(Schedulers.computation())
45 .subscribe(s -> {
46 spendTime(cc5,"Observable直接算");
47 });
48 final long cc = System.currentTimeMillis();
49 CompletableFuture<Double> cf1 = CompletableFuture.supplyAsync(() -> {
50 return xx(0,MAX_I/2);
51 });
52 CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
53 return xx(MAX_I/2,MAX_I);
54 });
55 cf1.thenCombine(cf2, (i,j)->{
56 System.out.println(""+(i+j));
57 spendTime(cc,"CompletableFuture");
58 return i+j;
59 });
60
61 //也可以用:CompletableFuture.allOf(cf1,cf2).join();
62 c = System.currentTimeMillis();
63 Double dd = Arrays.stream(ss).mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
64 System.out.println(dd);
65 spendTime(cc,"stream");
66
67 c = System.currentTimeMillis();
68 Double dd2 = Arrays.stream(ss).parallel().mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
69 System.out.println(dd2);
70 spendTime(cc,"parallel stream");
71
72 final long cc2 = System.currentTimeMillis();
73 Observable.fromArray(0,1,2)
74 .flatMap(new io.reactivex.functions.Function<Integer,ObservableSource<Double>>(){
75 @Override
76 public ObservableSource<Double> apply(Integer t) throws Exception {
77 if(t%3==0) {
78 return Observable.just(t)
79 .subscribeOn(Schedulers.computation())
80 .map(new Function<Integer, Double>() {
81 @Override
82 public Double apply(Integer t) throws Exception {
83 return xx(0,MAX_I/3);
84 }
85 });
86 }else if(t%3==1) {
87 return Observable.just(t)
88 .subscribeOn(Schedulers.computation())
89 .map(new Function<Integer, Double>() {
90 @Override
91 public Double apply(Integer t) throws Exception {
92 return xx(MAX_I/3,MAX_I*2/3);
93 }
94 });
95 }else {
96 return Observable.just(t)
97 .subscribeOn(Schedulers.computation())
98 .map(new Function<Integer, Double>() {
99 @Override
100 public Double apply(Integer t) throws Exception {
101 return xx(MAX_I*2/3,MAX_I);
102 }
103 });
104 }
105 }
106 })
107 .reduce(new BiFunction<Double, Double, Double>() {
108 @Override
109 public Double apply(Double t1, Double t2) throws Exception {
110 return t1+t2;
111 }
112 })
113 .subscribe( s->{
114 System.out.println(s);
115 spendTime(cc2,"Observable");
116 });
117 Thread.sleep(100000);
118 }
119
120 private static double xx(int start,int end) {
121 double sum = 1;
122 for(int i=start;i<end;i++) {
123 sum += Math.sqrt(i+1);
124 }
125 return sum;
126 }
127}
以下是费时结果:
[顺序执行] 花费:1086 毫秒
[CompletableFuture] 花费:537 毫秒
[stream] 花费:1028 毫秒
[parallel stream] 花费:1305 毫秒
[Observable] 花费:461 毫秒
[Observable直接算] 花费:4265 毫秒
这里使用 RxJava
进行计算任务分解求和是最快的,因为JDK8并发编程我们分解的是两个计算任务,而RxJava分解成3个所致!
关于RxJava
RxJava
是 Reactive Extensions
的Java
实现,通过使用Obserable
/Flowable
序列来构建异步和基于事件的程序的库,RxJava实现和扩展了观察者模式。
RxJava
基于响应式编程,是一种面向数据流和变化传播的编程范式。传统编程方式代码都是顺序执行的,而响应式编程是基于异步编程的,借助于CPU多核能力,提高运行效率,降低延迟和阻塞,基于数据流模型,如一个函数可作用与数据流中的每项,可变化传播。在响应式编程中,函数成为其第一等公民,同原型类型一样,函数可作用与参数,也可作为返回值。
RxJava
基于函数式编程,传统面向对象是通过抽象出对象关系来解决问题,函数式编程是通过函数的组合来解决问题。
概念
Observable
:被订阅者,比如在安卓开发中,可能是某个数据源,数据源的变化要通知到UI,那么UI就是Observer,被订阅者有冷热之分,热Observable无论有没有订阅者订阅,事件流始终发送,而冷Observable则只有订阅者订阅事件流才开始发送数据,它们之间是可以通过API相互转化的,比如使用publish
可以冷->热,RefCount
可以热->冷;Observer
:订阅者;
RxJava编程
被订阅者:用的做多的是
Observable
,如果要支持背压则使用Flowable
,还可以使用Single
(只要OnSuccess和onError,没有onComplete),Completable
(创建后不发射任何数据,只有onComplete和onError)和Maybe
(只发送0或1个数据);生命周期监听:Observable创建后可使用doXXX监听你说需要的生命周期回调;
流的创建:create(使用一个函数从头创建),just(指定值创建,最多10个),fromXXX(基于X类创建),repeat(特定数据重复N次创建),defer(直到有订阅者订阅时才创建),interval(每隔一段时间创建一个数据发送),timer(延迟一段时间后发送数据);
RxJava线程模型: 内置多个线程控制器,包括single(定长为1的线程池),newThread(启动新线程执行),computation(大小为CPU核数线程池,一般用于密集型计算),io(适用IO操作),trampoline(直接在当前线程运行)和Schedulers.from(自定义);
变化操作符:map(数据转型),flatMap(数据转某个Observable后合并发送),scan(每个数据应用一个函数,然后按顺序发送),groupBy(按Key分组拆分成多个Observable),buffer(打包发送),window,cast(强制转换类型);
过滤操作:filter(按条件过滤),takeLast(只发送最后N个数据),last(只发送最后一个数据),lastOrDefault(只发送最后一个数据,为Null发送默认值),takeLastBuffer(将最后N个数据当做单个数据发送),skip(跳过N个发送),skipLast(跳过最后N个),take(只发送开始的N个数据),first,takeFirst(只发送满足条件的第一个数据),elementAt(只发送第N个数据),timeout(指定事件内没发送数据,就发送异常),distinct(去重),ofType(只发送特定类型的数据),ignoreElements(丢失所有正常数据,只发送错误或完成通知),sample(一段时间内,只处理最后一个数据),throttleFirst(一段时间内,只处理第一个数据),debounce(发送一个数据,开始计时,到了规定时间没有再发送数据,则开始处理数据);
条件操作和布尔操作符:all(发送的数据是否都满足条件),contains(发送的数据是否包含某数据),amb(多个被订阅者数据发送只发送首次被订阅的那个数据流),defaultIfEmpty(如果原始被订阅者没有值,则发送一个默认值),sequenceEquals(判定两个数据流是否一样,返回true或false),skipUtil(直到符合条件才发送),skipWhile(直到条件不符合才开始发送),takeUntil(满足条件后不发送)和takeWhile(条件满足的一直发送);
合并和连接操作符:merge(将多个被订阅数据流合并),zip(将多个数据流结合发送,返回数据流的数据个数是最少的那个),combineLastest(类似zip,任意被订阅者开始发送数据时即发送,而zip要每个被订阅者开始发送数据才发送),join(两个被订阅者结合合并,总数据项是M*N项),startWith(在数据序列开头插入指定项),connect,灵活控制发送数据规则可使用push,refCount,replay(保证所有订阅者收到相同数据);
背压:被订阅者发送数据过快以至于订阅者来不及处理的情况;
总结
对于复杂计算,你可以将计算任务分解成N个子计算任务,交给多个线程处理并将结果合并后取得最终结果,对于服务业务的调用,你应该清楚,哪些子任务可以并行运行,哪些需要顺序执行,使用RxJava在代码上可能更加直观,也可以使用JDK8的CompletableFuture,其实JDK8的很多API参考了RxJava的实现,两者在写法上非常的类似,响应式编程相比传统代码的顺序执行在思路上有很大的不同,理解上也有一定的难度,希望通过本文让您全面了解函数式编程的实现思路。
你可以继续阅读:
|| | | | | | | |
以上是关于借助JDK8和RxJava如何让你的业务代码跑的更快的主要内容,如果未能解决你的问题,请参考以下文章