ForkJoin 任务分解
Posted WindWant
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ForkJoin 任务分解相关的知识,希望对你有一定的参考价值。
Fork/Join框架的介绍
第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join使用两个类来完成以上两件事情:
- ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask :用于有返回结果的任务。
- ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
1 package com.thread.test.thread; 2 3 import java.io.File; 4 import java.util.ArrayList; 5 import java.util.List; 6 import java.util.concurrent.ForkJoinPool; 7 import java.util.concurrent.RecursiveTask; 8 9 /** 10 * Created by windwant on 2016/6/3. 11 */ 12 public class MyForkJoin { 13 14 public static void main(String[] args) { 15 MyTask task = new MyTask(new File("D:\\MPS")); 16 Integer sum = new ForkJoinPool().invoke(task); 17 System.out.println(sum); 18 } 19 } 20 21 class MyTask extends RecursiveTask<Integer>{ 22 23 public Integer num = 0; 24 25 private File file; 26 MyTask(File file){ 27 this.file = file; 28 } 29 30 @Override 31 protected Integer compute() { 32 List<MyTask> taskList = new ArrayList<MyTask>(); 33 if(file.isDirectory()){ 34 File[] list = file.listFiles(); 35 for(File subf: list){ 36 if(subf.isDirectory()){ 37 MyTask mt = new MyTask(subf); 38 taskList.add(mt); 39 }else{ 40 num++; 41 } 42 } 43 }else{ 44 num = 1; 45 } 46 47 if(!taskList.isEmpty()){ 48 //同下 49 // for(MyTask mtask: taskList){ 50 // mtask.fork(); 51 // } 52 // for(MyTask mtask: taskList){ 53 // num += mtask.join(); 54 // } 55 56 for(MyTask mtask: invokeAll(taskList)){ 57 num += mtask.join(); 58 } 59 } 60 return num; 61 } 62 }
项目地址:https://github.com/windwant/threadtest
以上是关于ForkJoin 任务分解的主要内容,如果未能解决你的问题,请参考以下文章
ForkJoin的使用及for循环stream并行流三种方式的时间比较