Fork/Join框架

Posted 紫乾2014

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Fork/Join框架相关的知识,希望对你有一定的参考价值。

一、基本介绍

1.1 基本介绍

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架, 核心思想就是把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果,其实现思想与MapReduce有异曲同工之妙。

Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:

Fork/Join框架使用一个巧妙的算法来平衡线程的负载,称为工作窃取(work-stealing)算法。工作窃取的运行流程图如下:

假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

1.2 API代码分析

ForkJoinTask : 基本任务,使用fork、join框架必须创建的对象,提供fork,join操作,常用的三个子类

  • RecursiveAction : 无结果返回的任务
  • RecursiveTask : 有返回结果的任务
  • CountedCompleter:无返回值任务,完成任务后可以触发回调。

ForkJoinTask提供了两个重要的方法:

  • fork : 让task异步执行
  • join : 让task同步执行,可以获取返回值

ForkJoinPool : 专门用来运行 ForkJoinTask 的线程池,(在实际使用中,也可以接收
Runnable/Callable任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask类型的任务)

ForkJoinTask 在不显式使用 ForkJoinPool.execute/invoke/submit() 方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行

二、应用

2.1 简单应用

ForkJoinExample.java

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample 

    //java8 parallStream

    //针对一个数字,做计算。
    private static final Integer MAX=200;

    static class CalcForJoinTask extends RecursiveTask<Integer> 
        private Integer startValue; //子任务的开始计算的值
        private Integer endValue; //子任务结束计算的值

        public CalcForJoinTask(Integer startValue, Integer endValue) 
            this.startValue = startValue;
            this.endValue = endValue;
        
        @Override
        protected Integer compute() 
            //如果当前的数据区间已经小于MAX了,那么接下来的计算不需要做拆分
            if(endValue-startValue<MAX)
                System.out.println("开始计算:startValue:"+startValue+" ; endValue:"+endValue);
                Integer totalValue=0;
                for(int i=this.startValue;i<=this.endValue;i++)
                    totalValue+=i;
                
                return totalValue;
            
            CalcForJoinTask subTask=new CalcForJoinTask(startValue,(startValue+endValue)/2);
            subTask.fork();
            CalcForJoinTask calcForJoinTask=new CalcForJoinTask((startValue+endValue)/2+1,endValue);
            calcForJoinTask.fork();
            return subTask.join()+calcForJoinTask.join();
        
    

    public static void main(String[] args) 
        CalcForJoinTask calcForJoinTask=new CalcForJoinTask(1,10000);
        ForkJoinPool pool=new ForkJoinPool();
        ForkJoinTask<Integer> taskFuture=pool.submit(calcForJoinTask);
        try 
            Integer result=taskFuture.get();
            System.out.println("result:"+result);
         catch (InterruptedException e) 
            e.printStackTrace();
         catch (ExecutionException e) 
            e.printStackTrace();
        
    


2.2 业务应用

业务背景:对商品信息、商品评论信息、商品销量数据、商家信息进行聚合查询。

2.2.1 项目搭建

创建maven项目
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-boot-fork-join</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-fork-join</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.2.2 创建实体类

Item.java

/**
 * 商品信息
 **/
public class Item 

    private String productName;
    private int num;

    public String getProductName() 
        return productName;
    

    public void setProductName(String productName) 
        this.productName = productName;
    

    public int getNum() 
        return num;
    

    public void setNum(int num) 
        this.num = num;
    

Comment.java

/**
 商品评论信息
 **/
public class Comment 
    private String name;
    private String content;

    public String getName() 
        return name;
    

    public void setName(String name) 
        this.name = name;
    

    public String getContent() 
        return content;
    

    public void setContent(String content) 
        this.content = content;
    

Seller.java

/**
 * 销售信息
 **/

public class Seller 
    private int totalNum;
    private int sellerNum;

    public int getTotalNum() 
        return totalNum;
    

    public void setTotalNum(int totalNum) 
        this.totalNum = totalNum;
    

    public int getSellerNum() 
        return sellerNum;
    

    public void setSellerNum(int sellerNum) 
        this.sellerNum = sellerNum;
    

Shop.java

/**
 * 商家信息
 **/
public class Shop 

    private String name;

    public String getName() 
        return name;
    

    public void setName(String name) 
        this.name = name;
    

Context.java

/**
 * 聚合信息
 **/
public class Context 

    private Item item;  //商品
    private Comment comment; //评论
    private Seller seller; //销售信息
    private Shop shop;  //店铺信息

    public Item getItem() 
        return item;
    

    public void setItem(Item item) 
        this.item = item;
    

    public Comment getComment() 
        return comment;
    

    public void setComment(Comment comment) 
        this.comment = comment;
    

    public Seller getSeller() 
        return seller;
    

    public void setSeller(Seller seller) 
        this.seller = seller;
    

    public Shop getShop() 
        return shop;
    

    public void setShop(Shop shop) 
        this.shop = shop;
    

    @Override
    public String toString() 
        return "Context" +
                "item=" + item +
                ", comment=" + comment +
                ", seller=" + seller +
                ", shop=" + shop +
                '';
    

2.2.3 创建基础接口和类

ILoadDataProcessor.java

public interface ILoadDataProcessor 

    /**
     * 加载对应的数据
     * @param context
     */
    void load(Context context);

AbstractLoadDataProcessor.java

import java.util.concurrent.RecursiveAction;

/**
数据加载抽象类
 **/
public abstract class AbstractLoadDataProcessor extends RecursiveAction implements ILoadDataProcessor

    protected Context context;

    @Override
    protected void compute() 
        load(context); //调用子类的具体实现
    

    public Context getContext() 
        this.join(); //得到一个聚合的结果
        return context;
    

    public void setContext(Context context) 
        this.context = context;
    

2.2.4 创建服务类

CommentService.java

import org.springframework.stereotype.Service;

/**
 评论服务类
 **/
@Service
public class CommentService extends AbstractLoadDataProcessor
    @Override
    public void load(Context context) 
        //RPC.
        Comment comment=new Comment();
        comment.setName("XIWANG");
        comment.setContent("商品质量很好");
        context.setComment(comment);
    

ItemService.java

import org.springframework.stereotype.Service;

/**
商品服务类
 **/
@Service
public class ItemService extends AbstractLoadDataProcessor
    @Override
    public void load(Context context) 
        Item item=new Item();
        item.setNum(100);
        item.setProductName("键盘");
        context.setItem(item);
    

SellerService.java

import org.springframework.stereotype.Service;

/**
销量服务类
 **/
@Service
public class SellerService extends AbstractLoadDataProcessor

    @Override
   public void load(Context context) 
        Seller seller=new Seller();
        seller.setSellerNum(100);
        seller.setTotalNum(1000);
        context.setSeller(seller);
    

ShopService.java

import org.springframework.stereotype.Service;

/**
商家服务类
 **/
@Service
public class ShopService extends AbstractLoadDataProcessor
    @Override
    public void load(Context context) 
        Shop shop=new Shop();
        shop.setName("令狐冲小店");
        context.setShop(shop);
    

2.2.5 创建聚合任务类

ComplexTradeTaskService.java

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;

/**
销量和商家聚合任务类
 **/
@Service
public class ComplexTradeTaskService extends AbstractLoadDataProcessor implements ApplicationContextAware 

    ApplicationContext applicationContext;

    private List<AbstractLoadDataProcessor> taskDataProcessors=new ArrayList<>();

    @Override
    public void load(Context context) 
        taskDataProcessors.forEach(abstractLoadDataProcessor->
            abstractLoadDataProcessor.setContext(this.context);
            abstractLoadDataProcessor.fork();//创建一个fork task
        );
    

    @Override
    public Context getContext() 
        this.taskDataProcessors.forEach(ForkJoinTask::join);
        return super.getContext();
    

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
        this.applicationContext=applicationContext;
        taskDataProcessors.add(applicationContext.getBean(SellerService.class));
        taskDataProcessors.add(applicationContext.getBean(ShopService.class));
    

ItemTaskForkJoinDataProcessor.java

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;

/**
 * 全局聚合任务
 **/
@Service
public class ItemTaskForkJoinDataProcessor extends AbstractLoadDataProcessor implements ApplicationContextAware 

    ApplicationContext applicationContext;

    private List<AbstractLoadDataProcessor> taskDataProcessors=new ArrayList<>();

    @Override
    public void load(Context context) 
        taskDataProcessors.forEach(abstractLoadDataProcessor->
            abstractLoadDataProcessor.setContext(this.context);
            abstractLoadDataProcessor.fork();//创建一个fork task
        );
    
    @Override
    public Context getContext() 
        //ForkJoinTask::join  java8方法引用
        // * 构造引用
        // * 静态方法引用
        // * 实例方法引用
        this.taskDataProcessors.forEach(ForkJoinTask::join);
        return super.getContext();
    

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException 
        this.applicationContext=applicationContext;
        taskDataProcessors.add(applicationContext.getBean(CommentService.class));
        taskDataProcessors.add(applicationContext.getBean(ItemService.class));
        taskDataProcessors.add(applicationContext.getBean(ComplexTradeTaskService.class));
    

2.2.6 创建控制器类和启动类

IndexController.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ForkJoinPool;

@RestController
public class IndexController 

    @Autowired
    ItemTaskForkJoinDataProcessor itemTaskForkJoinDataProcessor;

    @GetMapping("/say")
    public Context index()
        Context context=new Context();
        itemTaskForkJoinDataProcessor.setContext(context);
        ForkJoinPool forkJoinPool=new ForkJoinPool();
        forkJoinPool.submit(itemTaskForkJoinDataProcessor);
        return itemTaskForkJoinDataProcessor.getContext();
    

SpringBootForkJoinApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootForkJoinApplication 

    public static void main(String[] args) 
        SpringApplication.run(SpringBootForkJoinApplication.class, args);
    


2.2.7 测试

启动项目后浏览器访问http://localhost:8080/say
输出结果:

以上是关于Fork/Join框架的主要内容,如果未能解决你的问题,请参考以下文章

Fork/Join框架

12_分支合并框架 Fork/Join

12_分支合并框架 Fork/Join

Fork/Join-Java并行计算框架

Java并发多线程编程——Fork/Join框架

Java——多线程高并发系列之Fork/Join框架简单应用