JUC并发编程 共享模式之工具 线程池 -- Fork / Join 框架(JDK1.7 新加入的线程池实现)

Posted Z && Y

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JUC并发编程 共享模式之工具 线程池 -- Fork / Join 框架(JDK1.7 新加入的线程池实现)相关的知识,希望对你有一定的参考价值。

1. Fork / Join 框架


1.1 介绍

  • Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型运算
  • 所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计算,如归并排序、斐波那契数列、都可以用分治思想进行求解
  • Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运算效率
  • Fork/Join 默认会创建与 cpu 核心数大小相同的线程池

1.2 基本使用

示例代码:

package com.tian;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

@Slf4j(topic = "c.TestForkJoin2")
public class TestForkJoin2 {

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        // new MyTask(5)  5+ new MyTask(4)  4 + new MyTask(3)  3 + new MyTask(2)  2 + new MyTask(1)
        System.out.println("计算的结果是: " + pool.invoke(new MyTask(5)));
    }
}


/**
 * RecursiveTask: 有返回值
 * RecursiveAction: 无返回值
 */
@Slf4j(topic = "c.MyTask")
class MyTask extends RecursiveTask<Integer> {

    private final int n;

    public MyTask(int n) {
        this.n = n;
    }

    @Override
    public String toString() {
        return "{" + n + '}';
    }

    // 1~n 之间整数的和
    @Override
    protected Integer compute() {
        // 如果 n 已经为 1,可以求得结果了
        if (n == 1) {
            log.debug("join() {}", n);
            return n;
        }

        // 将任务进行拆分(fork)
        AddTask1 t1 = new AddTask1(n - 1);
        t1.fork();
        log.debug("fork() {} + {}", n, t1);

        // 合并(join)结果
        int result = n + t1.join();
        log.debug("join() {} + {} = {}", n, t1, result);
        return result;
    }
}

运行结果:

运行流程图示:


1.3 优化任务拆分(难点)

package com.tian;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class TestForkJoin {

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        System.out.println(pool.invoke(new AddTask(1, 5)));
    }
}

@Slf4j(topic = "c.AddTask")
class AddTask extends RecursiveTask<Integer> {

    // 起始数字
    int begin;
    // 结束数字
    int end;

    public AddTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    public String toString() {
        return "{" + begin + "," + end + '}';
    }

    @Override
    protected Integer compute() {
        // 5,5
        if (begin == end) {
            log.debug("join() {}", begin);
            return begin;
        }
        // 4,5
        if (end - begin == 1) {
            log.debug("join() {} + {} = {}", begin, end, end + begin);
            return end + begin;
        }
        // 1,5 => mid = 3
        int mid = (end + begin) / 2;

        // 计算 1, 3
        AddTask t1 = new AddTask(begin, mid);
        t1.fork();
        // 计算 4, 5
        AddTask t2 = new AddTask(mid + 1, end);
        t2.fork();
        log.debug("fork() {} + {} = ?", t1, t2);

        // 结果求和
        int result = t1.join() + t2.join();
        log.debug("join() {} + {} = {}", t1, t2, result);
        return result;
    }
}

运行结果:

运行流程图示:



以上是关于JUC并发编程 共享模式之工具 线程池 -- Fork / Join 框架(JDK1.7 新加入的线程池实现)的主要内容,如果未能解决你的问题,请参考以下文章

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- Executors 类创建线程池(创建ThreadPoolExecutor对象)

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(关闭线程池: shutdownshutdownNow)

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(线程池状态构造方法)

JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 线程池应用之定时任务(在每周周四执行定时任务)

JUC并发编程 共享模式之工具 线程池 JDK 提供的线程池工具类 -- ThreadPoolExecutor(提交任务: submitexecute invokAllinvokeAny)

JUC并发编程 共享模式之工具 ThreadPoolExecutor -- 任务调度线程池 定时任务 / 延时执行(ScheduledThreadPoolExecutor 延时执行 / 定时执行)(代