ForkJoin使用
Posted tenwood
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ForkJoin使用相关的知识,希望对你有一定的参考价值。
一、Fork Join
分而治之的办法
JDk为Fork/Join框架提供了很好的支持,我们想要用这个算法首先得创建一个Fork/Join任务,在JDK中这个任务就叫做:ForJoinTask,只要继承这个类就可以创建一个任务类,但是实际使用中并不是直接继承ForkJoinTask类,而是继承它的子类,它有两个子类,分别是RecursiveAction和RecursiveTask,它们之间的区别是是否返回任务结果,前者用于没有返回结果的任务,后者用于有返回结果的任务。
ForkJoinPool
RecursiveTask
RecursiveAction
二、RecursiveTask使用
ForkJoinPoolTest:
1 package com.cy.java8; 2 3 import java.util.concurrent.ForkJoinPool; 4 import com.cy.java8.AccumulatorRecursiveAction.AccumulatorHelper; 5 6 public class ForkJoinPoolTest { 7 8 private static int[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; 9 10 public static void main(String[] args) { 11 System.out.println("result = " + calc()); 12 13 AccumulatorRecursiveTask task = new AccumulatorRecursiveTask(0, data.length - 1, data); 14 ForkJoinPool forkJoinPool = new ForkJoinPool(); 15 Integer result = forkJoinPool.invoke(task); //同步调用 16 System.out.println("AccumulatorRecursiveTask result = " + result); 17 18 AccumulatorRecursiveAction action = new AccumulatorRecursiveAction(0, data.length - 1, data); 19 forkJoinPool.invoke(action); 20 System.out.println("AccumulatorRecursiveAction result = " + AccumulatorHelper.getResult()); 21 AccumulatorHelper.reset(); 22 } 23 24 /** 25 * 普通方法计算data的和 26 */ 27 private static int calc() { 28 int result = 0; 29 for (int i = 0; i < data.length; i++) { 30 result += data[i]; 31 } 32 return result; 33 } 34 }
AccumulatorRecursiveTask
1 package com.cy.java8; 2 3 import java.util.concurrent.RecursiveTask; 4 5 /** 6 * 继承RecursiveTask,用于与结果返回的任务 7 */ 8 public class AccumulatorRecursiveTask extends RecursiveTask<Integer> { 9 10 //累加的起始下标值 11 private final int start; 12 private final int end; 13 private final int[] data; 14 15 //阈值为3,当小于等于这个值的时候进行计算 16 private final int LIMIT = 3; 17 18 public AccumulatorRecursiveTask(int start, int end, int[] data) { 19 this.start = start; 20 this.end = end; 21 this.data = data; 22 } 23 24 @Override 25 protected Integer compute() { 26 if (end - start < LIMIT) { 27 int result = 0; 28 for (int i = start; i <= end; i++) { 29 result += data[i]; 30 } 31 return result; 32 } 33 34 int mid = (start + end) / 2; 35 36 //分为两个任务 37 AccumulatorRecursiveTask left = new AccumulatorRecursiveTask(start, mid, data); 38 AccumulatorRecursiveTask right = new AccumulatorRecursiveTask(mid + 1, end, data); 39 /* 40 //执行子任务 41 left.fork(); 42 right.fork(); 43 //等待子任务完成 44 Integer leftResult = left.join(); 45 Integer rightResult = right.join(); 46 //合并子任务 47 return rightResult + leftResult; 48 */ 49 50 //提交任务 51 invokeAll(left, right); 52 return left.join() + right.join(); 53 } 54 }
三、RecursiveAction使用
1 package com.cy.java8; 2 3 import java.util.concurrent.RecursiveAction; 4 import java.util.concurrent.atomic.AtomicInteger; 5 6 public class AccumulatorRecursiveAction extends RecursiveAction { 7 private int start; 8 private int end; 9 private int[] data; 10 private final int LIMIT = 3; 11 12 public AccumulatorRecursiveAction(int start, int end, int[] data) { 13 this.start = start; 14 this.end = end; 15 this.data = data; 16 } 17 18 @Override 19 protected void compute() { 20 if (end - start < LIMIT) { 21 for (int i = start; i <= end; i++) { 22 AccumulatorHelper.accumulate(data[i]); 23 } 24 } else { 25 int mid = (start + end) / 2; 26 AccumulatorRecursiveAction left = new AccumulatorRecursiveAction(start, mid, data); 27 AccumulatorRecursiveAction right = new AccumulatorRecursiveAction(mid + 1, end, data); 28 /*left.fork(); 29 right.fork(); 30 left.join(); 31 right.join();*/ 32 invokeAll(left, right); 33 } 34 } 35 36 static class AccumulatorHelper{ 37 private static final AtomicInteger result = new AtomicInteger(0); 38 39 private static void accumulate(int value){ 40 result.getAndAdd(value); 41 } 42 43 public static int getResult(){ 44 return result.get(); 45 } 46 47 public static void reset(){ 48 result.set(0); 49 } 50 } 51 }
ForkJoinPoolTest的执行结果:
result = 55 AccumulatorRecursiveTask result = 55 AccumulatorRecursiveAction result = 55
注:
1、RecursiveAction 提供的方法 invokeAll()。它表示:启动所有的任务,并在所有任务都正常结束后返回。如果其中一个任务出现异常,则其它所有的任务都取消。invokeAll() 的参数还可以是任务的数组。
-----------
以上是关于ForkJoin使用的主要内容,如果未能解决你的问题,请参考以下文章
ForkJoin的使用及for循环stream并行流三种方式的时间比较