Java8 中的 Stream 那么强大,那你知道它的原理是什么吗?

Posted 逆锋起笔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java8 中的 Stream 那么强大,那你知道它的原理是什么吗?相关的知识,希望对你有一定的参考价值。

作者:岁月安然

elsef.com/2019/09/16/Java8中Stream的原理分析

Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。
Stream 使用一种类似用 SQL 语句从数据库查询数据的直观方式来提供一种对 Java 集合运算和表达的高阶抽象。
Stream API可以极大提高Java程序员的生产力,让程序员写出高效率、干净、简洁的代码。
本文会对Stream的实现原理进行剖析。
Stream的组成与特点
Stream(流)是一个来自数据源的元素队列并支持聚合操作:
  • 元素是特定类型的对象,形成一个队列。 Java中的Stream并_不会_向集合那样存储和管理元素,而是按需计算

  • 数据源流的来源可以是集合Collection、数组ArrayI/O channel, 产生器generator 等

  • 聚合操作类似SQL语句一样的操作, 比如filtermapreducefindmatchsorted

  • 和以前的Collection操作不同, Stream操作还有两个基础的特征:
  • Pipelining: 中间操作都会返回流对象本身。这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。这样做可以对操作进行优化, 比如延迟执行(laziness evaluation)和短路( short-circuiting)

  • 内部迭代:以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式 (Visitor)实现。

  • 和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。
    Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:
    1.0-1.4 中的 java.lang.Thread
    5.0 中的 java.util.concurrent
    6.0 中的 Phasers 等
    7.0 中的 Fork/Join 框架
    8.0 中的 Lambda
    Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作:
    List<Integer> numbers = Arrays.asList(123456789);
    numbers.parallelStream()
           .forEach(out::println); 
    可以看到一行简单的代码就帮我们实现了并行输出集合中元素的功能,但是由于并行执行的顺序是不可控的所以每次执行的结果不一定相同。
    如果非得相同可以使用forEachOrdered方法执行终止操作:
    List<Integer> numbers = Arrays.asList(123456789);
    numbers.parallelStream()
           .forEachOrdered(out::println);  
    这里有一个疑问,如果结果需要有序,是否和我们的并行执行的初衷相悖?是的,这个场景下明显无需使用并行流,直接用串行流执行即可, 否则性能可能更差,因为最后又强行将所有并行结果进行了排序。
    OK,下面我们先介绍一下Stream接口的相关知识。
    BaseStream接口
    Stream的父接口是BaseStream,后者是所有流实现的顶层接口,定义如下:
    public interface BaseStream<TS extends BaseStream<TS>>
            extends AutoCloseable 

        Iterator<T> iterator();

        Spliterator<T> spliterator();

        boolean isParallel();

        sequential();

        parallel();

        unordered();

        onClose(Runnable closeHandler);

        void close();

    其中,T为流中元素的类型,S为一个BaseStream的实现类,它里面的元素也是T并且S同样是自己:
    S extends BaseStream<T, S>
    是不是有点晕?
    其实很好理解,我们看一下接口中对S的使用就知道了:如sequential()parallel()这两个方法,它们都返回了S实例,也就是说它们分别支持对当前流进行串行或者并行的操作,并返回「改变」后的流对象。
    如果是并行一定涉及到对当前流的拆分,即将一个流拆分成多个子流,子流肯定和父流的类型是一致的。子流可以继续拆分子流,一直拆分下去…
    也就是说这里的SBaseStream的一个实现类,它同样是一个流,比如StreamIntStreamLongStream等。
    Stream接口
    再来看一下Stream的接口声明:
    public interface Stream<T> extends BaseStream<T, Stream<T>> 
    参考上面的解释这里不难理解:即Stream<T>可以继续拆分为Stream<T>,我们可以通过它的一些方法来证实:
    Stream<T> filter(Predicate<? super T> predicate);
    <R> Stream<R> map(Function<? super T, ? extends R> mapper);
    <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
    Stream<T> sorted();
    Stream<T> peek(Consumer<? super T> action);
    Stream<T> limit(long maxSize);
    Stream<T> skip(long n);
    ...
    这些都是操作流的中间操作,它们的返回结果必须是流对象本身。
    关闭流操作
    BaseStream 实现了 AutoCloseable 接口,也就是 close() 方法会在流关闭时被调用。同时,BaseStream 中还给我们提供了onClose()方法:
    /** * Returns an equivalent stream with an additional close handler. Close * handlers are run when the @link #close() method * is called on the stream, and are executed in the order they were * added. All close handlers are run, even if earlier close handlers throw * exceptions. If any close handler throws an exception, the first * exception thrown will be relayed to the caller of @code close(), with * any remaining exceptions added to that exception as suppressed exceptions * (unless one of the remaining exceptions is the same exception as the * first exception, since an exception cannot suppress itself.) May * return itself. * * <p>This is an <a href="package-summary.html#StreamOps">intermediate * operation</a>. * * @param closeHandler A task to execute when the stream is closed * @return a stream with a handler that is run if the stream is closed */
    onClose(Runnable closeHandler);
    AutoCloseableclose()接口被调用的时候会触发调用流对象的onClose()方法,但有几点需要注意:
  • onClose() 方法会返回流对象本身,也就是说可以对改对象进行多次调用

  • 如果调用了多个onClose() 方法,它会按照调用的顺序触发,但是如果某个方法有异常则只会向上抛出第一个异常

  • 前一个 onClose() 方法抛出了异常不会影响后续 onClose() 方法的使用

  • 如果多个 onClose() 方法都抛出异常,只展示第一个异常的堆栈,而其他异常会被压缩,只展示部分信息

  • 并行流和串行流
    BaseStream接口中分别提供了并行流串行流两个方法,这两个方法可以任意调用若干次,也可以混合调用,但最终只会以最后一次方法调用的返回结果为准
    参考parallel()方法的说明:
    Returns an equivalent stream that is parallel. May return
    itself, either because the stream was already parallel, or because
    the underlying stream state was modified to be parallel.
    所以多次调用同样的方法并不会生成新的流,而是直接复用当前的流对象。
    下面的例子里以最后一次调用parallel()为准,最终是并行地计算sum
    stream.parallel()
       .filter(...)
       .sequential()
       .map(...)
       .parallel()
       .sum();
    ParallelStream背后的男人:ForkJoinPool
    ForkJoin框架是从JDK7中新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService 接口。它使用了一个「无限队列」来保存需要执行的任务,而线程的数量则是通过构造函数传入, 如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值
    ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm) 来解决问题,典型的应用比如_快速排序算法_。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500 万的排序任务一个针对这两组500万数据的合并任务
    以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。
    问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行,想象一下归并排序的过程。
    所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法向 任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行
    那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?
    首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有「父子关系」的任务,比如使用4个线程来完成超过200万个任务。使用ThreadPoolExecutor 时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。
    Work Stealing原理:
    1. 每个工作线程都有自己的工作队列WorkQueue;
    2. 这是一个双端队列dequeue,它是线程私有的;
    3. ForkJoinTask中fork的子任务,将放入运行该任务的工作线程的队头,工作线程将以LIFO的顺序来处理工作队列中的任务,即堆栈的方式;
    4. 为了最大化地利用CPU,空闲的线程将从其它线程的队列中「窃取」任务来执行
    5. 但是是从工作队列的尾部窃取任务,以减少和队列所属线程之间的竞争;
    6. 双端队列的操作:push()/pop()仅在其所有者工作线程中调用,poll()是由其它线程窃取任务时调用的;
    7. 当只剩下最后一个任务时,还是会存在竞争,是通过CAS来实现的;
    用ForkJoinPool的眼光来看ParallelStream
    Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的CPU数量。当调用Arrays 类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。
    比如下面的代码用来遍历列表中的元素并执行需要的操作:
    List<UserInfo> userInfoList =
            DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
    userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);
    对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的ForkJoinPool中的commonPool处理。以上的并行计算逻辑当然也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。
    对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。也可以通过设置系统属性:-Djava.util.concurrent .ForkJoinPool.common.parallelism=N (N为线程数量),来调整ForkJoinPool的线程数量。
    值得注意的是,当前执行的线程也会被用来执行任务,所以最终的线程个数为N+1,1就是当前的主线程
    这里就有一个问题,如果你在并行流的执行计算使用了_阻塞操作_,如I/O,那么很可能会导致一些问题:
    public static String query(String question) 
      List<String> engines = new ArrayList<String>();
      engines.add("http://www.google.com/?q=");
      engines.add("http://duckduckgo.com/?q=");
      engines.add("http://www.bing.com/search?q=");
       
      // get element as soon as it is available
      Optional<String> result = engines.stream().parallel().map((base) - 
        String url = base + question;
        // open connection and fetch the result
        return WS.url(url).get();
      ).findAny();
      return result.get();

    这个例子很典型,让我们来分析一下:
  • 这个并行流计算操作将由主线程和JVM默认的ForkJoinPool.commonPool()来共同执行。

  • map中是一个阻塞方法,需要通过访问HTTP接口并得到它的response,所以任何一个worker线程在执行到这里的时候都会阻塞并等待结果。

  • 所以当此时再其他地方通过并行流方式调用计算方法的时候,将会受到此处阻塞等待的方法的影响。

  • 目前的ForkJoinPool的实现并未考虑补偿等待那些阻塞在等待新生成的线程的工作worker线程,所以最终ForkJoinPool.commonPool()中的线程将备用光并且阻塞等待。

  • 正如我们上面那个列子的情况分析得知,lambda的执行并不是瞬间完成的,所有使用parallel streams的程序都有可能成为阻塞程序的源头, 并且在执行过程中程序中的其他部分将无法访问这些workers,这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知并且暗藏危机。
    小结:
    1. 当需要处理递归分治算法时,考虑使用ForkJoinPool。

    2. 仔细设置不再进行任务划分的阈值,这个阈值对性能有影响。

    3. Java 8中的一些特性会使用到ForkJoinPool中的通用线程池。在某些场合下,需要调整该线程池的默认的线程数量

    4. lambda应该尽量避免副作用,也就是说,避免突变基于堆的状态以及任何IO

    5. lambda应该互不干扰,也就是说避免修改数据源(因为这可能带来线程安全的问题)

    6. 避免访问在流操作生命周期内可能会改变的状态

    并行流的性能
    并行流框架的性能受以下因素影响:
  • 数据大小:数据够大,每个管道处理时间够长,并行才有意义;

  • 源数据结构:每个管道操作都是基于初始数据源,通常是集合,将不同的集合数据源分割会有一定消耗;

  • 装箱:处理基本类型比装箱类型要快;

  • 核的数量:默认情况下,核数量越多,底层fork/join线程池启动线程就越多;

  • 单元处理开销:花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显;

  • 源数据结构分为以下3组:
  • 性能好:ArrayList、数组或IntStream.range(数据支持随机读取,能轻易地被任意分割)

  • 性能一般:HashSetTreeSet(数据不易公平地分解,大部分也是可以的)

  • 性能差:LinkedList(需要遍历链表,难以对半分解)、Stream.iterateBufferedReader.lines(长度未知,难以分解)

  • 注意:下面几个部分节选自:Streams 的幕后原理,顺便感谢一下作者_Brian Goetz_,写的太通透了。推荐:拥抱 Java 8 并行流吧,让执行速度飞起!
    NQ模型
    要确定并行性是否会带来提速,需要考虑的最后两个因素是:可用的数据量和针对每个数据元素执行的计算量。
    在我们最初的并行分解描述中,我们采用的概念是拆分来源,直到分段足够小,以致解决该分段上的问题的顺序方法更高效。分段大小必须依赖于所解决的问题,确切的讲,取决于每个元素完成的工作量。例如,计算一个字符串的长度涉及的工作比计算字符串的 SHA-1 哈希值要少得多。为每个元素完成的工作越多,“大到足够利用并行性” 的阈值就越低。类似地,拥有的数据越多, 拆分的分段就越多,而不会与 “太小” 阈值发生冲突。
    一个简单但有用的并行性能模型是 NQ 模型,其中 N 是数据元素数量,Q 是为每个元素执行的工作量。乘积 N*Q 越大,就越有可能获得并行提速。对于具有很小的 Q 的问题,比如对数字求和,您通常可能希望看到 N > 10,000 以获得提速;随着 Q 增加,获得提速所需的数据大小将会减小。
    并行化的许多阻碍(比如拆分成本、组合成本或遇到顺序敏感性)都可以通过 Q 更高的操作来缓解。尽管拆分某个 LinkedList 特征的结果可能很糟糕,但只要拥有足够大的 Q,仍然可能获得并行提速。
    遇到顺序
    遇到顺序指的是来源分发元素的顺序是否对计算至关重要。一些来源(比如基于哈希的集合和映射)没有有意义的遇到顺序。流标志 ORDERED 描述了流是否有有意义的遇到顺序。JDK 集合的 spliterator 会根据集合的规范来设置此标志;一些中间操作可能注入 ORDERED (sorted()) 或清除它 (unordered())。
    如果流没有遇到顺序,大部分流操作都必须遵守该顺序。对于顺序执行,会「自动保留遇到顺序」,因为元素会按遇到它们的顺序自然地处理。甚至在并行执行中,许多操作(无状态中间操作和一些终止操作(比如 reduce())),遵守遇到顺序不会产生任何实际成本。但对于其他操作(有状态中间操作,其语义与遇到顺序关联的终止操作,比如 findFirst() 或 forEachOrdered()), 在并行执行中遵守遇到顺序的责任可能很重大。如果流有一个已定义的遇到顺序,但该顺序对结果没有意义, 那么可以通过使用 unordered() 操作删除 ORDERED 标志,加速包含顺序敏感型操作的管道的顺序执行。
    作为对遇到顺序敏感的操作的示例,可以考虑 limit(),它会在指定大小处截断一个流。在顺序执行中实现 limit() 很简单:保留一个已看到多少元素的计数器,在这之后丢弃任何元素。但是在并行执行中,实现 limit() 要复杂得多;您需要保留前 N 个元素。此要求大大限制了利用并行性的能力;如果输入划分为多个部分,您只有在某个部分之前的所有部分都已完成后,才知道该部分的结果是否将包含在最终结果中。因此,该实现一般会错误地选择不使用所有可用的核心,或者缓存整个试验性结果,直到您达到目标长度。
    如果流没有遇到顺序,limit() 操作可以自由选择任何 N 个元素,这让执行效率变得高得多。知道元素后可立即将其发往下游, 无需任何缓存,而且线程之间唯一需要执行的协调是发送一个信号来确保未超出目标流长度。
    遇到顺序成本的另一个不太常见的示例是排序。如果遇到顺序有意义,那么 sorted() 操作会实现一种稳定 排序 (相同的元素按照它们进入输入时的相同顺序出现在输出中),而对于无序的流,稳定性(具有成本)不是必需的。 distinct() 具有类似的情况:如果流有一个遇到顺序,那么对于多个相同的输入元素,distinct() 必须发出其中的第一个, 而对于无序的流,它可以发出任何元素 — 同样可以获得高效得多的并行实现。
    在您使用 collect() 聚合时会遇到类似的情形。如果在无序流上执行 collect(groupingBy()) 操作, 与任何键对应的元素都必须按它们在输入中出现的顺序提供给下游收集器。此顺序对应用程序通常没有什么意义,而且任何顺序都没有意义。在这些情况下,可能最好选择一个并发 收集器(比如 groupingByConcurrent()),它可以忽略遇到顺序, 并让所有线程直接收集到一个共享的并发数据结构中(比如 ConcurrentHashMap),而不是让每个线程收集到它自己的中间映射中, 然后再合并中间映射(这可能产生很高的成本)。
    什么时候该使用并行流
    谈了这么多,关于并行流parallelStream的使用注意事项需要格外注意,它并不是解决性能的万金油,相反,如果使用不当会严重影响性能。我会在另外一篇文章里单独谈这个问题。

    逆锋起笔是一个专注于程序员圈子的技术平台,你可以收获最新技术动态最新内测资格BAT等大厂的经验精品学习资料职业路线副业思维,微信搜索逆锋起笔关注!

    References
  • http://movingon.cn/2017/05/02/jdk8-Stream-BaseStream-%E6%BA%90%E7%A0%81%E9%9A%BE%E7%82%B9%E6%B5%85%E6%9E%901/
  • https://www.jianshu.com/p/bd825cb89e00
  • https://jrebel.com/rebellabs/java-parallel-streams-are-bad-for-your-health/
  • https://blog.csdn.net/weixx3/article/details/81266552
  • https://www.ibm.com/developerworks/cn/java/j-java-streams-5-brian-goetz/index.html
  • https://www.ibm.com/developerworks/cn/java/j-java-streams-3-brian-goetz/index.html
  • https://juejin.im/post/5dc5a148f265da4d4f65c191
  • https://stackoverrun.com/cn/q/10341100
  • 推荐好文

    面试官:Java 反射是什么?

    Java 17:和遗留 25 年的漏洞 Say Goodbye

    Java 框架 Mybatis 插件开发指南,超详细!

    18 个 Java8 处理日期的新花样,肯定没用过!

    你还在 new 对象吗?Java8 通用 Builder 了解一下

    天天在用Stream,那你知道如此强大的Stream的实现原理吗?

    作者:CarpenterLee

    github.com/CarpenterLee/JavaLambdaInternals

    我们已经学会如何使用Stream API,用起来真的很爽,但简洁的方法下面似乎隐藏着无尽的秘密,如此强大的API是如何实现的呢?

    比如Pipeline是怎么执行的,每次方法调用都会导致一次迭代吗?自动并行又是怎么做到的,线程个数是多少?本节我们学习Stream流水线的原理,这是Stream实现的关键所在。

    首先回顾一下容器执行Lambda表达式的方式,以ArrayList.forEach()方法为例,具体代码如下:

    // ArrayList.forEach()
    public void forEach(Consumer<? super E> action) 
        ...
        for (int i=0; modCount == expectedModCount && i < size; i++) 
            action.accept(elementData[i]);// 回调方法
        
        ...
    
    

    我们看到ArrayList.forEach()方法的主要逻辑就是一个for循环,在该for循环里不断调用action.accept()回调方法完成对元素的遍历。

    这完全没有什么新奇之处,回调方法在Java GUI的监听器中广泛使用。Lambda表达式的作用就是相当于一个回调方法,这很好理解。

    Stream API中大量使用Lambda表达式作为回调方法,但这并不是关键。理解Stream我们更关心的是另外两个问题:流水线和自动并行。使用Stream或许很容易写入如下形式的代码:

    int longestStringLengthStartingWithA
            = strings.stream()
                  .filter(s -> s.startsWith("A"))
                  .mapToInt(String::length)
                  .max();
    

    上述代码求出以字母A开头的字符串的最大长度,一种直白的方式是为每一次函数调用都执一次迭代,这样做能够实现功能,但效率上肯定是无法接受的。

    类库的实现着使用流水线(Pipeline)的方式巧妙的避免了多次迭代,其基本思想是在一次迭代中尽可能多的执行用户指定的操作。为讲解方便我们汇总了Stream的所有操作。

    Stream操作分类
    中间操作(Intermediate operations)无状态(Stateless)unordered() filter() map() mapToInt() mapToLong() mapToDouble() flatMap() flatMapToInt() flatMapToLong() flatMapToDouble() peek()
    有状态(Stateful)distinct() sorted() sorted() limit() skip()
    结束操作(Terminal operations)非短路操作forEach() forEachOrdered() toArray() reduce() collect() max() min() count()
    短路操作(short-circuiting)anyMatch() allMatch() noneMatch() findFirst() findAny()

    Stream上的所有操作分为两类:中间操作和结束操作,中间操作只是一种标记,只有结束操作才会触发实际计算。中间操作又可以分为无状态的(Stateless)和有状态的(Stateful),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果。

    比如排序是有状态操作,在读取所有元素之前并不能确定排序结果;结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如找到第一个满足条件的元素。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。

    为了更好的理解流的中间操作和终端操作,可以通过下面的两段代码来看他们的执行过程。

    IntStream.range(1, 10)
       .peek(x -> System.out.print("\\nA" + x))
       .limit(3)
       .peek(x -> System.out.print("B" + x))
       .forEach(x -> System.out.print("C" + x));
    

    输出为:

    A1B1C1
    A2B2C2
    A3B3C3
    

    中间操作是懒惰的,也就是中间操作不会对数据做任何操作,直到遇到了最终操作。而最终操作,都是比较热情的。他们会往前回溯所有的中间操作。也就是当执行到最后的forEach操作的时候,它会回溯到它的上一步中间操作,上一步中间操作,又会回溯到上上一步的中间操作,...,直到最初的第一步。

    第一次forEach执行的时候,会回溯peek 操作,然后peek会回溯更上一步的limit操作,然后limit会回溯更上一步的peek操作,顶层没有操作了,开始自上向下开始执行,输出:A1B1C1第二次forEach执行的时候,然后会回溯peek 操作,然后peek会回溯更上一步的limit操作,然后limit会回溯更上一步的peek操作,顶层没有操作了,开始自上向下开始执行,输出:A2B2C2

    ...当第四次forEach执行的时候,然后会回溯peek 操作,然后peek会回溯更上一步的limit操作,到limit的时候,发现limit(3)这个job已经完成,这里就相当于循环里面的break操作,跳出来终止循环。

    再来看第二段代码:

    IntStream.range(1, 10)
       .peek(x -> System.out.print("\\nA" + x))
       .skip(6)
       .peek(x -> System.out.print("B" + x))
       .forEach(x -> System.out.print("C" + x));
    

    输出为:

    A1
    A2
    A3
    A4
    A5
    A6
    A7B7C7
    A8B8C8
    A9B9C9
    

    第一次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,因为执行到skip,这个操作的意思就是跳过,下面的都不要执行了,也就是就相当于循环里面的continue,结束本次循环。输出:A1

    第二次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,发现这是第二次skip,结束本次循环。输出:A2

    ...

    第七次forEach执行的时候,会回溯peek操作,然后peek会回溯更上一步的skip操作,skip回溯到上一步的peek操作,顶层没有操作了,开始自上向下开始执行,执行到skip的时候,发现这是第七次skip,已经大于6了,它已经执行完了skip(6)的job了。这次skip就直接跳过,继续执行下面的操作。输出:A7B7C7

    ...直到循环结束。

    一种直白的实现方式

    仍然考虑上述求最长字符串的程序,一种直白的流水线实现方式是为每一次函数调用都执一次迭代,并将处理中间结果放到某种数据结构中(比如数组,容器等)。

    具体说来,就是调用filter()方法后立即执行,选出所有以A开头的字符串并放到一个列表list1中,之后让list1传递给mapToInt()方法并立即执行,生成的结果放到list2中,最后遍历list2找出最大的数字作为最终结果。程序的执行流程如如所示:

    这样做实现起来非常简单直观,但有两个明显的弊端:

    1. 迭代次数多。迭代次数跟函数调用的次数相等。

    2. 频繁产生中间结果。每次函数调用都产生一次中间结果,存储开销无法接受。

    这些弊端使得效率底下,根本无法接受。如果不使用Stream API我们都知道上述代码该如何在一次迭代中完成,大致是如下形式:

    int longest = 0;
    for(String str : strings)
        if(str.startsWith("A"))// 1. filter(), 保留以A开头的字符串
            int len = str.length();// 2. mapToInt(), 转换成长度
            longest = Math.max(len, longest);// 3. max(), 保留最长的长度
        
    
    

    采用这种方式我们不但减少了迭代次数,也避免了存储中间结果,显然这就是流水线,因为我们把三个操作放在了一次迭代当中。只要我们事先知道用户意图,总是能够采用上述方式实现跟Stream API等价的功能,但问题是Stream类库的设计者并不知道用户的意图是什么。

    如何在无法假设用户行为的前提下实现流水线,是类库的设计者要考虑的问题。

    Stream流水线解决方案

    我们大致能够想到,应该采用某种方式记录用户每一步的操作,当用户调用结束操作时将之前记录的操作叠加到一起在一次迭代中全部执行掉。沿着这个思路,有几个问题需要解决:

    1. 用户的操作如何记录?

    2. 操作如何叠加?

    3. 叠加之后的操作如何执行?

    4. 执行后的结果(如果有)在哪里?

    >> 操作如何记录

    注意这里使用的是“操作(operation)”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是<数据来源,操作,回调函数>构成的三元组。

    Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的PipelineHelper来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。

    还有IntPipeline, LongPipeline, DoublePipeline没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟ReferencePipeline是并列关系。

    图中Head用于表示第一个Stage,即调用调用诸如Collection.stream()方法产生的Stage,很显然这个Stage里不包含任何操作;StatelessOpStatefulOp分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。

    Stream流水线组织结构示意图如下:

    图中通过Collection.stream()方法得到Head也就是stage0,紧接着调用一系列的中间操作,不断产生新的Stream。这些Stream对象以双向链表的形式组织在一起,构成整个流水线,由于每个Stage都记录了前一个Stage和本次的操作以及回调函数,依靠这种结构就能建立起对数据源的所有操作。这就是Stream记录操作的方式。

    >> 操作如何叠加

    以上只是解决了操作记录的问题,要想让流水线起到应有的作用我们需要一种将所有操作叠加到一起的方案。你可能会觉得这很简单,只需要从流水线的head开始依次执行每一步的操作(包括回调函数)就行了。

    这听起来似乎是可行的,但是你忽略了前面的Stage并不知道后面Stage到底执行了哪种操作,以及回调函数是哪种形式。换句话说,只有当前Stage本身才知道该如何执行自己包含的动作。这就需要有某种协议来协调相邻Stage之间的调用关系。

    这种协议由Sink接口完成,Sink接口包含的方法如下表所示:

    方法名作用
    void begin(long size)开始遍历元素之前调用该方法,通知Sink做好准备。
    void end()所有元素遍历完成之后调用,通知Sink没有更多的元素了。
    boolean cancellationRequested()是否可以结束操作,可以让短路操作尽早结束。
    void accept(T t)遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。

    有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的accept()方法即可,并不需要知道其内部是如何处理的。

    当然对于有状态的操作,Sink的begin()end()方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的Sink.begin()方法可能创建一个盛放结果的容器,而accept()方法负责将元素添加到该容器,最后end()负责对容器进行排序。

    对于短路操作,Sink.cancellationRequested()也是必须实现的,比如Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。

    实际上Stream API内部实现的的本质,就是如何重写Sink的这四个接口方法

    有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.begin(), accept(), cancellationRequested(), end()方法就可以了。一种可能的Sink.accept()方法流程是这样的:

    void accept(U u)
        1. 使用当前Sink包装的回调函数处理u
        2. 将处理结果传递给流水线下游的Sink
    
    

    Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。

    下面我们结合具体例子看看Stream的中间操作是如何将自身的操作包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。先看Stream.map()方法:

    // Stream.map(),调用该方法将产生一个新的Stream
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) 
        ...
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) 
            @Override /*opWripSink()方法返回由回调函数包装而成Sink*/
            Sink<P_OUT> opWrapSink(int flags, Sink<R> downstream) 
                return new Sink.ChainedReference<P_OUT, R>(downstream) 
                    @Override
                    public void accept(P_OUT u) 
                        R r = mapper.apply(u);// 1. 使用当前Sink包装的回调函数mapper处理u
                        downstream.accept(r);// 2. 将处理结果传递给流水线下游的Sink
                    
                ;
            
        ;
    
    

    上述代码看似复杂,其实逻辑很简单,就是将回调函数mapper包装到一个Sink当中。由于Stream.map()是一个无状态的中间操作,所以map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。

    再来看一个复杂一点的例子。Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:

    // Stream.sort()方法用到的Sink实现
    class RefSortingSink<T> extends AbstractRefSortingSink<T> 
        private ArrayList<T> list;// 存放用于排序的元素
        RefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) 
            super(downstream, comparator);
        
        @Override
        public void begin(long size) 
            ...
            // 创建一个存放排序元素的列表
            list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
        
        @Override
        public void end() 
            list.sort(comparator);// 只有元素全部接收之后才能开始排序
            downstream.begin(list.size());
            if (!cancellationWasRequested) // 下游Sink不包含短路操作
                list.forEach(downstream::accept);// 2. 将处理结果传递给流水线下游的Sink
            
            else // 下游Sink包含短路操作
                for (T t : list) // 每次都调用cancellationRequested()询问是否可以结束处理。
                    if (downstream.cancellationRequested()) break;
                    downstream.accept(t);// 2. 将处理结果传递给流水线下游的Sink
                
            
            downstream.end();
            list = null;
        
        @Override
        public void accept(T t) 
            list.add(t);// 1. 使用当前Sink包装动作处理t,只是简单的将元素添加到中间列表当中
        
    
    

    上述代码完美的展现了Sink的四个接口方法是如何协同工作的:

    1. 首先begin()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;

    2. 之后通过accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;

    3. 最后end()方法告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;

    4. 如果下游的Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。

    >> 叠加之后的操作如何执行

    Sink完美封装了Stream每一步操作,并给出了[处理->转发]的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。

    是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation),一旦调用某个结束操作,就会触发整个流水线的执行。

    结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。

    结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[处理->转发]模型,结束操作的Sink就是调用链的出口。

    我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。

    但Stream类库的设计者没有这么做,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法来得到Sink,该方法的作用是返回一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?

    这是因为使用opWrapSink()可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:

    // AbstractPipeline.wrapSink()
    // 从下游向上游不断包装Sink。如果最初传入的sink代表结束操作,
    // 函数返回时就可以得到一个代表了流水线上所有操作的Sink。
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) 
        ...
        for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) 
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        
        return (Sink<P_IN>) sink;
    
    

    现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:

    // AbstractPipeline.copyInto(), 对spliterator代表的数据执行wrappedSink代表的操作。
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) 
        ...
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) 
            wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知开始遍历
            spliterator.forEachRemaining(wrappedSink);// 迭代
            wrappedSink.end();// 通知遍历结束
        
        ...
    
    

    上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代,最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。

    >> 执行后的结果在哪里

    最后一个问题是流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?首先要说明的是不是所有的Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(Side-effects),比如使用Stream.forEach()方法将结果打印出来就是常见的使用副作用的场景(事实上,除了打印之外其他场景都应避免使用副作用),对于真正需要返回结果的结束操作结果存在哪里呢?

    特别说明:副作用不应该被滥用,也许你会觉得在Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都无法保证,因为Stream可能会并行执行。大多数使用副作用的地方都可以使用归约操作更安全和有效的完成。

    // 错误的收集方式
    ArrayList<String> results = new ArrayList<>();
    stream.filter(s -> pattern.matcher(s).matches())
          .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
    // 正确的收集方式
    List<String>results =
         stream.filter(s -> pattern.matcher(s).matches())
                 .collect(Collectors.toList());  // No side-effects!
    

    回到流水线执行结果的问题上来,需要返回结果的流水线结果存在哪里呢?这要分不同的情况讨论,下表给出了各种有返回结果的Stream结束操作。

    返回类型对应的结束操作
    booleananyMatch() allMatch() noneMatch()
    OptionalfindFirst() findAny()
    归约结果reduce() collect()
    数组toArray()
    1. 对于表中返回boolean或者Optional的操作(Optional是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。

    2. 对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect(), reduce(), max(), min()都是归约操作,虽然max()和min()也是返回一个Optional,但事实上底层是通过调用reduce()方法实现的。

    3. 对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于Node的具体结构,我们会在下一节探究Stream如何并行执行时给出详细说明。

    结语

    本文详细介绍了Stream流水线的组织方式和执行过程,学习本文将有助于理解原理并写出正确的Stream代码,同时打消你对Stream API效率方面的顾虑。如你所见,Stream API实现如此巧妙,即使我们使用外部迭代手动编写等价代码,也未必更加高效。

    注:留下本文所用的JDK版本,以便有考究癖的人考证:

    $ java -version
    java version "1.8.0_101"
    Java(TM) SE Runtime Environment (build 1.8.0_101-b13)
    Java HotSpot(TM) Server VM (build 25.101-b13, mixed mode)
    
     
    

    以上是关于Java8 中的 Stream 那么强大,那你知道它的原理是什么吗?的主要内容,如果未能解决你的问题,请参考以下文章

    天天在用Stream,那你知道如此强大的Stream的实现原理吗?

    天天在用Stream,那你知道如此强大的Stream的实现原理吗?

    Java 8 的Stream流那么强大!

    精细篇Java8强大的stream API接口大全(代码优雅之道)

    Java8-04-笔记

    Java8-04-笔记