Java并发之Fork-Join
Posted 三名狂客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发之Fork-Join相关的知识,希望对你有一定的参考价值。
一、Fork-Join 简介
fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork
,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。然后,join
部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待每个子任务执行完毕。
二、ForkJoinPool 线程池
ForkJoinPool
是 fork/join 框架的核心,是 ExecutorService
的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息(工作窃取( work-stealing )算法)。
三、ForkJoinPool线程池的实例化
(1) 在Java8 中提供 commonPool() 静态方法
ForkJoinPool commonPool = ForkJoinPool.commonPool();
(2) 在Java 7中 提供创建ForkJoinPool 的实例化方法
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
四、ForkJoinTask 类
ForkJoinTask
是 ForkJoinPool
线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask
,而是扩展它的两个子类中的任意一个:
1、任务不返回结果 ( 返回 void
) 的 RecursiveAction
2、返回值的任务的 RecursiveTask
这两个类都有一个抽象方法 compute()
,用于定义任务的逻辑。
(1) RecursiveAction
类
public class CustomRecursiveAction extends RecursiveAction
private String workload = "";
private static final int THRESHOLD = 4;
private static Logger logger =
Logger.getAnonymousLogger();
public CustomRecursiveAction(String workload)
this.workload = workload;
@Override
protected void compute()
if (workload.length() > THRESHOLD)
ForkJoinTask.invokeAll(createSubtasks());
else
processing(workload);
private List<CustomRecursiveAction> createSubtasks()
List<CustomRecursiveAction> subtasks = new ArrayList<>();
String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());
subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));
return subtasks;
private void processing(String work)
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
(2) RecursiveTask 类
对于有返回值的任务,除了将每个子任务的结果在一个结果中合并,其它逻辑和 RecursiveAction
都差不多。
public class CustomRecursiveTask extends RecursiveTask<Integer>
private int[] arr;
private static final int THRESHOLD = 20;
public CustomRecursiveTask(int[] arr)
this.arr = arr;
@Override
protected Integer compute()
if (arr.length > THRESHOLD)
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
else
return processing(arr);
private Collection<CustomRecursiveTask> createSubtasks()
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
private Integer processing(int[] arr)
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
五、 将任务提交到 ForkJoinPool 线程池中
(1) submit() 方法
forkJoinPool.submit(customRecursiveTask);
int result = customRecursiveTask.join();
(2) execute 方法
forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();
(3) 使用 invoke()
或 invokeAll( ) 方法 fork
任务并等待结果,不需要任何手动连接 ( join )
int result = forkJoinPool.invoke(customRecursiveTask);
当涉及到多个任务且要保证任务的顺序时,通常都是使用 ForkJoinPool.invokeAll()
(4)最终执行程序
package day01;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.logging.Logger;
/**
* fork-Join方法实现
* @author zc
*
*/
public class ForkJoinTest extends RecursiveAction
private static final long serialVersionUID = 1L;
private String workload = "";
private static final int THRESHOLD = 4;
private static Logger logger =
Logger.getAnonymousLogger();
public ForkJoinTest(String workload)
this.workload = workload;
@Override
protected void compute()
if (workload.length() > THRESHOLD)
ForkJoinTask.invokeAll(createSubtasks());
else
processing(workload);
private List<ForkJoinTest> createSubtasks()
List<ForkJoinTest> subtasks = new ArrayList<>();
String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());
subtasks.add(new ForkJoinTest(partOne));
subtasks.add(new ForkJoinTest(partTwo));
return subtasks;
private void processing(String work)
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
public static void main(String[] args)
// (1) ForkJoinPool实例化方式一
ForkJoinPool commonPool =ForkJoinPool.commonPool();
// (2) ForkJoinPool实例化方式二, 2代表两个处理单元
ForkJoinPool forkJoinPool =new ForkJoinPool(2);
ForkJoinTest forkJoin = new ForkJoinTest("tewrerefedewrewr");
commonPool.submit(forkJoin);
//执行计算结果
forkJoin.join();
以上是关于Java并发之Fork-Join的主要内容,如果未能解决你的问题,请参考以下文章