A Call-Chain Analysis of Java 8 Stream sorted()

Posted by xiaoxi666

Code

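import java.util.stream.Stream;

// The class name is illustrative; the original post shows only main().
public class SortedDemo {
    public static void main(String[] args) {
        Stream.of("d2", "a2", "b1", "b3", "c")
            .sorted((s1, s2) -> {
                // Log every comparison so the sort's behavior is visible
                System.out.printf("sort: %s; %s\n", s1, s2);
                return s1.compareTo(s2);
            })
            .forEach(System.out::println);
    }
}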

Output

sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
a2
b1
b3
c
d2

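The result was unsettling, so I decided to step through the code in a debugger to see which sorting algorithm is wrapped inside. That turned up something unexpected: the stream's call chain is a little strange.

The code above uses the stream API introduced in Java 8. When you actually debug it, you find that you cannot step from sorted() directly into the sorting code, which is what prompted this article.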

Analysis

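If you remove .forEach(System.out::println) from the code above, the comparator passed to sorted() is never invoked and no sorting takes place. The reason lies in how streams execute: sorted() is an intermediate operation that only adds a stage to the pipeline, and the pipeline does no work until a terminal operation such as forEach() is called.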
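A minimal sketch of that laziness, assuming the same five strings as the article. The class and method names (LazySortedDemo, sortedStream, and so on) are made up for illustration; the comparator simply counts how often it is called.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// Hypothetical demo class; only Stream.of, sorted and forEach come from the article.
class LazySortedDemo {

    // Builds the article's pipeline with a comparator that counts its own calls.
    static Stream<String> sortedStream(AtomicInteger comparisons) {
        return Stream.of("d2", "a2", "b1", "b3", "c")
                .sorted((s1, s2) -> {
                    comparisons.incrementAndGet();
                    return s1.compareTo(s2);
                });
    }

    // No terminal operation: the pipeline is built but never evaluated.
    static int comparisonsWithoutTerminalOp() {
        AtomicInteger comparisons = new AtomicInteger();
        sortedStream(comparisons);
        return comparisons.get();
    }

    // forEach is a terminal operation, so it triggers the sort.
    static int comparisonsWithTerminalOp() {
        AtomicInteger comparisons = new AtomicInteger();
        sortedStream(comparisons).forEach(s -> { });
        return comparisons.get();
    }

    public static void main(String[] args) {
        System.out.println("without forEach: " + comparisonsWithoutTerminalOp());
        System.out.println("with forEach:    " + comparisonsWithTerminalOp());
    }
}
```

Without forEach() the counter stays at zero. With forEach() it is greater than zero; the run shown in the Output section above logged eight comparisons.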

The call chain is recorded in two main parts, followed by the comparator itself:

Part 1: sorted()

/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/stream/ReferencePipeline.java
  
public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
    return SortedOps.makeRef(this, comparator);
}

/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/stream/SortedOps.java

static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                            Comparator<? super T> comparator) {
    return new OfRef<>(upstream, comparator);
}

OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
    super(upstream, StreamShape.REFERENCE,
          StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
    this.isNaturalSort = false;
    this.comparator = Objects.requireNonNull(comparator);
}

Part 2: forEach()

/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/stream/ReferencePipeline.java

public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/stream/AbstractPipeline.java

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/stream/ForEachOps.java

public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                   Spliterator<S> spliterator) {
    return helper.wrapAndCopyInto(this, spliterator).get();
}

/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/stream/AbstractPipeline.java

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
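
The loop in wrapSink walks the pipeline from the last stage back to the first, letting each stage wrap the sink built so far. A simplified sketch of that idea follows. It is not the JDK code: WrapSinkSketch, wrap and the two toy stages are hypothetical, and a plain Consumer stands in for Sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical model: each stage maps a downstream sink to a sink that feeds it,
// which is the role opWrapSink plays in the excerpt above.
class WrapSinkSketch {

    static Consumer<String> wrap(List<UnaryOperator<Consumer<String>>> stages,
                                 Consumer<String> terminal) {
        Consumer<String> sink = terminal;
        // Start at the last stage and move toward the source, like the for loop in wrapSink.
        for (int i = stages.size() - 1; i >= 0; i--) {
            sink = stages.get(i).apply(sink);
        }
        return sink; // the sink of the first stage; pushing into it drives the whole chain
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        UnaryOperator<Consumer<String>> upper = down -> s -> down.accept(s.toUpperCase());
        UnaryOperator<Consumer<String>> exclaim = down -> s -> down.accept(s + "!");
        Consumer<String> head = wrap(Arrays.asList(upper, exclaim), out::add);
        head.accept("a2");
        head.accept("b1");
        return out; // [A2!, B1!]
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Although the wrapping runs back to front, elements still flow front to back: each string passes through upper first, then exclaim, then the terminal sink.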
/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/stream/SortedOps.java

public Sink<T> opWrapSink(int flags, Sink<T> sink) {
    Objects.requireNonNull(sink);

    // If the input is already naturally sorted and this operation
    // also naturally sorted then this is a no-op
    if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
        return sink;
    else if (StreamOpFlag.SIZED.isKnown(flags))
        return new SizedRefSortingSink<>(sink, comparator);
    else
        return new RefSortingSink<>(sink, comparator);
}

// Constructor and end() of SortedOps.SizedRefSortingSink
SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
    super(sink, comparator);
}

public void end() {
    Arrays.sort(array, 0, offset, comparator);
    downstream.begin(offset);
    if (!cancellationWasRequested) {
        for (int i = 0; i < offset; i++)
            downstream.accept(array[i]);
    }
    else {
        for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
            downstream.accept(array[i]);
    }
    downstream.end();
    array = null;
}
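
The end() method above explains why no element reaches forEach until the source is exhausted: the sorting sink buffers everything and sorts only when end() is called. Below is a cut-down sketch of that behavior. MiniSink and BufferingSortSink are hypothetical stand-ins for Sink and SizedRefSortingSink, and cancellation handling is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class SortingSinkSketch {

    // Hypothetical, reduced sink protocol: begin, then accept for each element, then end.
    interface MiniSink<T> {
        void begin(long size);
        void accept(T t);
        void end();
    }

    // Buffers every element; nothing is passed downstream until end().
    static final class BufferingSortSink<T> implements MiniSink<T> {
        private final MiniSink<? super T> downstream;
        private final Comparator<? super T> comparator;
        private T[] array;
        private int offset;

        BufferingSortSink(MiniSink<? super T> downstream, Comparator<? super T> comparator) {
            this.downstream = downstream;
            this.comparator = comparator;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void begin(long size) {
            array = (T[]) new Object[(int) size];
        }

        @Override
        public void accept(T t) {
            array[offset++] = t;
        }

        @Override
        public void end() {
            Arrays.sort(array, 0, offset, comparator); // the sort happens here
            downstream.begin(offset);
            for (int i = 0; i < offset; i++) {
                downstream.accept(array[i]);
            }
            downstream.end();
            array = null;
        }
    }

    // Returns the events seen downstream, with a marker added just before end() is called.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniSink<String> terminal = new MiniSink<String>() {
            @Override public void begin(long size) { events.add("begin(" + size + ")"); }
            @Override public void accept(String s) { events.add(s); }
            @Override public void end() { events.add("end"); }
        };
        MiniSink<String> sorter =
                new BufferingSortSink<>(terminal, Comparator.<String>naturalOrder());
        sorter.begin(5);
        for (String s : new String[] {"d2", "a2", "b1", "b3", "c"}) {
            sorter.accept(s);
        }
        events.add("source done"); // nothing has reached the terminal sink yet
        sorter.end();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The "source done" marker is the first recorded event, which shows that the downstream sink receives its elements only from inside end().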
/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/Arrays.java

public static <T> void sort(T[] a, int fromIndex, int toIndex,
                            Comparator<? super T> c) {
    if (c == null) {
        sort(a, fromIndex, toIndex);
    } else {
        rangeCheck(a.length, fromIndex, toIndex);
        if (LegacyMergeSort.userRequested)
            legacyMergeSort(a, fromIndex, toIndex, c);
        else
            TimSort.sort(a, fromIndex, toIndex, c, null, 0, 0);
    }
}

/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/src.zip!/java/util/TimSort.java

static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c,
                     T[] work, int workBase, int workLen) {
    assert c != null && a != null && lo >= 0 && lo <= hi && hi <= a.length;

    int nRemaining  = hi - lo;
    if (nRemaining < 2)
        return;  // Arrays of size 0 and 1 are always sorted

    // If array is small, do a "mini-TimSort" with no merges
    if (nRemaining < MIN_MERGE) {
        int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
        binarySort(a, lo, hi, lo + initRunLen, c);
        return;
    }

    /**
     * March over the array once, left to right, finding natural runs,
     * extending short natural runs to minRun elements, and merging runs
     * to maintain stack invariant.
     */
    TimSort<T> ts = new TimSort<>(a, c, work, workBase, workLen);
    int minRun = minRunLength(nRemaining);
    do {
        // Identify next run
        int runLen = countRunAndMakeAscending(a, lo, hi, c);

        // If run is short, extend to min(minRun, nRemaining)
        if (runLen < minRun) {
            int force = nRemaining <= minRun ? nRemaining : minRun;
            binarySort(a, lo, lo + force, lo + runLen, c);
            runLen = force;
        }

        // Push run onto pending-run stack, and maybe merge
        ts.pushRun(lo, runLen);
        ts.mergeCollapse();

        // Advance to find next run
        lo += runLen;
        nRemaining -= runLen;
    } while (nRemaining != 0);

    // Merge all remaining runs to complete sort
    assert lo == hi;
    ts.mergeForceCollapse();
    assert ts.stackSize == 1;
}
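
With only five elements, nRemaining is below MIN_MERGE, so the sort takes the "mini-TimSort" branch: countRunAndMakeAscending followed by binarySort. The sketch below re-implements just those two steps in simplified form to show where each logged comparison comes from. It is written from the algorithm as described in the excerpt, not copied from the JDK, and the class name MiniTimSortTrace and the trace helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class MiniTimSortTrace {

    // Finds the run starting at lo, reverses it if it is strictly descending,
    // and returns its length.
    static <T> int countRunAndMakeAscending(T[] a, int lo, int hi, Comparator<? super T> c) {
        int runHi = lo + 1;
        if (runHi == hi) {
            return 1;
        }
        if (c.compare(a[runHi++], a[lo]) < 0) {            // descending run
            while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) < 0) {
                runHi++;
            }
            reverseRange(a, lo, runHi);
        } else {                                           // ascending run
            while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) >= 0) {
                runHi++;
            }
        }
        return runHi - lo;
    }

    // Reverses a[lo, hi) in place.
    static void reverseRange(Object[] a, int lo, int hi) {
        hi--;
        while (lo < hi) {
            Object t = a[lo];
            a[lo++] = a[hi];
            a[hi--] = t;
        }
    }

    // Binary insertion sort of a[lo, hi); a[lo, start) is already sorted.
    static <T> void binarySort(T[] a, int lo, int hi, int start, Comparator<? super T> c) {
        if (start == lo) {
            start++;
        }
        for (; start < hi; start++) {
            T pivot = a[start];
            int left = lo;
            int right = start;
            while (left < right) {                         // binary search for the insertion point
                int mid = (left + right) >>> 1;
                if (c.compare(pivot, a[mid]) < 0) {
                    right = mid;
                } else {
                    left = mid + 1;
                }
            }
            System.arraycopy(a, left, a, left + 1, start - left);
            a[left] = pivot;
        }
    }

    // Sorts a small array in place and returns one log line per comparison.
    static List<String> trace(String[] a) {
        List<String> log = new ArrayList<>();
        if (a.length < 2) {
            return log;                                    // nothing to compare
        }
        Comparator<String> c = (s1, s2) -> {
            log.add("sort: " + s1 + "; " + s2);
            return s1.compareTo(s2);
        };
        int initRunLen = countRunAndMakeAscending(a, 0, a.length, c);
        binarySort(a, 0, a.length, initRunLen, c);
        return log;
    }

    public static void main(String[] args) {
        String[] data = {"d2", "a2", "b1", "b3", "c"};
        trace(data).forEach(System.out::println);
    }
}
```

For the article's input, this sketch logs the same eight comparisons in the same order as the Output section: the first two come from run detection ("d2", "a2" is a descending run of length two), and the remaining six are two binary-search probes for each of "b1", "b3" and "c".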

Part 3: compareTo()

This part is not pasted here; it is simply the comparator.

 

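The call chain above shows that sorted() returns without sorting anything. Execution then enters forEach(), which, through SizedRefSortingSink.end() and Arrays.sort(), invokes TimSort.sort(), and TimSort finally calls the comparator. This order clearly differs from the order in which the calls appear in the source code. I have only just started working with streams, so for now I am recording the call chain and will add the underlying principles later.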
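
One way to check this ordering without a debugger is to capture the call stack from inside the comparator. The sketch below is only an illustration: ComparatorCallerDemo is a hypothetical name, and the frame names it prints are JDK internals that may differ between versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class ComparatorCallerDemo {

    // Runs the article's pipeline and records the call stack at the first comparison,
    // innermost frame first, as "className.methodName".
    static List<String> framesAtFirstComparison() {
        List<String> frames = new ArrayList<>();
        Stream.of("d2", "a2", "b1", "b3", "c")
                .sorted((s1, s2) -> {
                    if (frames.isEmpty()) {
                        for (StackTraceElement e : new Throwable().getStackTrace()) {
                            frames.add(e.getClassName() + "." + e.getMethodName());
                        }
                    }
                    return s1.compareTo(s2);
                })
                .forEach(s -> { });
        return frames;
    }

    public static void main(String[] args) {
        framesAtFirstComparison().forEach(System.out::println);
    }
}
```

On a JDK whose sources match the excerpts above, the printed frames should include TimSort, Arrays.sort and the sorting sink's end(), all sitting beneath ReferencePipeline.forEach, which matches the order recorded in this article.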

 




