Java并发之Fork-Join

Posted 三名狂客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发之Fork-Join相关的知识,希望对你有一定的参考价值。

一、Fork-Join 简介

fork/join 框架是 Java 7 中引入的 ,它是一个工具,通过 「 分而治之 」 的方法尝试将所有可用的处理器内核使用起来帮助加速并行处理。在实际使用过程中,这种 「 分而治之 」的方法意味着框架首先要 fork ,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。然后,join 部分开始工作,将所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待每个子任务执行完毕。

二、ForkJoinPool 线程池

ForkJoinPool 是 fork/join 框架的核心,是 ExecutorService的一个实现,用于管理工作线程,并提供了一些工具来帮助获取有关线程池状态和性能的信息(工作窃取( work-stealing )算法)。

三、ForkJoinPool线程池的实例化

(1) 在Java8 中提供 commonPool() 静态方法

 ForkJoinPool commonPool = ForkJoinPool.commonPool();

(2) 在Java 7中 提供创建ForkJoinPool 的实例化方法

public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);

四、ForkJoinTask 类

ForkJoinTaskForkJoinPool 线程之中执行的任务的基本类型。我们日常使用时,一般不直接使用 ForkJoinTask ,而是扩展它的两个子类中的任意一个:

1、任务不返回结果 ( 返回 void ) 的 RecursiveAction
2、返回值的任务的 RecursiveTask

这两个类都有一个抽象方法 compute() ,用于定义任务的逻辑。

(1) RecursiveAction

public class CustomRecursiveAction extends RecursiveAction 

    private String workload = "";
    private static final int THRESHOLD = 4;

    private static Logger logger = 
      Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) 
        this.workload = workload;
    

    @Override
    protected void compute() 
        if (workload.length() > THRESHOLD) 
            ForkJoinTask.invokeAll(createSubtasks());
         else 
           processing(workload);
        
    

    private List<CustomRecursiveAction> createSubtasks() 
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    

    private void processing(String work) 
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
          + Thread.currentThread().getName());
    

(2) RecursiveTask 类

对于有返回值的任务,除了将每个子任务的结果在一个结果中合并,其它逻辑和 RecursiveAction 都差不多。

public class CustomRecursiveTask extends RecursiveTask<Integer> 
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) 
        this.arr = arr;
    

    @Override
    protected Integer compute() 
        if (arr.length > THRESHOLD) 
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
         else 
            return processing(arr);
        
    

    private Collection<CustomRecursiveTask> createSubtasks() 
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    

    private Integer processing(int[] arr) 
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    

五、 将任务提交到 ForkJoinPool 线程池中

(1) submit() 方法

forkJoinPool.submit(customRecursiveTask);
int result = customRecursiveTask.join();

(2) execute 方法

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

(3) 使用 invoke() 或 invokeAll( ) 方法 fork 任务并等待结果,不需要任何手动连接 ( join )

int result = forkJoinPool.invoke(customRecursiveTask);

当涉及到多个任务且要保证任务的顺序时,通常都是使用 ForkJoinPool.invokeAll()

(4)最终执行程序

package day01;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.logging.Logger;


/**
 *  fork-Join方法实现
 *  @author zc
 *
 */
public class ForkJoinTest  extends RecursiveAction 
	
	
	private static final long serialVersionUID = 1L;
	
	private String workload = "";
	
	private static final int THRESHOLD = 4;

	  private static Logger logger = 
		      Logger.getAnonymousLogger();

     public ForkJoinTest(String workload) 
		        this.workload = workload;
	
     
	@Override
	protected void compute() 
	
		 if (workload.length() > THRESHOLD) 
	            ForkJoinTask.invokeAll(createSubtasks());
	       else 
	           processing(workload);
	      
		
	
	 private List<ForkJoinTest> createSubtasks() 
	        List<ForkJoinTest> subtasks = new ArrayList<>();

	        String partOne = workload.substring(0, workload.length() / 2);
	        String partTwo = workload.substring(workload.length() / 2, workload.length());

	        subtasks.add(new ForkJoinTest(partOne));
	        subtasks.add(new ForkJoinTest(partTwo));

	        return subtasks;
	    

	    private void processing(String work) 
	        String result = work.toUpperCase();
	        logger.info("This result - (" + result + ") - was processed by "
	          + Thread.currentThread().getName());
	    
	
	public static void main(String[] args) 
		
		// (1) ForkJoinPool实例化方式一
		ForkJoinPool commonPool =ForkJoinPool.commonPool();
		
		// (2) ForkJoinPool实例化方式二, 2代表两个处理单元
		 ForkJoinPool forkJoinPool =new  ForkJoinPool(2);
		 
		 
		 ForkJoinTest forkJoin = new  ForkJoinTest("tewrerefedewrewr");

		 
		 commonPool.submit(forkJoin);
		 
		 //执行计算结果
		 forkJoin.join();
		 
		 
	

	



以上是关于Java并发之Fork-Join的主要内容,如果未能解决你的问题,请参考以下文章

JDK:Fork-Join框架

Java 多线程进阶-并发协作控制

Java并发编程-线程的并发工具类

最通俗的例子讲解Java中的fork-join

Java7 Fork-Join 框架:任务切分,并行处理

Fork-Join分治编程介绍