Java多线程Java的MapReduce框架ForkJoin

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程Java的MapReduce框架ForkJoin相关的知识,希望对你有一定的参考价值。

    Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

    Fork就是把一个大任务切分为若干子任务并行的执行。类似MapReduce里面的Map。

    Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。类似MapReduce里面的Reduce。

    

    举例说明,统计1~100的和,并行运行, 每个线程计算20个数的,如果当前线程统计的数量多于20,就切分为两个线程运行,切分点为中间数,至少分配的每一个线程的统计数小于或等于20,这个分裂任务的过程就叫做Fork。最后各个线程向上汇报汇总结果,这个汇聚结果的过程就叫做Join。


    流程图如下:

技术分享



    代码如下:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

class Calculator extends RecursiveTask<Integer> 
{  
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    /**
     * 线程计数器
     */
    public static AtomicInteger tcounter = new AtomicInteger(0);
    /**
     * 计算阀值
     */
    private static final int THRESHOLD = 20;  
    /**
     * 开始值
     */
    private int start;
    /**
     * 结束值
     */
    private int end;  
  
    public Calculator(int start, int end) 
    {  
        this.start = start;  
        this.end = end;  
    }  
  
    @Override  
    protected Integer compute() 
    { 
    	tcounter.incrementAndGet();
    	System.out.println("start:" + start + ", end:" + end);
    	
        int sum = 0;  
        if((end - start) <= THRESHOLD)
        {  
        	/**
        	 * 小于等于阀值,直接计算
        	 */
            for(int i = start; i<= end; i++)
            {  
                sum += i;  
            }
        }
        else
        {  
        	/**
        	 * 大于阀值,任务分解, 并汇聚结果
        	 */
            int middle = (start + end) / 2;  
            Calculator left = new Calculator(start, middle);  
            Calculator right = new Calculator(middle + 1, end);  

            left.fork();
            right.fork();
           
            sum = left.join() + right.join();  
        } 
        
        return sum;  
    }  
  
}  

public class TestForkJoinPool 
{
	public static void main(String[] args) throws InterruptedException, ExecutionException 
	{
	    ForkJoinPool forkJoinPool = new ForkJoinPool();  
	    Future<Integer> result = forkJoinPool.submit(new Calculator(1, 100));  
	    System.out.println("结果: " + result.get() + ", " + Calculator.tcounter + "个线程参与了运算");
	}
}

 执行结果

start:1, end:100
start:1, end:50
start:51, end:100
start:1, end:25
start:1, end:13
start:26, end:50
start:14, end:25
start:51, end:75
start:26, end:38
start:39, end:50
start:51, end:63
start:64, end:75
start:76, end:100
start:76, end:88
start:89, end:100
结果: 5050, 15个线程参与了运算

 


    聊聊核心类

    ForkJoinPool:负责建立一个ForkJoin运行环境

    ForkJoinTask: 表示实际运行的任务,一般使用它的两个子类,一个是RecursiveAction(任务不带返回值时使用),另一个是RecursiveTask(任务带返回值时使用)。按照情况选择任意一个, 使用时需要继承该子类,然后实现抽象方法compute【任务的逻辑就在compute方法里编写】。

 ForkJoinTask.fork方法表示分裂任务, ForkJoinTask.join表示汇聚分裂任务的compute方法的执行结果。

    

   


本文出自 “DeaconLi” 博客,请务必保留此出处http://lizhuquan0769.blog.51cto.com/2591147/1789417

以上是关于Java多线程Java的MapReduce框架ForkJoin的主要内容,如果未能解决你的问题,请参考以下文章

JAVA下实现多线程断点下载

java多线程ForkJoin分支合并

用于测试多线程 Java 应用程序的确定性记录/重放框架

Java 通用爬虫框架中多线程的使用

Java多线程学习线程池与Executor 框架

java nio并发访问问题,我现在利用nio框架制服务器的并发访问,SelectionKey多线程