takeWhile() 与平面图的工作方式不同

Posted

技术标签:

【中文标题】takeWhile() 与平面图的工作方式不同【英文标题】:takeWhile() working differently with flatmap 【发布时间】:2018-06-01 23:55:59 【问题描述】:

我正在用 takeWhile 创建 sn-ps 来探索它的可能性。与 flatMap 配合使用时,行为不符合预期。请在下面找到代码 sn-p。

String[][] strArray = "Sample1", "Sample2", "Sample3", "Sample4", "Sample5";

Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

实际输出:

Sample1
Sample2
Sample3
Sample5

预期输出:

Sample1
Sample2
Sample3

期望的原因是 takeWhile 应该一直执行到内部条件变为真为止。我还在 flatmap 中添加了打印输出语句以进行调试。流仅返回两次,符合预期。

但是,如果链中没有平面图,这也可以正常工作。

String[] strArraySingle = "Sample3", "Sample4", "Sample5";
Arrays.stream(strArraySingle)
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
        .forEach(ele -> System.out.println(ele));

实际输出:

Sample3

此处实际输出与预期输出匹配。

免责声明:这些 sn-ps 仅用于代码练习,不提供任何有效的用例。

更新: 错误JDK-8193856:修复将作为 JDK 10 的一部分提供。 更改将更正whileOps 接收器::接受

@Override 
public void accept(T t) 
    if (take = predicate.test(t)) 
        downstream.accept(t);
    

改变的实现:

@Override
public void accept(T t) 
    if (take && (take = predicate.test(t))) 
        downstream.accept(t);
    

【问题讨论】:

【参考方案1】:

这是 JDK 9 中的一个错误 - 来自 issue #8193856:

takeWhile 错误地假设上游操作支持并尊重取消,不幸的是flatMap 并非如此。

说明

如果流是有序的,takeWhile 应该会显示预期的行为。在您的代码中并非完全如此,因为您使用了forEach,它放弃了订单。如果你关心它,你在这个例子中做了,你应该使用forEachOrdered。有趣的是:这并没有改变任何事情。 ?

所以也许一开始就没有对流进行排序? (在那种情况下the behavior is ok。)如果你为从strArray创建的流创建一个临时变量,并通过在断点处执行表达式((StatefulOp) stream).isOrdered();来检查它是否有序,你会发现它确实是有序的:

String[][] strArray = "Sample1", "Sample2", "Sample3", "Sample4", "Sample5";

Stream<String> stream = Arrays.stream(strArray)
        .flatMap(indStream -> Arrays.stream(indStream))
        .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));

// breakpoint here
System.out.println(stream);

这意味着这很可能是一个实现错误。

进入代码

正如其他人所怀疑的,我现在也认为这可能flatMap 的渴望有关。更准确地说,这两个问题可能具有相同的根本原因。

查看WhileOps的来源,我们可以看到这些方法:

@Override
public void accept(T t) 
    if (take = predicate.test(t)) 
        downstream.accept(t);
    


@Override
public boolean cancellationRequested() 
    return !take || downstream.cancellationRequested();

takeWhile 使用此代码检查给定的流元素 t 是否满足 predicate

如果是这样,它将元素传递给downstream 操作,在本例中为System.out::println。 如果不是,则将take设置为false,因此下次询问是否应该取消管道(即已完成)时,返回true

这涵盖了takeWhile 操作。你需要知道的另一件事是forEachOrdered导致终端操作执行ReferencePipeline::forEachWithCancel方法:

@Override
final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) 
    boolean cancelled;
    do   while (
            !(cancelled = sink.cancellationRequested())
            && spliterator.tryAdvance(sink));
    return cancelled;

所有这些都是:

    检查管道是否被取消 如果没有,则将接收器前移一个元素 如果这是最后一个元素则停止

看起来很有希望,对吧?

没有flatMap

在“好的情况下”(没有flatMap;您的第二个示例)forEachWithCancel 直接在WhileOp 上操作为sink,您可以看到这是如何进行的:

ReferencePipeline::forEachWithCancel 循环: WhileOps::accept 被赋予每个流元素 在每个元素之后查询WhileOps::cancellationRequested 在某些时候"Sample4" 使谓词失败并且流被取消

耶!

flatMap

在“坏情况”(flatMap;您的第一个示例)中,forEachWithCancelflatMap 操作进行操作,不过,它只是在 ArraySpliterator 上调用 forEachRemaining 以获取 "Sample3", "Sample4", "Sample5",这是做什么的:

if ((a = array).length >= (hi = fence) &&
    (i = index) >= 0 && i < (index = hi)) 
    do  action.accept((T)a[i]);  while (++i < hi);

忽略所有hifence 的东西,它们仅在数组处理被拆分为并行流时使用,这是一个简单的for 循环,它将每个元素传递给takeWhile 操作, 但从不检查是否已取消。因此,它会在停止之前急切地遍历该“子流”中的所有元素,甚至可能是through the rest of the stream。

【讨论】:

@Eugene:好吧,我敢打赌它连接到this one。它恰好适用于终端短路操作,因为它们忽略了多余的元素,但现在我们有中间短路操作......所以这实际上是个好消息,因为它意味着现在修复该错误有更多的压力(糟糕的性能或当子流无限时中断显然是不够的)...... 它不会遍历整个流。如果子流的最后一个元素与谓词匹配,则外部流的取消支持将起作用,例如使用String[][] strArray = "Sample1", "Sample2", "Sample3", "Sample4", "Sample5", "Sample6", ; 作为输入,它似乎工作。如果只有中间元素匹配,flatMap 对取消的无知会导致标志被后续元素的评估覆盖。 @Holger 我的意思只是“子流”(从我的措辞中不清楚),甚至没有考虑关注“子流”。更改了措辞并链接到您的澄清评论。 看来,他们听到了你的声音:bugs.openjdk.java.net/browse/JDK-8193856【参考方案2】:

无论我怎么看,这都是一个错误 - 感谢 Holger 提供的 cmets。我不想把这个答案放在这里(说真的!),但没有一个答案明确指出这是一个错误。

人们说这与有序/无序有关,但事实并非如此,因为这将报告 true 3 次:

Stream<String[]> s1 = Arrays.stream(strArray);
System.out.println(s1.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s2 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream));
System.out.println(s2.spliterator().hasCharacteristics(Spliterator.ORDERED));

Stream<String> s3 = Arrays.stream(strArray)
            .flatMap(indStream -> Arrays.stream(indStream))
            .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"));
System.out.println(s3.spliterator().hasCharacteristics(Spliterator.ORDERED));

如果你把它改成这样也很有趣:

String[][] strArray =  
          "Sample1", "Sample2" , 
          "Sample3", "Sample5", "Sample4" , // Sample4 is the last one here
          "Sample7", "Sample8"  
;

那么Sample7Sample8 将不会成为输出的一部分,否则它们会。似乎flatmap 忽略 dropWhile 将引入的取消标志。

【讨论】:

【参考方案3】:

如果你看the documentation for takeWhile

如果这个流是有序的,[返回]一个由 从此流中获取的与给定匹配的元素的最长前缀 谓词。

如果这个流是无序的,[返回]一个由一个子集组成的流 从此流中获取的与给定谓词匹配的元素。

您的流是巧合排序的,但takeWhile 不知道它是。因此,它返回第二个条件 - 子集。你的takeWhile 就像filter

如果您在takeWhile 之前添加对sorted 的调用,您将看到预期的结果:

Arrays.stream(strArray)
      .flatMap(indStream -> Arrays.stream(indStream))
      .sorted()
      .takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
      .forEach(ele -> System.out.println(ele));

【讨论】:

为什么不排序,或者为什么不知道是?有序流的“连接”应该是有序的,不是吗? @JBNizet 但是如果你对每个步骤采取每个单独的步骤Stream&lt;String[]&gt; s1 = Arrays.stream(strArray); System.out.println(s1.spliterator().hasCharacteristics(Split‌​erator.ORDERED)) 等等 - 它们都会产生一个ORDERED 流,这看起来像一个尚未报告的错误跨度> @Michael 如我所见(根据之前的评论)-您的结论对我来说是错误的 但是takeWhile不知道它是”……好吧为什么它不知道它什么时候流和它的子流有序的,为什么.sorted().unordered() .takeWhile(…) 仍然在做正确的事情呢?我想说,这是因为 sorted 是一个有状态的操作,它缓冲整个输入,然后是一个真正的惰性迭代。 “您的流是巧合排序的,但 takeWhile 不知道它是。因此,它返回第二个条件 - 子集。您的 takeWhile 就像一个过滤器。”:但这听起来真的不对。如果流未排序,它将以某种不可预测的顺序返回其元素。现在,takeWhile 应该按照接收到的顺序作用于它实际接收到的元素,并在元素不满足其谓词时立即停止。如果想过滤无序的流,他们应该使用filter【参考方案4】:

原因是flatMap 操作也是一个intermediate operations,其中一个有状态短路中间操作 takeWhile 是用过。

Holger 在this answer 中指出的flatMap 的行为当然是一个参考,人们不应错过了解此类短路操作的意外输出。

您的预期结果可以通过拆分这两个中间操作来实现,方法是引入终端操作以进一步确定性地使用有序流并将它们作为样本执行:

List<String> sampleList = Arrays.stream(strArray).flatMap(Arrays::stream).collect(Collectors.toList());
sampleList.stream().takeWhile(ele -> !ele.equalsIgnoreCase("Sample4"))
            .forEach(System.out::println);

另外,似乎有一个相关的Bug#JDK-8075939 来跟踪已经注册的这种行为。

编辑:这可以在JDK-8193856 被接受为错误进一步跟踪。

【讨论】:

我不明白你的解释。对我来说,这种行为似乎是一个错误。而且您建议的替代方案需要两个 Stream 管道,这可能不太理想。 @Eran 实际上,这种行为似乎是一个错误。建议的替代方案只是引入一个终端操作来完成(耗尽)flatMap 操作,然后处理流以执行takeWhile

以上是关于takeWhile() 与平面图的工作方式不同的主要内容,如果未能解决你的问题,请参考以下文章

Haskell中无限列表的执行部分?

为啥 &= 运算符与 && 的工作方式不同

原子操作在进程间的工作方式是不是与它们在线程间的工作方式相同?

Beautiful Soup 4 CSS 选择器的工作方式与教程显示的方式不同

TOP 函数的工作方式与 mysql 中的 LIMIT 不同吗?

断点的工作方式与示例中的不同