ForkJoin使用

Posted tenwood

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ForkJoin使用相关的知识,希望对你有一定的参考价值。

一、Fork Join    
分而治之的办法

JDk为Fork/Join框架提供了很好的支持,我们想要用这个算法首先得创建一个Fork/Join任务,在JDK中这个任务就叫做:ForJoinTask,只要继承这个类就可以创建一个任务类,但是实际使用中并不是直接继承ForkJoinTask类,而是继承它的子类,它有两个子类,分别是RecursiveAction和RecursiveTask,它们之间的区别是是否返回任务结果,前者用于没有返回结果的任务,后者用于有返回结果的任务。

ForkJoinPool
RecursiveTask
RecursiveAction

 

二、RecursiveTask使用

ForkJoinPoolTest:

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.ForkJoinPool;
 4 import com.cy.java8.AccumulatorRecursiveAction.AccumulatorHelper;
 5 
 6 public class ForkJoinPoolTest {
 7 
 8     private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
 9 
10     public static void main(String[] args) {
11         System.out.println("result = " + calc());
12 
13         AccumulatorRecursiveTask task = new AccumulatorRecursiveTask(0, data.length - 1, data);
14         ForkJoinPool forkJoinPool = new ForkJoinPool();
15         Integer result = forkJoinPool.invoke(task);     //同步调用
16         System.out.println("AccumulatorRecursiveTask result = " + result);
17 
18         AccumulatorRecursiveAction action = new AccumulatorRecursiveAction(0, data.length - 1, data);
19         forkJoinPool.invoke(action);
20         System.out.println("AccumulatorRecursiveAction result = " + AccumulatorHelper.getResult());
21         AccumulatorHelper.reset();
22     }
23 
24     /**
25      * 普通方法计算data的和
26      */
27     private static int calc() {
28         int result = 0;
29         for (int i = 0; i < data.length; i++) {
30             result += data[i];
31         }
32         return result;
33     }
34 }

AccumulatorRecursiveTask

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.RecursiveTask;
 4 
 5 /**
 6  * 继承RecursiveTask,用于与结果返回的任务
 7  */
 8 public class AccumulatorRecursiveTask extends RecursiveTask<Integer> {
 9 
10     //累加的起始下标值
11     private final int start;
12     private final int end;
13     private final int[] data;
14 
15     //阈值为3,当小于等于这个值的时候进行计算
16     private final int LIMIT = 3;
17 
18     public AccumulatorRecursiveTask(int start, int end, int[] data) {
19         this.start = start;
20         this.end = end;
21         this.data = data;
22     }
23 
24     @Override
25     protected Integer compute() {
26         if (end - start < LIMIT) {
27             int result = 0;
28             for (int i = start; i <= end; i++) {
29                 result += data[i];
30             }
31             return result;
32         }
33 
34         int mid = (start + end) / 2;
35 
36         //分为两个任务
37         AccumulatorRecursiveTask left = new AccumulatorRecursiveTask(start, mid, data);
38         AccumulatorRecursiveTask right = new AccumulatorRecursiveTask(mid + 1, end, data);
39         /*
40         //执行子任务
41         left.fork();
42         right.fork();
43         //等待子任务完成
44         Integer leftResult = left.join();
45         Integer rightResult = right.join();
46         //合并子任务
47         return rightResult + leftResult;
48         */
49 
50         //提交任务
51         invokeAll(left, right);
52         return left.join() + right.join();
53     }
54 }

 

三、RecursiveAction使用  

 1 package com.cy.java8;
 2 
 3 import java.util.concurrent.RecursiveAction;
 4 import java.util.concurrent.atomic.AtomicInteger;
 5 
 6 public class AccumulatorRecursiveAction extends RecursiveAction {
 7     private int start;
 8     private int end;
 9     private int[] data;
10     private final int LIMIT = 3;
11 
12     public AccumulatorRecursiveAction(int start, int end, int[] data) {
13         this.start = start;
14         this.end = end;
15         this.data = data;
16     }
17 
18     @Override
19     protected void compute() {
20         if (end - start < LIMIT) {
21             for (int i = start; i <= end; i++) {
22                 AccumulatorHelper.accumulate(data[i]);
23             }
24         } else {
25             int mid = (start + end) / 2;
26             AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data);
27             AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid + 1, end, data);
28             /*left.fork();
29             right.fork();
30             left.join();
31             right.join();*/
32             invokeAll(left, right);
33         }
34     }
35 
36     static class AccumulatorHelper{
37         private static final AtomicInteger result = new AtomicInteger(0);
38 
39         private static void accumulate(int value){
40             result.getAndAdd(value);
41         }
42 
43         public static int getResult(){
44             return result.get();
45         }
46 
47         public static void reset(){
48             result.set(0);
49         }
50     }
51 }

ForkJoinPoolTest的执行结果:

result = 55
AccumulatorRecursiveTask result = 55
AccumulatorRecursiveAction result = 55

 

 注:

1、RecursiveAction 提供的方法 invokeAll()。它表示:启动所有的任务,并在所有任务都正常结束后返回。如果其中一个任务出现异常,则其它所有的任务都取消。invokeAll() 的参数还可以是任务的数组。

 

 

 

 

 

 

 

-----------

 

以上是关于ForkJoin使用的主要内容,如果未能解决你的问题,请参考以下文章

ForkJoin的使用及for循环stream并行流三种方式的时间比较

使用 forkJoin() 组合这两个 ws 请求

Observable.forkJoin() 不执行

Observable forkJoin 从不执行管道

JUC中的ForkJoin为什么这么快?基于jdk13的代码带你分析

如何使用NodeJs中的rxjs中的forkjoin进行多次调用,使用NPM请求