13万字详细分析JDK中Stream的实现原理

Posted Java知音_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了13万字详细分析JDK中Stream的实现原理相关的知识,希望对你有一定的参考价值。

前提

StreamJDK1.8中首次引入的,距今已经过去了接近8年时间(JDK1.8正式版是2013年底发布的)。Stream的引入一方面极大地简化了某些开发场景,另一方面也可能降低了编码的可读性(确实有不少人说到Stream会降低代码的可读性,但是在笔者看来,熟练使用之后反而觉得代码的可读性提高了)。这篇文章会花巨量篇幅,详细分析Stream的底层实现原理,参考的源码是JDK11的源码,其他版本JDK可能不适用于本文中的源码展示和相关例子。

这篇文章花费了极多时间和精力梳理和编写,希望能够帮助到本文的读者

Stream是如何做到向前兼容的

StreamJDK1.8引入的,如要需要JDK1.7或者以前的代码也能在JDK1.8或以上运行,那么Stream的引入必定不能在原来已经发布的接口方法进行修改,否则必定会因为兼容性问题导致老版本的接口实现无法在新版本中运行(方法签名出现异常),猜测是基于这个问题引入了接口默认方法,也就是default关键字。查看源码可以发现,ArrayList的超类CollectionIterable分别添加了数个default方法:

// java.util.Collection部分源码
public interface Collection<E> extends Iterable<E> {

    // 省略其他代码

    @Override
    default Spliterator<E> spliterator() {
        return Spliterators.spliterator(this, 0);
    }

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }
}

// java.lang.Iterable部分源码
public interface Iterable<T> {

    // 省略其他代码

    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
    }

    default Spliterator<T> spliterator() {
        return Spliterators.spliteratorUnknownSize(iterator(), 0);
    }
}

从直觉来看,这些新增的方法应该就是Stream实现的关键方法(后面会印证这不是直觉,而是查看源码的结果)。接口默认方法在使用上和实例方法一致,在实现上可以直接在接口方法中编写方法体,有点静态方法的意味,但是子类可以覆盖其实现(也就是接口默认方法在本接口中的实现有点像静态方法,可以被子类覆盖,使用上和实例方法一致)。这种实现方式,有可能是一种突破,也有可能是一种妥协,但是无论是妥协还是突破,都实现了向前兼容:

// JDK1.7中的java.lang.Iterable
public interface Iterable<T> {

    Iterator<T> iterator();
}

// JDK1.7中的Iterable实现
public MyIterable<Long> implements Iterable<Long>{

    public Iterator<Long> iterator(){
         ....
    }
}

如上,MyIterableJDK1.7中定义,如果该类在JDK1.8中运行,那么调用其实例中的forEach()spliterator()方法,相当于直接调用JDK1.8中的Iterable中的接口默认方法forEach()spliterator()。当然受限于JDK版本,这里只能确保编译通过,旧功能正常使用,而无法在JDK1.7中使用Stream相关功能或者使用default方法关键字。总结这么多,就是想说明为什么使用JDK7开发和编译的代码可以在JDK8环境下运行。

可拆分迭代器Spliterator

Stream实现的基石是SpliteratorSpliteratorsplitable iterator的缩写,意为"可拆分迭代器",用于遍历指定数据源(例如数组、集合或者IO Channel等)中的元素,在设计上充分考虑了串行和并行的场景。上一节提到了Collection存在接口默认方法spliterator(),此方法会生成一个Spliterator<E>实例,意为着「所有的集合子类都具备创建Spliterator实例的能力」Stream的实现在设计上和Netty中的ChannelHandlerContext十分相似,本质是一个链表,「而Spliterator就是这个链表的Head节点」Spliterator实例就是一个流实例的头节点,后面分析具体的源码时候再具体展开)。

Spliterator接口方法

接着看Spliterator接口定义的方法:

public interface Spliterator<T> {

    // 暂时省略其他代码

    boolean tryAdvance(Consumer<? super T> action);

    default void forEachRemaining(Consumer<? super T> action) {
        do { } while (tryAdvance(action));
    }

    Spliterator<T> trySplit();

    long estimateSize();

    default long getExactSizeIfKnown() {
        return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
    }

    int characteristics();

    default boolean hasCharacteristics(int characteristics) {
        return (characteristics() & characteristics) == characteristics;
    }

    default Comparator<? super T> getComparator() {
        throw new IllegalStateException();
    }

    // 暂时省略其他代码
}

「tryAdvance」

  • 方法签名:boolean tryAdvance(Consumer<? super T> action)

  • 功能:如果Spliterator中存在剩余元素,则对其中的某个元素执行传入的action回调,并且返回true,否则返回false。如果Spliterator启用了ORDERED特性,会按照顺序(这里的顺序值可以类比为ArrayList中容器数组元素的下标,ArrayList中添加新元素是天然有序的,下标由零开始递增)处理下一个元素

  • 例子:

public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    final AtomicInteger loop = new AtomicInteger(1);
    while (spliterator.tryAdvance(num -> System.out.printf("第%d轮回调Action,值:%d\\n", round.getAndIncrement(), num))) {
        System.out.printf("第%d轮循环\\n", loop.getAndIncrement());
    }
}

// 控制台输出
第1轮回调Action,值:2
第1轮循环
第2轮回调Action,值:1
第2轮循环
第3轮回调Action,值:3
第3轮循环

「forEachRemaining」

  • 方法签名:default void forEachRemaining(Consumer<? super T> action)

  • 功能:如果Spliterator中存在剩余元素,则对其中的「所有剩余元素」「当前线程中」执行传入的action回调。如果Spliterator启用了ORDERED特性,会按照顺序处理剩余所有元素。这是一个接口默认方法,方法体比较粗暴,直接是一个死循环包裹着tryAdvance()方法,直到false退出循环

  • 例子:

public static void main(String[] args) {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    spliterator.forEachRemaining(num -> System.out.printf("第%d轮回调Action,值:%d\\n", round.getAndIncrement(), num));
}

// 控制台输出
第1轮回调Action,值:2
第2轮回调Action,值:1
第3轮回调Action,值:3

「rySplit」

  • 方法签名:Spliterator<T> trySplit()

  • 功能:如果当前的Spliterator是可分区(可分割)的,那么此方法将会返回一个全新的Spliterator实例,这个全新的Spliterator实例里面的元素不会被当前Spliterator实例中的元素覆盖(这里是直译了API注释,实际要表达的意思是:当前的Spliterator实例X是可分割的,trySplit()方法会分割X产生一个全新的Spliterator实例Y,原来的X所包含的元素(范围)也会收缩,类似于X = [a,b,c,d] => X = [a,b], Y = [c,d];如果当前的Spliterator实例X是不可分割的,此方法会返回NULL),「具体的分割算法由实现类决定」

  • 例子:

public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> first = list.stream().spliterator();
    Spliterator<Integer> second = first.trySplit();
    first.forEachRemaining(num -> {
        System.out.printf("first spliterator item: %d\\n", num);
    });
    second.forEachRemaining(num -> {
        System.out.printf("second spliterator item: %d\\n", num);
    });
}

// 控制台输出
first spliterator item: 4
first spliterator item: 1
second spliterator item: 2
second spliterator item: 3

「estimateSize」

  • 方法签名:long estimateSize()

  • 功能:返回forEachRemaining()方法需要遍历的元素总量的估计值,如果样本个数是无限、计算成本过高或者未知,会直接返回Long.MAX_VALUE

  • 例子:

public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    System.out.println(spliterator.estimateSize());
}

// 控制台输出
4

「getExactSizeIfKnown」

  • 方法签名:default long getExactSizeIfKnown()

  • 功能:如果当前的Spliterator具备SIZED特性(关于特性,下文再展开分析),那么直接调用estimateSize()方法,否则返回-1

  • 例子:

public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    System.out.println(spliterator.getExactSizeIfKnown());
}

// 控制台输出
4

「int characteristics()」

  • 方法签名:long estimateSize()

  • 功能:当前的Spliterator具备的特性(集合),采用位运算,存储在32位整数中(关于特性,下文再展开分析)

「hasCharacteristics」

  • 方法签名:default boolean hasCharacteristics(int characteristics)

  • 功能:判断当前的Spliterator是否具备传入的特性

「getComparator」

  • 方法签名:default Comparator<? super T> getComparator()

  • 功能:如果当前的Spliterator具备SORTED特性,则需要返回一个Comparator实例;如果Spliterator中的元素是天然有序(例如元素实现了Comparable接口),则返回NULL;其他情况直接抛出IllegalStateException异常

Spliterator自分割

Spliterator#trySplit()可以把一个既有的Spliterator实例分割为两个Spliterator实例,笔者这里把这种方式称为Spliterator自分割,示意图如下:

这里的分割在实现上可以采用两种方式:

  • 物理分割:对于ArrayList而言,把底层数组「拷贝」并且进行分割,用上面的例子来说相当于X = [1,3,4,2] => X = [4,2], Y = [1,3],这样实现加上对于ArrayList中本身的元素容器数组,相当于多存了一份数据,显然不是十分合理

  • 逻辑分割:对于ArrayList而言,由于元素容器数组天然有序,可以采用数组的索引(下标)进行分割,用上面的例子来说相当于X = 索引表[0,1,2,3] => X = 索引表[2,3], Y = 索引表[0,1],这种方式是共享底层容器数组,只对元素索引进行分割,实现上比较简单而且相对合理

参看ArrayListSpliterator的源码,可以分析其分割算法实现:

// ArrayList#spliterator()
public Spliterator<E> spliterator() {
    return new ArrayListSpliterator(0, -1, 0);
}

// ArrayList中内部类ArrayListSpliterator
final class ArrayListSpliterator implements Spliterator<E> {
    
    // 当前的处理的元素索引值,其实是剩余元素的下边界值(包含),在tryAdvance()或者trySplit()方法中被修改,一般初始值为0
    private int index;
    // 栅栏,其实是元素索引值的上边界值(不包含),一般初始化的时候为-1,使用时具体值为元素索引值上边界加1
    private int fence;
    // 预期的修改次数,一般初始化值等于modCount
    private int expectedModCount;

    ArrayListSpliterator(int origin, int fence, int expectedModCount) {
        this.index = origin;
        this.fence = fence;
        this.expectedModCount = expectedModCount;
    }
    
    // 获取元素索引值的上边界值,如果小于0,则把hi和fence都赋值为(ArrayList中的)size,expectedModCount赋值为(ArrayList中的)modCount,返回上边界值
    // 这里注意if条件中有赋值语句hi = fence,也就是此方法调用过程中临时变量hi总是重新赋值为fence,fence是ArrayListSpliterator实例中的成员属性
    private int getFence() {
        int hi;
        if ((hi = fence) < 0) {
            expectedModCount = modCount;
            hi = fence = size;
        }
        return hi;
    }
    
    // Spliterator自分割,这里采用了二分法
    public ArrayListSpliterator trySplit() {
        // hi等于当前ArrayListSpliterator实例中的fence变量,相当于获取剩余元素的上边界值
        // lo等于当前ArrayListSpliterator实例中的index变量,相当于获取剩余元素的下边界值
        // mid = (lo + hi) >>> 1,这里的无符号右移动1位运算相当于(lo + hi)/2
        int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
        // 当lo >= mid的时候为不可分割,返回NULL,否则,以index = lo,fence = mid和expectedModCount = expectedModCount创建一个新的ArrayListSpliterator
        // 这里有个细节之处,在新的ArrayListSpliterator构造参数中,当前的index被重新赋值为index = mid,这一点容易看漏,老程序员都喜欢做这样的赋值简化
        // lo >= mid返回NULL的时候,不会创建新的ArrayListSpliterator,也不会修改当前ArrayListSpliterator中的参数
        return (lo >= mid) ? null : new ArrayListSpliterator(lo, index = mid, expectedModCount);
    }
    
    // tryAdvance实现
    public boolean tryAdvance(Consumer<? super E> action) {
        if (action == null)
            throw new NullPointerException();
        // 获取迭代的上下边界
        int hi = getFence(), i = index;
        // 由于前面分析下边界是包含关系,上边界是非包含关系,所以这里要i < hi而不是i <= hi
        if (i < hi) {
            index = i + 1;
            // 这里的elementData来自ArrayList中,也就是前文经常提到的元素数组容器,这里是直接通过元素索引访问容器中的数据
            @SuppressWarnings("unchecked") E e = (E)elementData[i];
            // 对传入的Action进行回调
            action.accept(e);
            // 并发修改异常判断
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
            return true;
        }
        return false;
    }
    
    // forEachRemaining实现,这里没有采用默认实现,而是完全覆盖实现一个新方法
    public void forEachRemaining(Consumer<? super E> action) {
        // 这里会新建所需的中间变量,i为index的中间变量,hi为fence的中间变量,mc为expectedModCount的中间变量
        int i, hi, mc;
        Object[] a;
        if (action == null)
            throw new NullPointerException();
        // 判断容器数组存在性
        if ((a = elementData) != null) {
            // hi、fence和mc初始化
            if ((hi = fence) < 0) {
                mc = modCount;
                hi = size;
            }
            else
                mc = expectedModCount;
            // 这里就是先做参数合法性校验,再遍历临时数组容器a中中[i,hi)的剩余元素对传入的Action进行回调
            // 这里注意有一处隐蔽的赋值(index = hi),下界被赋值为上界,意味着每个ArrayListSpliterator实例只能调用一次forEachRemaining()方法
            if ((i = index) >= 0 && (index = hi) <= a.length) {
                for (; i < hi; ++i) {
                    @SuppressWarnings("unchecked") E e = (E) a[i];
                    action.accept(e);
                }
                // 这里校验ArrayList的modCount和mc是否一致,理论上在forEachRemaining()遍历期间,不能对数组容器进行元素的新增或者移除,一旦发生modCount更变会抛出异常
                if (modCount == mc)
                    return;
            }
        }
        throw new ConcurrentModificationException();
    }
    
    // 获取剩余元素估计值,就是用剩余元素索引上边界直接减去下边界
    public long estimateSize() {
        return getFence() - index;
    }
    
    // 具备ORDERED、SIZED和SUBSIZED特性
    public int characteristics() {
        return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
    }
}

在阅读源码的时候务必注意,老一辈的程序员有时候会采用比较「隐蔽」的赋值方式,笔者认为需要展开一下:

第一处红圈位置在构建新的ArrayListSpliterator的时候,当前ArrayListSpliteratorindex属性也被修改了,过程如下图:

第二处红圈位置,在forEachRemaining()方法调用时候做参数校验,并且if分支里面把index(下边界值)赋值为hi(上边界值),「那么一个ArrayListSpliterator实例中的forEachRemaining()方法的遍历操作必定只会执行一次」。可以这样验证一下:

public static void main(String[] args) {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    spliterator.forEachRemaining(num -> System.out.printf("[第一次遍历forEachRemaining]第%d轮回调Action,值:%d\\n", round.getAndIncrement(), num));
    round.set(1);
    spliterator.forEachRemaining(num -> System.out.printf("[第二次遍历forEachRemaining]第%d轮回调Action,值:%d\\n", round.getAndIncrement(), num));
}

// 控制台输出
[第一次遍历forEachRemaining]第1轮回调Action,值:2
[第一次遍历forEachRemaining]第2轮回调Action,值:1
[第一次遍历forEachRemaining]第3轮回调Action,值:3

对于ArrayListSpliterator的实现可以确认下面几点:

  • 一个新的ArrayListSpliterator实例中的forEachRemaining()方法只能调用一次

  • ArrayListSpliterator实例中的forEachRemaining()方法遍历元素的边界是[index, fence)

  • ArrayListSpliterator自分割的时候,分割出来的新ArrayListSpliterator负责处理元素下标小的分段(类比fork的左分支),而原ArrayListSpliterator负责处理元素下标大的分段(类比fork的右分支)

  • ArrayListSpliterator提供的estimateSize()方法得到的分段元素剩余数量是一个准确值

如果把上面的例子继续分割,可以得到下面的过程:

Spliterator自分割是并行流实现的基础」,并行流计算过程其实就是fork-join的处理过程,trySplit()方法的实现决定了fork任务的粒度,每个fork任务进行计算的时候是并发安全的,这一点由线程封闭(线程栈封闭)保证,每一个fork任务计算完成最后的结果再由单个线程进行join操作,才能得到正确的结果。下面的例子是求整数1 ~ 100的和:

public class ConcurrentSplitCalculateSum {

    private static class ForkTask extends Thread {

        private int result = 0;

        private final Spliterator<Integer> spliterator;
        private final CountDownLatch latch;

        public ForkTask(Spliterator<Integer> spliterator,
                        CountDownLatch latch) {
            this.spliterator = spliterator;
            this.latch = latch;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            spliterator.forEachRemaining(num -> result = result + num);
            long end = System.currentTimeMillis();
            System.out.printf("线程[%s]完成计算任务,当前段计算结果:%d,耗时:%d ms\\n",
                    Thread.currentThread().getName(), result, end - start);
            latch.countDown();
        }

        public int result() {
            return result;
        }
    }

    private static int join(List<ForkTask> tasks) {
        int result = 0;
        for (ForkTask task : tasks) {
            result = result + task.result();
        }
        return result;
    }

    private static final int THREAD_NUM = 4;

    public static void main(String[] args) throws Exception {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i < 101; i++) {
            source.add(i);
        }
        Spliterator<Integer> root = source.stream().spliterator();
        List<Spliterator<Integer>> spliteratorList = new ArrayList<>();
        Spliterator<Integer> x = root.trySplit();
        Spliterator<Integer> y = x.trySplit();
        Spliterator<Integer> z = root.trySplit();
        spliteratorList.add(root);
        spliteratorList.add(x);
        spliteratorList.add(y);
        spliteratorList.add(z);
        List<ForkTask> tasks = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(THREAD_NUM);
        for (int i = 0; i < THREAD_NUM; i++) {
            ForkTask task = new ForkTask(spliteratorList.get(i), latch);
            task.setName("fork-task-" + (i + 1));
            tasks.add(task);
        }
        tasks.forEach(Thread::start);
        latch.await();
        int result = join(tasks);
        System.out.println("最终计算结果为:" + result);
    }
}

// 控制台输出结果
线程[fork-task-4]完成计算任务,当前段计算结果:1575,耗时:0 ms
线程[fork-task-2]完成计算任务,当前段计算结果:950,耗时:1 ms
线程[fork-task-3]完成计算任务,当前段计算结果:325,耗时:1 ms
线程[fork-task-1]完成计算任务,当前段计算结果:2200,耗时:1 ms
最终计算结果为:5050

当然,最终并行流的计算用到了ForkJoinPool,并不像这个例子中这么粗暴地进行异步执行。关于并行流的实现下文会详细分析。

Spliterator支持的特性

某一个Spliterator实例支持的特性由方法characteristics()决定,这个方法返回的是一个32位数值,实际使用中会展开为bit数组,所有的特性分配在不同的位上,而hasCharacteristics(int characteristics)就是通过输入的具体特性值通过位运算判断该特性是否存在于characteristics()中。下面简化characteristicsbyte分析一下这个技巧:

假设:byte characteristics()  => 也就是最多8个位用于表示特性集合,如果每个位只表示一种特性,那么可以总共表示8种特性
特性X:0000 0001
特性Y:0000 0010
以此类推
假设:characteristics = X | Y = 0000 0001 | 0000 0010 = 0000 0011
那么:characteristics & X = 0000 0011 & 0000 0001 = 0000 0001
判断characteristics是否包含X:(characteristics & X) == X

上面推断的过程就是Spliterator中特性判断方法的处理逻辑:

// 返回特性集合
int characteristics();

// 基于位运算判断特性集合中是否存在输入的特性
default boolean hasCharacteristics(int characteristics) {
    return (characteristics() & characteristics) == characteristics;
}

这里可以验证一下:

public class CharacteristicsCheck {

    public static void main(String[] args) {
        System.out.printf("是否存在ORDERED特性:%s\\n", hasCharacteristics(Spliterator.ORDERED));
        System.out.printf("是否存在SIZED特性:%s\\n", hasCharacteristics(Spliterator.SIZED));
        System.out.printf("是否存在DISTINCT特性:%s\\n", hasCharacteristics(Spliterator.DISTINCT));
    }

    private static int characteristics() {
        return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SORTED;
    }

    private static boolean hasCharacteristics(int characteristics) {
        return (characteristics() & characteristics) == characteristics;
    }
}

// 控制台输出
是否存在ORDERED特性:true
是否存在SIZED特性:true
是否存在DISTINCT特性:false

目前Spliterator支持的特性一共有8个,如下:

特性十六进制值二进制值功能
DISTINCT0x000000010000 0000 0000 0001去重,例如对于每对要处理的元素(x,y),使用!x.equals(y)比较,Spliterator中去重实际上基于Set处理
ORDERED0x000000100000 0000 0001 0000(元素)顺序处理,可以理解为trySplit()tryAdvance()forEachRemaining()方法对所有元素处理都保证一个严格的前缀顺序
SORTED0x000000040000 0000 0000 0100排序,元素使用getComparator()方法提供的Comparator进行排序,如果定义了SORTED特性,则必须定义ORDERED特性
SIZED0x000000400000 0000 0100 0000(元素)预估数量,启用此特性,那么Spliterator拆分或者迭代之前,estimateSize()返回的是元素的准确数量
NONNULL0x000000400000 0001 0000 0000(元素)非NULL,数据源保证Spliterator需要处理的元素不能为NULL,最常用于并发容器中的集合、队列和Map
IMMUTABLE0x000004000000 0100 0000 0000(元素)不可变,数据源不可被修改,也就是处理过程中元素不能被添加、替换和移除(更新属性是允许的)
CONCURRENT0x000010000001 0000 0000 0000(元素源)的修改是并发安全的,意味着多线程在数据源中添加、替换或者移除元素在不需要额外的同步条件下是并发安全的
SUBSIZED0x000040000100 0000 0000 0000(子Spliterator元素)预估数量,启用此特性,意味着通过trySplit()方法分割出来的所有子Spliterator(当前Spliterator分割后也属于子Spliterator)都启用SIZED特性

细心点观察可以发现:所有特性采用32位的整数存储,使用了隔1位存储的策略,位下标和特性的映射是:(0 => DISTINCT)、(3 => SORTED)、(5 => ORDERED)、(7=> SIZED)、(9 => NONNULL)、(11 => IMMUTABLE)、(13 => CONCURRENT)、(15 => SUBSIZED)

所有特性的功能这里只概括了核心的定义,还有一些小字或者特例描述限于篇幅没有完全加上,这一点可以参考具体的源码中的API注释。这些特性最终会转化为StreamOpFlag再提供给Stream中的操作判断使用,由于StreamOpFlag会更加复杂,下文再进行详细分析。

流的实现原理以及源码分析

由于流的实现是高度抽象的工程代码,所以在源码阅读上会有点困难。整个体系涉及到大量的接口、类和枚举,如下图:

图中的顶层类结构图描述的就是流的流水线相关类继承关系,其中IntStreamLongStreamDoubleStream都是特化类型,分别针对于IntegerLongDouble三种类型,其他引用类型构建的Pipeline都是ReferencePipeline实例,因此笔者认为,ReferencePipeline(引用类型流水线)是流的核心数据结构,下面会基于ReferencePipeline的实现做深入分析。

StreamOpFlag源码分析

注意,这一小节很烧脑,也有可能是笔者的位操作不怎么熟练,这篇文章大部分时间消耗在这一小节

StreamOpFlag是一个枚举,功能是存储Stream和操作的标志(Flags corresponding to characteristics of streams and operations,下称Stream标志),这些标志提供给Stream框架用于控制、定制化和优化计算。Stream标志可以用于描述与流相关联的若干不同实体的特征,这些实体包括:Stream的源、Stream的中间操作(Op)和Stream的终端操作(Terminal Op)。但是并非所有的Stream标志对所有的Stream实体都具备意义,目前这些实体和标志映射关系如下:

Type(Stream Entity Type)DISTINCTSORTEDORDEREDSIZEDSHORT_CIRCUIT
SPLITERATOR0101010100
STREAM0101010100
OP1111111001
TERMINAL_OP0000100001
UPSTREAM_TERMINAL_OP0000100000

其中:

  • 01:表示设置/注入

  • 10:表示清除

  • 11:表示保留

  • 00:表示初始化值(默认填充值),这是一个关键点,0值表示绝对不会是某个类型的标志

StreamOpFlag的顶部注释中还有一个表格如下:

-DISTINCTSORTEDORDEREDSIZEDSHORT_CIRCUIT
Stream source(Stream的源)YYYYN
Intermediate operation(中间操作)PCIPCIPCIPCPI
Terminal operation(终结操作)NNPCNPI

标记 -> 含义:

  • Y:允许

  • N:非法

  • P:保留

  • C:清除

  • I:注入

  • 组合PCI:可以保留、清除或者注入

  • 组合PC:可以保留或者清除

  • 组合PI:可以保留或者注入

两个表格其实是在描述同一个结论,可以相互对照和理解,但是「最终实现参照于第一个表的定义」。注意一点:这里的preserved(P)表示保留的意思,如果Stream实体某个标志被赋值为preserved,意味着该实体可以使用此标志代表的特性。例如此小节第一个表格中的OPDISTINCTSORTEDORDERED都赋值为11preserved),意味着OP类型的实体允许使用去重、自然排序和顺序处理特性。回到源码部分,先看StreamOpFlag的核心属性和构造器:

enum StreamOpFlag {

    // 暂时忽略其他代码

    // 类型枚举,Stream相关实体类型
    enum Type {
         
        // SPLITERATOR类型,关联所有和Spliterator相关的特性
        SPLITERATOR,

        // STREAM类型,关联所有和Stream相关的标志
        STREAM,

        // STREAM类型,关联所有和Stream中间操作相关的标志
        OP,

        // TERMINAL_OP类型,关联所有和Stream终结操作相关的标志
        TERMINAL_OP,

        // UPSTREAM_TERMINAL_OP类型,关联所有在最后一个有状态操作边界上游传播的终止操作标志
        // 这个类型的意义直译有点拗口,不过实际上在JDK11源码中,这个类型没有被流相关功能引用,暂时可以忽略
        UPSTREAM_TERMINAL_OP
    }

    // 设置/注入标志的bit模式,二进制数0001,十进制数1
    private static final int SET_BITS = 0b01;

    // 清除标志的bit模式,二进制数0010,十进制数2
    private static final int CLEAR_BITS = 0b10;

    // 保留标志的bit模式,二进制数0011,十进制数3
    private static final int PRESERVE_BITS = 0b11;
    
    // 掩码建造器工厂方法,注意这个方法用于实例化MaskBuilder
    private static MaskBuilder set(Type t) {
        return new MaskBuilder(new EnumMap<>(Type.class)).set(t);
    }
    
    // 私有静态内部类,掩码建造器,里面的map由上面的set(Type t)方法得知是EnumMap实例
    private static class MaskBuilder {
        // Type -> SET_BITS|CLEAR_BITS|PRESERVE_BITS|0
        final Map<Type, Integer> map;

        MaskBuilder(Map<Type, Integer> map) {
            this.map = map;
        }
       
        // 设置类型和对应的掩码
        MaskBuilder mask(Type t, Integer i) {
            map.put(t, i);
            return this;
        }
        
        // 对类型添加/inject
        MaskBuilder set(Type t) {
            return mask(t, SET_BITS);
        }

        MaskBuilder clear(Type t) {
            return mask(t, CLEAR_BITS);
        }

        MaskBuilder setAndClear(Type t) {
            return mask(t, PRESERVE_BITS);
        }
        
        // 这里的build方法对于类型中的NULL掩码填充为0,然后把map返回
        Map<Type, Integer> build() {
            for (Type t : Type.values()) {
                map.putIfAbsent(t, 0b00);
            }
            return map;
        }
    }
    
    // 类型->掩码映射
    private final Map<Type, Integer> maskTable;
    
    // bit的起始偏移量,控制下面set、clear和preserve的起始偏移量
    private final int bitPosition;

    // set/inject的bit set(map),其实准确来说应该是一个表示set/inject的bit map
    private final int set;

    // clear的bit set(map),其实准确来说应该是一个表示clear的bit map
    private final int clear;

    // preserve的bit set(map),其实准确来说应该是一个表示preserve的bit map
    private final int preserve;

    private StreamOpFlag(int position, MaskBuilder maskBuilder) {
        // 这里会基于MaskBuilder初始化内部的EnumMap
        this.maskTable = maskBuilder.build();
        // Two bits per flag <= 这里会把入参position放大一倍
        position *= 2;
        this.bitPosition = position;
        this.set = SET_BITS << position; // 设置/注入标志的bit模式左移2倍position
        this.clear = CLEAR_BITS << position; // 清除标志的bit模式左移2倍position
        this.preserve = PRESERVE_BITS << position; // 保留标志的bit模式左移2倍position
    }

    // 省略中间一些方法

    // 下面这些静态变量就是直接返回标志对应的set/injec、清除和保留的bit map
    /**
     * The bit value to set or inject {@link #DISTINCT}.
     */
    static final int IS_DISTINCT = DISTINCT.set;

    /**
     * The bit value to clear {@link #DISTINCT}.
     */
    static final int NOT_DISTINCT = DISTINCT.clear;

    /**
     * The bit value to set or inject {@link #SORTED}.
     */
    static final int IS_SORTED = SORTED.set;

    /**
     * The bit value to clear {@link #SORTED}.
     */
    static final int NOT_SORTED = SORTED.clear;

    /**
     * The bit value to set or inject {@link #ORDERED}.
     */
    static final int IS_ORDERED = ORDERED.set;

    /**
     * The bit value to clear {@link #ORDERED}.
     */
    static final int NOT_ORDERED = ORDERED.clear;

    /**
     * The bit value to set {@link #SIZED}.
     */
    static final int IS_SIZED = SIZED.set;

    /**
     * The bit value to clear {@link #SIZED}.
     */
    static final int NOT_SIZED = SIZED.clear;

    /**
     * The bit value to inject {@link #SHORT_CIRCUIT}.
     */
    static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
}

又因为StreamOpFlag是一个枚举,一个枚举成员是一个独立的标志,而一个标志会对多个Stream实体类型产生作用,所以它的一个成员描述的是上面实体和标志映射关系的一个列(竖着看):

// 纵向看
DISTINCT Flag:
maskTable: {
    SPLITERATOR:           0000 0001,
    STREAM:                0000 0001,
    OP:                    0000 0011,
    TERMINAL_OP:           0000 0000,
    UPSTREAM_TERMINAL_OP:  0000 0000
}
position(input): 0
bitPosition:     0
set:             1 => 0000 0000 0000 0000 0000 0000 0000 0001
clear:           2 => 0000 0000 0000 0000 0000 0000 0000 0010
preserve:        3 => 0000 0000 0000 0000 0000 0000 0000 0011

SORTED Flag:
maskTable: {
    SPLITERATOR:           0000 0001,
    STREAM:                0000 0001,
    OP:                    0000 0011,
    TERMINAL_OP:           0000 0000,
    UPSTREAM_TERMINAL_OP:  0000 0000
}
position(input): 1 
bitPosition:     2
set:             4 => 0000 0000 0000 0000 0000 0000 0000 0100
clear:           8 => 0000 0000 0000 0000 0000 0000 0000 1000
preserve:       12 => 0000 0000 0000 0000 0000 0000 0000 1100

ORDERED Flag:
maskTable: {
    SPLITERATOR:           0000 0001,
    STREAM:                0000 0001,
    OP:                    0000 0011,
    TERMINAL_OP:           0000 0010,
    UPSTREAM_TERMINAL_OP:  0000 0010
}
position(input): 2
bitPosition:     4 
set:            16 => 0000 0000 0000 0000 0000 0000 0001 0000
clear:          32 => 0000 0000 0000 0000 0000 0000 0010 0000
preserve:       48 => 0000 0000 0000 0000 0000 0000 0011 0000

SIZED Flag:
maskTable: {
    SPLITERATOR:           0000 0001,
    STREAM:                0000 0001,
    OP:                    0000 0010,
    TERMINAL_OP:           0000 0000,
    UPSTREAM_TERMINAL_OP:  0000 0000
}
position(input): 3
bitPosition:     6 
set:            64 => 0000 0000 0000 0000 0000 0000 0100 0000
clear:         128 => 0000 0000 0000 0000 0000 0000 1000 0000
preserve:      192 => 0000 0000 0000 0000 0000 0000 1100 0000

SHORT_CIRCUIT Flag:
maskTable: {
    SPLITERATOR:           0000 0000,
    STREAM:                0000 0000,
    OP:                    0000 0001,
    TERMINAL_OP:           0000 0001,
    UPSTREAM_TERMINAL_OP:  0000 0000
}
position(input): 12
bitPosition:     24 
set:       16777216 => 0000 0001 0000 0000 0000 0000 0000 0000
clear:     33554432 => 0000 0010 0000 0000 0000 0000 0000 0000   
preserve:  50331648 => 0000 0011 0000 0000 0000 0000 0000 0000

接着就用到按位与(&)和按位或(|)的操作,假设A = 0001B = 0010C = 1000,那么:

  • A|B = A | B = 0001 | 0010 = 0011(按位或,1|0=1, 0|1=1,0|0 =0,1|1=1

  • A&B = A & B = 0001 | 0010 = 0000(按位与,1|0=0, 0|1=0,0|0 =0,1|1=1

  • MASK = A | B | C = 0001 | 0010 | 1000 = 1011

  • 那么判断A|B是否包含A的条件为:A == (A|B & A)

  • 那么判断MASK是否包含A的条件为:A == MASK & A

这里把StreamOpFlag中的枚举套用进去分析:

static int DISTINCT_SET = 0b0001;
static int SORTED_CLEAR = 0b1000;

public static void main(String[] args) throws Exception {
    // 支持DISTINCT标志和不支持SORTED标志
    int flags = DISTINCT_SET | SORTED_CLEAR;
    System.out.println(Integer.toBinaryString(flags));
    System.out.printf("支持DISTINCT标志:%s\\n", DISTINCT_SET == (DISTINCT_SET & flags));
    System.out.printf("不支持SORTED标志:%s\\n", SORTED_CLEAR == (SORTED_CLEAR & flags));
}

// 控制台输出
1001
支持DISTINCT标志:true
不支持SORTED标志:true

由于StreamOpFlag的修饰符是默认,不能直接使用,可以把它的代码拷贝出来修改包名验证里面的功能:

public static void main(String[] args) {
    int flags = StreamOpFlag.DISTINCT.set | StreamOpFlag.SORTED.clear;
    System.out.println(StreamOpFlag.DISTINCT.set == (StreamOpFlag.DISTINCT.set & flags));
    System.out.println(StreamOpFlag.SORTED.clear == (StreamOpFlag.SORTED.clear & flags));
}

// 输出

true
true

下面这些方法就是基于这些运算特性而定义的:

enum StreamOpFlag {

    // 暂时忽略其他代码

    // 返回当前StreamOpFlag的set/inject的bit map
    int set() {
        return set;
    }

    // 返回当前StreamOpFlag的清除的bit map
    int clear() {
        return clear;
    }

    // 这里判断当前StreamOpFlag类型->标记映射中Stream类型的标记,如果大于0说明不是初始化状态,那么当前StreamOpFlag就是Stream相关的标志
    boolean isStreamFlag() {
        return maskTable.get(Type.STREAM) > 0;
    }

    // 这里就用到按位与判断输入的flags中是否设置当前StreamOpFlag(StreamOpFlag.set)
    boolean isKnown(int flags) {
        return (flags & preserve) == set;
    }

    // 这里就用到按位与判断输入的flags中是否清除当前StreamOpFlag(StreamOpFlag.clear)
    boolean isCleared(int flags) {
        return (flags & preserve) == clear;
    }

    // 这里就用到按位与判断输入的flags中是否保留当前StreamOpFlag(StreamOpFlag.clear)
    boolean isPreserved(int flags) {
        return (flags & preserve) == preserve;
    }

    // 判断当前的Stream实体类型是否可以设置本标志,要求Stream实体类型的标志位为set或者preserve,按位与要大于0
    boolean canSet(Type t) {
        return (maskTable.get(t) & SET_BITS) > 0;
    }

    // 暂时忽略其他代码
}

这里有个特殊操作,位运算的时候采用了(flags & preserve),理由是:同一个标志中的同一个Stream实体类型只可能存在set/injectclearpreserve的其中一种,也就是同一个flags中不可能同时存在StreamOpFlag.SORTED.setStreamOpFlag.SORTED.clear,从语义上已经矛盾,而set/injectclearpreservebit map中的大小(为2位)和位置已经是固定的,preserve在设计的时候为0b11刚好2位取反,因此可以特化为(这个特化也让判断更加严谨):

(flags & set) == set => (flags & preserve) == set
(flags & clear) == clear => (flags & preserve) == clear
(flags & preserve) == preserve => (flags & preserve) == preserve

分析这么多,总的来说,就是想通过一个32位整数,每2位分别表示3种状态,那么一个完整的Flags(标志集合)一共可以表示16种标志(position=[0,15],可以查看API注释,[4,11][13,15]的位置是未需实现或者预留的,属于gap)。接着分析掩码Mask的计算过程例子:

// 横向看(位移动运算符优先级高于与或,例如<<的优先级比|高)
SPLITERATOR_CHARACTERISTICS_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=01,bitPosition=0) = 0000 0000 | 0000 0001 << 0 = 0000 0000 | 0000 0001 = 0000 0001
mask(SORTED,SPLITERATOR[SORTED]=01,bitPosition=2) = 0000 0001 | 0000 0001 << 2 = 0000 0001 | 0000 0100 = 0000 0101
mask(ORDERED,SPLITERATOR[ORDERED]=01,bitPosition=4) = 0000 0101 | 0000 0001 << 4 = 0000 0101 | 0001 0000 = 0001 0101
mask(SIZED,SPLITERATOR[SIZED]=01,bitPosition=6) = 0001 0101 | 0000 0001 << 6 = 0001 0101 | 0100 0000 = 0101 0101
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=00,bitPosition=24) = 0101 0101 | 0000 0000 << 24 = 0101 0101 | 0000 0000 = 0101 0101
mask(final) = 0000 0000 0000 0000 0000 0000 0101 0101(二进制)、85(十进制)

STREAM_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=01,bitPosition=0) = 0000 0000 | 0000 0001 << 0 = 0000 0000 | 0000 0001 = 0000 0001
mask(SORTED,SPLITERATOR[SORTED]=01,bitPosition=2) = 0000 0001 | 0000 0001 << 2 = 0000 0001 | 0000 0100 = 0000 0101
mask(ORDERED,SPLITERATOR[ORDERED]=01,bitPosition=4) = 0000 0101 | 0000 0001 << 4 = 0000 0101 | 0001 0000 = 0001 0101
mask(SIZED,SPLITERATOR[SIZED]=01,bitPosition=6) = 0001 0101 | 0000 0001 << 6 = 0001 0101 | 0100 0000 = 0101 0101
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=00,bitPosition=24) = 0101 0101 | 0000 0000 << 24 = 0101 0101 | 0000 0000 = 0101 0101
mask(final) = 0000 0000 0000 0000 0000 0000 0101 0101(二进制)、85(十进制)

OP_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=11,bitPosition=0) = 0000 0000 | 0000 0011 << 0 = 0000 0000 | 0000 0011 = 0000 0011
mask(SORTED,SPLITERATOR[SORTED]=11,bitPosition=2) = 0000 0011 | 0000 0011 << 2 = 0000 0011 | 0000 1100 = 0000 1111
mask(ORDERED,SPLITERATOR[ORDERED]=11,bitPosition=4) = 0000 1111 | 0000 0011 << 4 = 0000 1111 | 0011 0000 = 0011 1111
mask(SIZED,SPLITERATOR[SIZED]=10,bitPosition=6) = 0011 1111 | 0000 0010 << 6 = 0011 1111 | 1000 0000 = 1011 1111
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=01,bitPosition=24) = 1011 1111 | 0000 0001 << 24 = 1011 1111 | 0100 0000 0000 0000 0000 0000 0000 = 0100 0000 0000 0000 0000 1011 1111
mask(final) = 0000 0000 1000 0000 0000 0000 1011 1111(二进制)、16777407(十进制)

TERMINAL_OP_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=00,bitPosition=0) = 0000 0000 | 0000 0000 << 0 = 0000 0000 | 0000 0000 = 0000 0000
mask(SORTED,SPLITERATOR[SORTED]=00,bitPosition=2) = 0000 0000 | 0000 0000 << 2 = 0000 0000 | 0000 0000 = 0000 0000
mask(ORDERED,SPLITERATOR[ORDERED]=10,bitPosition=4) = 0000 0000 | 0000 0010 << 4 = 0000 0000 | 0010 0000 = 0010 0000
mask(SIZED,SPLITERATOR[SIZED]=00,bitPosition=6) = 0010 0000 | 0000 0000 << 6 = 0010 0000 | 0000 0000 = 0010 0000
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=01,bitPosition=24) = 0010 0000 | 0000 0001 << 24 = 0010 0000 | 0001 0000 0000 0000 0000 0000 0000 = 0001 0000 0000 0000 0000 0010 0000
mask(final) = 0000 0001 0000 0000 0000 0000 0010 0000(二进制)、16777248(十进制)

UPSTREAM_TERMINAL_OP_MASK:
mask(init) = 0
mask(DISTINCT,SPLITERATOR[DISTINCT]=00,bitPosition=0) = 0000 0000 | 0000 0000 << 0 = 0000 0000 | 0000 0000 = 0000 0000
mask(SORTED,SPLITERATOR[SORTED]=00,bitPosition=2) = 0000 0000 | 0000 0000 << 2 = 0000 0000 | 0000 0000 = 0000 0000
mask(ORDERED,SPLITERATOR[ORDERED]=10,bitPosition=4) = 0000 0000 | 0000 0010 << 4 = 0000 0000 | 0010 0000 = 0010 0000
mask(SIZED,SPLITERATOR[SIZED]=00,bitPosition=6) = 0010 0000 | 0000 0000 << 6 = 0010 0000 | 0000 0000 = 0010 0000
mask(SHORT_CIRCUIT,SPLITERATOR[SHORT_CIRCUIT]=00,bitPosition=24) = 0010 0000 | 0000 0000 << 24 = 0010 0000 | 0000 0000 = 0010 0000
mask(final) = 0000 0000 0000 0000 0000 0000 0010 0000(二进制)、32(十进制)

相关的方法和属性如下:

enum StreamOpFlag {

    // SPLITERATOR类型的标志bit map
    static final int SPLITERATOR_CHARACTERISTICS_MASK = createMask(Type.SPLITERATOR);

    // STREAM类型的标志bit map
    static final int STREAM_MASK = createMask(Type.STREAM);

    // OP类型的标志bit map
    static final int OP_MASK = createMask(Type.OP);

    // TERMINAL_OP类型的标志bit map
    static final int TERMINAL_OP_MASK = createMask(Type.TERMINAL_OP);

    // UPSTREAM_TERMINAL_OP类型的标志bit map
    static final int UPSTREAM_TERMINAL_OP_MASK = createMask(Type.UPSTREAM_TERMINAL_OP);

    // 基于Stream类型,创建对应类型填充所有标志的bit map
    private static int createMask(Type t) {
        int mask = 0;
        for (StreamOpFlag flag : StreamOpFlag.values()) {
            mask |= flag.maskTable.get(t) << flag.bitPosition;
        }
        return mask;
    }

    // 构造一个标志本身的掩码,就是所有标志都采用保留位表示,目前作为flags == 0时候的初始值
    private static final int FLAG_MASK = createFlagMask();
    
    // 构造一个包含全部标志中的preserve位的bit map,按照目前来看是暂时是一个固定值,二进制表示为0011 0000 0000 0000 0000 1111 1111
    private static int createFlagMask() {
        int mask = 0;
        for (StreamOpFlag flag : StreamOpFlag.values()) {
            mask |= flag.preserve;
        }
        return mask;
    }

    // 构造一个Stream类型包含全部标志中的set位的bit map,这里直接使用了STREAM_MASK,按照目前来看是暂时是一个固定值,二进制表示为0000 0000 0000 0000 0000 0000 0101 0101
    private static final int FLAG_MASK_IS = STREAM_MASK;

    // 构造一个Stream类型包含全部标志中的clear位的bit map,按照目前来看是暂时是一个固定值,二进制表示为0000 0000 0000 0000 0000 0000 1010 1010
    private static final int FLAG_MASK_NOT = STREAM_MASK << 1;

    // 初始化操作的标志bit map,目前来看就是Stream的头节点初始化时候需要合并在flags里面的初始化值,照目前来看是暂时是一个固定值,二进制表示为0000 0000 0000 0000 0000 0000 1111 1111
    static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;
}

SPLITERATOR_CHARACTERISTICS_MASK5个成员(见上面的Mask计算例子)其实就是预先计算好对应的Stream实体类型的「所有StreamOpFlag标志」bit map&#x

以上是关于13万字详细分析JDK中Stream的实现原理的主要内容,如果未能解决你的问题,请参考以下文章

新年巨作!13万字!腾讯高工手写JDK源码笔记 带你飙向实战

13万字!腾讯高工手写JDK源码笔记 带你飙向实战

Redis Stream 流的深度解析与实现高级消息队列一万字

Redis Stream 流的深度解析与实现高级消息队列一万字

Redis Stream 流的深度解析与实现高级消息队列一万字

jdk动态代理和cglib动态代理底层实现原理超详细解析(jdk动态代理篇)