Java线程:Fork/Join-Java并行计算框架

Posted 高爽 Coder

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java线程:Fork/Join-Java并行计算框架相关的知识,希望对你有一定的参考价值。

并行计算在处处都有大数据的今天已经不是一个新鲜的词汇了,现在已经有单机多核甚至多机集群并行计算,注意,这里说的是并行,而不是并发。严格的将,并行是指系统内有多个任务同时执行,而并发是指系统内有多个任务同时存在,不同的任务按时间分片的方式切换执行,由于切换的时间很短,给人的感觉好像是在同时执行。
Java在JDK7之后加入了并行计算的框架Fork/Join,可以解决我们系统中大数据计算的性能问题。Fork/Join采用的是分治法,Fork是将一个大任务拆分成若干个子任务,子任务分别去计算,而Join是获取到子任务的计算结果,然后合并,这个是递归的过程。子任务被分配到不同的核上执行时,效率最高。伪代码如下:

Result solve(Problem problem) 
    if (problem is small)
        directly solve problem
    else 
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    

Fork/Join框架的核心类是ForkJoinPool,它能够接收一个ForkJoinTask,并得到计算结果。ForkJoinTask有两个子类,RecursiveTask(有返回值)和RecursiveAction(无返回结果),我们自己定义任务时,只需选择这两个类继承即可。类图如下:

下面来看一个实例:计算一个超大数组所有元素的和。代码如下:

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * @author: shuang.gao  Date: 2015/7/14 Time: 8:16
 */
public class SumTask extends RecursiveTask<Integer> 

    private static final long serialVersionUID = -6196480027075657316L;

    private static final int THRESHOLD = 500000;

    private long[] array;

    private int low;

    private int high;

    public SumTask(long[] array, int low, int high) 
        this.array = array;
        this.low = low;
        this.high = high;
    

    @Override
    protected Integer compute() 
        int sum = 0;
        if (high - low <= THRESHOLD) 
            // 小于阈值则直接计算
            for (int i = low; i < high; i++) 
                sum += array[i];
            
         else 
            // 1. 一个大任务分割成两个子任务
            int mid = (low + high) >>> 1;
            SumTask left = new SumTask(array, low, mid);
            SumTask right = new SumTask(array, mid + 1, high);

            // 2. 分别计算
            left.fork();
            right.fork();

            // 3. 合并结果
            sum = left.join() + right.join();
        
        return sum;
    

    public static void main(String[] args) throws ExecutionException, InterruptedException 
        long[] array = genArray(1000000);

        System.out.println(Arrays.toString(array));

        // 1. 创建任务
        SumTask sumTask = new SumTask(array, 0, array.length - 1);

        long begin = System.currentTimeMillis();

        // 2. 创建线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool();

        // 3. 提交任务到线程池
        forkJoinPool.submit(sumTask);

        // 4. 获取结果
        Integer result = sumTask.get();

        long end = System.currentTimeMillis();

        System.out.println(String.format("结果 %s 耗时 %sms", result, end - begin));
    

    private static long[] genArray(int size) 
        long[] array = new long[size];
        for (int i = 0; i < size; i++) 
            array[i] = new Random().nextLong();
        
        return array;
    

我们通过调整阈值(THRESHOLD),可以发现耗时是不一样的。实际应用中,如果需要分割的任务大小是固定的,可以经过测试,得到最佳阈值;如果大小不是固定的,就需要设计一个可伸缩的算法,来动态计算出阈值。如果子任务很多,效率并不一定会高。
未完待续。。。


参考资料

http://gee.cs.oswego.edu/dl/papers/fj.pdf
https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
https://www.ibm.com/developerworks/cn/java/j-lo-forkjoin/
http://www.ibm.com/developerworks/cn/java/j-jtp11137.html

本文来自:高爽|Coder,原文地址:http://blog.csdn.net/ghsau/article/details/46287769,转载请注明。

以上是关于Java线程:Fork/Join-Java并行计算框架的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程之利用线程池并行计算例子

Java使用线程池和多线程实现并行计算(案例一)

Java 多线程进阶-并发编程 线程组ThreadGroup

你应该这样去开发接口:Java多线程并行计算

java~并行计算~Future和Callable实现大任务的并行处理

多线程 fork/join 并行计算