Java流操作融合与有状态中间操作

Posted

技术标签:

【中文标题】Java流操作融合与有状态中间操作【英文标题】:Java stream operation fusion and stateful intermediate operations 【发布时间】:2016-05-06 06:36:28 【问题描述】:

我一直在尝试了解和展示 Java 流如何在底层实现一种循环融合,以便可以将多个操作融合到一次传递中。

这里的第一个例子:

Stream.of("The", "cat", "sat", "on", "the", "mat")
        .filter(w -> 
            System.out.println("Filtering: " + w);
            return w.length() == 3;
        )
        .map(w -> 
            System.out.println("Mapping: " + w);
            return w.toUpperCase();
        )
        .forEach(w -> System.out.println("Printing: " + w));

具有以下输出(每个元素的单遍融合非常清晰):

Filtering: The
Mapping: The
Printing: THE
Filtering: cat
Mapping: cat
Printing: CAT
Filtering: sat
Mapping: sat
Printing: SAT
Filtering: on
Filtering: the
Mapping: the
Printing: THE
Filtering: mat
Mapping: mat
Printing: MAT

第二个例子是一样的,但是我在过滤器和映射之间使用了 sorted() 操作:

Stream.of("The", "cat", "sat", "on", "the", "mat")
        .filter(w -> 
            System.out.println("Filtering: " + w);
            return w.length() == 3;
        )
        .sorted()
        .map(w -> 
            System.out.println("Mapping: " + w);
            return w.toUpperCase();
        )
        .forEach(w -> System.out.println("Printing: " + w));

这有以下输出:

Filtering: The
Filtering: cat
Filtering: sat
Filtering: on
Filtering: the
Filtering: mat
Mapping: The
Printing: THE
Mapping: cat
Printing: CAT
Mapping: mat
Printing: MAT
Mapping: sat
Printing: SAT
Mapping: the
Printing: THE

所以我的问题在这里,通过调用 distinct,我是否正确地认为因为它是一个“有状态”的中间操作,它不允许在单次通过(所有操作)期间单独处理单个元素.此外,由于 sorted() 有状态操作需要处理整个输入流以产生结果,因此此处无法部署融合技术,这就是为什么首先发生所有过滤,然后将映射和打印操作融合在一起,排序后?如果我的任何假设不正确,请纠正我,并随时详细说明我已经说过的内容。

此外,它如何在后台决定是否可以将元素融合在一起成为一次传递,例如,当 distinct() 操作存在时,是否有一个标志可以关闭以阻止它发生就像 distinct() 不存在时一样?

最后一个查询是,虽然将操作融合到单次通过的好处有时是显而易见的,例如,当与短路结合使用时。将 filter-map-forEach 甚至 filter-map-sum 等操作融合在一起的主要好处是什么?

【问题讨论】:

您可以通过调试示例来简单地回答您的问题 我猜,每次你写“distinct”时,你的意思是“sorted”…… 【参考方案1】:

无状态操作(map、filter、flatMap、peek等)完全融合;我们构建了一个级联的Consumer 对象链并将数据倒入其中。每个元素都可以相互独立地操作,因此链中永远不会“卡住”任何东西。 (这就是 Louis 所说的融合是如何实现的——我们将阶段组合成一个大函数,并将数据提供给它。)

有状态的 操作(distinct、sorted、limit 等)更复杂,并且它们的行为变化更大。每个有状态的操作都可以选择它想要如何实现自己,因此它可以选择侵入性最小的方法。例如,distinct(在某些情况下)允许元素在经过审查时出现,而sorted 是一个完整的屏障。 (不同之处在于可能有多少惰性,以及它们如何处理诸如无限源之类的事情以及下游的限制操作。)

确实,有状态的操作通常会破坏一些融合的好处,但不是全部(上游和下游的操作仍然可以融合。)

除了您观察到的短路的价值之外,融合的其他重大优势包括 (a) 您不必在阶段之间填充中间结果容器,以及 (b) 您正在处理的数据是在缓存中总是“热”。

【讨论】:

感谢您的回答 - 非常好。我是否认为如果没有融合,Stream API 中就不可能进行短路和惰性求值? 不是非黑即白。有些情况下我们仍然不能短路,即使我们理论上可以,有些情况下我们仍然可以在没有熔合的情况下短路。但它肯定促进更有效的短路。 融合的另一个好处是,对于像 forEach 这样简单的东西,比如 10,000,000 个或更多元素,需要过滤和映射 - 流可以立即开始打印它们,而不必等待所有过滤和映射完成?此外,我认为融合有助于并行性?【参考方案2】:

是的,差不多。所有这些都可以通过查看源代码来检查。

不过,Fusion 并没有像您认为的那样实现。无需查看整个管道并决定如何融合它;没有旗帜或任何东西;只是操作是否表示为 StatefulOp 对象,它可以运行整个流直到该点并获得所有输出,或者 StatelessOp 只是装饰了一个 Sink,它说明了元素的去向。您可以查看源代码,例如以sortedmap 为例。

【讨论】:

以上是关于Java流操作融合与有状态中间操作的主要内容,如果未能解决你的问题,请参考以下文章

Flink流处理- 数据流操作

流的有状态和无状态方法

Java 持久性应用程序无状态与有状态

Java8 新特性 Steam() 中间有状态操作

Stream

高效 告别996,开启java高效编程之门 3-10实战:常用中间操作总结