《Java8实战》 - 读书笔记 - Parallel Stream并行流知识

Posted 笑虾

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了《Java8实战》 - 读书笔记 - Parallel Stream并行流知识相关的知识,希望对你有一定的参考价值。

《Java8实战》 - 读书笔记 - Parallel Stream并行流知识

第7章 并行数据处理与性能

7.1 并行流

7.1.1 将顺序流转换为并行流

  • parallel
  • sequential
stream.parallel() 
 .filter(...) 
 .sequential() 
 .map(...) 
 .parallel() 
 .reduce();

最后一次parallelsequential调用会影响整个流水线。在本例中,流水线会并行执行,因为最后调用的是parallel()

配置并行流使用的线程池
看看流的parallel方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?
并行流内部使用了默认的ForkJoinPool(7.2节会进一步讲到分支/合并框架),它默认的线程数量就是你的处理器数量,这个值是由 Runtime.getRuntime().availableProcessors()得到的。
但是你可以通过系统属性 java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。

7.1.2 测量流性能

这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。如果用得不对(比如采用了一个不易并行化的操作,如iterate),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的parallel操作时,了解背后到底发生了什么是很有必要的。
使用更有针对性的方法
那到底要怎么利用多核处理器,用流来高效地并行求和呢?我们在第5章中讨论了一个叫LongStream.rangeClosed的方法。这个方法与iterate相比有两个优点。

  • LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销。
  • LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。例如,范围1~20可分为1~5、6~10、11~15和16~20

7.1.3 正确使用并行流

这一节举了一个开启并行踩坑的例子。forEach有坑

public class ParallelStreams 
    public static long sideEffectParallelSum(long n) 
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); // 开启并行流,数据竞争翻车
        return accumulator.total;
    

public class Accumulator 
    public long total = 0;
    public void add(long value) 
        total += value; // 非原子性操作
    

public static long measureSumPerf(Function<Long, Long> adder, long n) 
    long fastest = Long.MAX_VALUE;
    for (int i = 0; i < 10; i++) 
        long start = System.nanoTime();
        long sum = adder.apply(n);
        long duration = (System.nanoTime() - start) / 1_000_000;
        System.out.println("结果: " + sum);
        if (duration < fastest) fastest = duration;
    
    return fastest;

public static void main(String[] args) 
     System.out.println("并行流耗时: " +  measureSumPerf(ParallelStreams::sideEffectParallelSum, 10_000_000L) + "毫秒" );
 

7.1.4 高效使用并行流

一般而言,想给出任何关于什么时候该用并行流的定量建议都是不可能也毫无意义的,因为任何类似于“仅当至少有一千个(或一百万个或随便什么数字)元素的时候才用并行流)”的建议对于某台特定机器上的某个特定操作可能是对的,但在略有差异的另一种情况下可能就是大错特错。尽管如此,我们至少可以提出一些定性意见,帮你决定某个特定情况下是否有必要使用并行流。

  • 如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中已经指出,并行流并不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
  • 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream、DoubleStream)来避免这种操作,但凡有可能都应该用这些流。
  • 有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性
    能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。
  • 还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
  • 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。最后,你将在7.3节中学到,你可以自己实现Spliterator来完全掌控分解过程。
  • 流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。
  • 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。
    如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通
    过并行流得到的性能提升。

表7-1按照可分解性总结了一些流数据源适不适于并行。
表7-1 流的数据源和可分解性

可分解性
ArrayList极佳
LinkedList
IntStream.range极佳
Stream.iterate
HashSet
TreeSet

7.2 分支/合并框架

7.2.1 使用 RecursiveTask

要把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型(当然它可能会更新其他非局部机构)。要定义RecursiveTask,只需实现它唯一的抽象方法compute:protected abstract R compute();
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:

if (任务足够小或不可分)  
	顺序计算该任务 
 else  
	将任务分成两个子任务
	递归调用本方法,拆分每个子任务,等待所有子任务完成
	合并每个子任务的结果

一般来说并没有确切的标准决定一个任务是否应该再拆分,但有几种试探方法可以帮助你做出这一决定。我们会在7.2.1节中进一步澄清。递归的任务拆分过程如图7-3所示。

再举一个例子:为一个数字范围(这里用一个long[]数组表示)求和。如前所述,你需要先为RecursiveTask类做一个实现,就是下面代码清单中的ForkJoinSumCalculator

/**
 * 为一个数字范围(这里用一个long[]数组表示)求和
 * 继承 RecursiveTask 来创建可以用于分支/合并框架的任务
 */
public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask<Long> 
    private final long[] numbers;                   // 要求和的数组
    private final int start;                        // 子任务处理的数组的起始位置
    private final int end;                          // 子任务处理的数组的终止位置
    public static final long THRESHOLD = 10_000;    // 不再将任务分解为子任务的数组大小

    /**
     * 公共构造函数用于创建主任务
     */
    public ForkJoinSumCalculator(long[] numbers) 
        this(numbers, 0, numbers.length);
    

    /**
     * 私有构造函数用于以递归方式为主任务创建子任务
     */
    private ForkJoinSumCalculator(long[] numbers, int start, int end) 
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    

    /**
     * 覆盖 RecursiveTask 抽象方法
     */
    @Override
    protected Long compute() 
        // 该任务负责求和的部分的大小
        int length = end - start;
        if (length <= THRESHOLD) return computeSequentially();
        // 创建一个子任务来为数组的前一半求和
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
        // 利用另一个ForkJoinPool线程异步执行新创建的子任务
        leftTask.fork();
        // 创建一个任务为数组的后一半求和
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
        // 同步执行第二个子任务,有可能允许进一步递归划分
        Long rightResult = rightTask.compute();
        // 读取第一个子任务的结果,如果尚未完成就等待
        Long leftResult = leftTask.join();
        // 该任务的结果是两个子任务结果的组合
        return leftResult + rightResult;
    

    /**
     * 在子任务不再可分时计算结果的简单算法
     */
    private long computeSequentially() 
        long sum = 0;
        for (int i = start; i < end; i++) 
            
                sum += numbers[i];
            
        
        return sum;
    

测试代码:

public static void main(String[] args) 
    long n = 1_000_000;
    long[] numbers = LongStream.rangeClosed(1, n).toArray();
    ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    Long sum = new ForkJoinPool().invoke(task);
    System.out.println("数组中所有元素的和为:" + sum); 
    // 数组中所有元素的和为:500000500000

实际应用中多个ForkJoinPool没有意义,所以请以单列模式来使用它。
ForkJoinPool默认构造函数会条用Runtime.availableProcessors获得内核的数量,包括超线程生成的虚拟内核。

运行ForkJoinSumCalculator
当把ForkJoinSumCalculator任务传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务的compute方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的ForkJoinSumCalculator,而它们也由ForkJoinPool安排执行。因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件(本例中是求和的项目数小于等于10 000)。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。这一过程如图7-4所示。

使用前面的测试代码跑一下发现比用并行流的版本要差,但这只是因为必须先要把整个数字流都放进一个long[],之后才能在ForkJoinSumCalculator任务中使用它。

7.2.2 使用分支/合并框架的最佳做法

虽然分支/合并框架还算简单易用,不幸的是它也很容易被误用。以下是几个有效使用它的最佳做法。

  • 对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。
  • 不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。
  • 对子任务调用fork方法可以把它排进ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute低。这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。
  • 调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在你喜欢的IDE里面看栈跟踪(stack trace)来找问题,但放在分支合并计算上就不行了,因为调用compute的线程并不是概念上的调用方,后者是调用fork的那个。
  • 和并行流一样,你不应理所当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。我们已经说过,一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化。这就是为什么在测量性能之前跑几遍程序很重要,我们的测试框架就是这么做的。同时还要知道,编译器内置的优化可能会为顺序版本带来一些优势(例如执行死码分析——删去从未被使用的计算)。

对于分支/合并拆分策略还有最后一点补充:你必须选择一个标准,来决定任务是要进一步拆分还是已小到可以顺序求值。我们会在下一节中就此给出一些提示。

未完待续。。。

以上是关于《Java8实战》 - 读书笔记 - Parallel Stream并行流知识的主要内容,如果未能解决你的问题,请参考以下文章

《Java8实战》读书笔记11:Java8中新的日期时间API

《Java8实战》读书笔记11:Java8中新的日期时间API

《Java8实战》读书笔记08:接口的默认方法

《Java8实战》读书笔记12:函数式编程

《Java8实战》读书笔记12:函数式编程

《Java8实战》 - 读书笔记 - Stream流的基本用法