这是 Files.lines() 中的错误,还是我对并行流有误解?
Posted
技术标签:
【中文标题】这是 Files.lines() 中的错误,还是我对并行流有误解?【英文标题】:Is this a bug in Files.lines(), or am I misunderstanding something about parallel streams? 【发布时间】:2015-03-31 07:55:51 【问题描述】:环境:Ubuntu x86_64 (14.10),Oracle JDK 1.8u25
我尝试使用Files.lines()
的并行流,但我想.skip()
第一行(这是一个带有标题的CSV 文件)。因此我尝试这样做:
try (
final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
.skip(1L).parallel();
)
// etc
但随后一列未能解析为 int...
所以我尝试了一些简单的代码。该文件的问题很简单:
$ cat info.csv
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$
而且代码同样简单:
public static void main(final String... args)
final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
.forEach(System.out::println);
我系统地得到以下结果(好吧,我只运行了大约 20 次):
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
我在这里错过了什么?
编辑 问题或误解似乎比这更根深蒂固(下面的两个示例是由 FreeNode 的 ##java 上的一个人编造的):
public static void main(final String... args)
new BufferedReader(new StringReader("Hello\nWorld")).lines()
.skip(1L).parallel()
.forEach(System.out::println);
final Iterator<String> iter
= Arrays.asList("Hello", "World").iterator();
final Spliterator<String> spliterator
= Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
final Stream<String> s
= StreamSupport.stream(spliterator, true);
s.skip(1L).forEach(System.out::println);
打印出来:
Hello
Hello
呃。
@Holger 建议对于 ORDERED
而不是 SIZED
的任何流都会发生这种情况:
Stream.of("Hello", "World")
.filter(x -> true)
.parallel()
.skip(1L)
.forEach(System.out::println);
此外,它源于已经发生的所有讨论,即问题(如果是一个?)与.forEach()
(如@SotiriosDelimanolis first pointed out)有关。
【问题讨论】:
我认为问题出在forEach
。
没有误解的余地——我简直不敢相信 JDK 中有这么明显的错误。我用十个元素尝试了你的代码,你猜怎么着,tenth 元素被跳过了。
您可以将第二个示例简化为Stream.of("Hello", "World").filter(x->true) .parallel().skip(1).forEach(System.out::println);
,因为任何没有可预测结果大小的流都可以。
Nicolai 和 Magnamag 的回答很好。我将添加那些被忽略的一些细节:Stream 实现将在可能的地方将 UNORDERED 特性反向传播到管道上。这使得像 orderedStream().sorted().forEach() 这样的计算能够优化排序。 (换句话说,想象.forEach()
真的是.unordered().forEach()
的宏)。理解并行上下文中的排序是很困难的。我们都有多年的顺序偏见,导致我们对排序做出隐含的假设。
@Brian Goetz:它可能优化掉sort
,所以很遗憾它没有。即使使用.parallel().sorted().forEach()
,终端消费者也会以任意顺序处理项目,但只有在执行后才会浪费资源先对项目进行排序。由于排序在顺序上下文中仍然很明显,因此在使用.sorted().forEach()
时,难怪开发人员在被告知“.forEach()
使 整个管道 无序”时感到惊讶。如果它真的一直如此,那就更好了。
【参考方案1】:
由于问题的当前状态与此处先前的陈述完全相反,因此应该注意,现在有一个 explicit statement by Brian Goetz 关于通过 skip
操作的无序特征的反向传播是被认为是一个错误。 It’s also stated 现在认为它根本没有反向传播终端操作的有序性。
还有一个related bug report, JDK-8129120,其状态为“已在Java 9 中修复”,它是backported to Java 8, update 60
我对@987654326@ 进行了一些测试,似乎现在的实现确实表现出更直观的行为。
【讨论】:
感谢您的提醒。我完全忘了测试 8u60。实际上非常有趣的是,所有赞成的答案都坚持认为它不是错误,而它是:-) “这是一个错误”在这个答案中应该用大写字母写!每个人都应该为它投票! @DidierL 在重新阅读所有内容之后(我相信)我已经完全理解了所有这些,我同意你的看法。我已将接受的答案更改为这个。【参考方案2】:此答案已过时 - 请阅读 THIS ONE INSTEAD!
快速回答问题:观察到的行为是有意的!没有错误,一切都按照文档进行。但是可以说,应该更好地记录和传达这种行为。 forEach
忽略排序的方式应该更加明显。
我将首先介绍允许观察到的行为的概念。这为剖析问题中给出的示例之一提供了背景。我将在高级别上执行此操作,然后在非常低级别上再次执行此操作。
[TL;DR:自行阅读,高级解释会给出一个粗略的答案。]
概念
我们不谈Stream
s,它是流相关方法操作或返回的类型,我们来谈谈流操作和流管道。该方法调用lines
、skip
和parallel
是构建流管道[1] 的流操作,并且-正如其他人所指出的-当调用终端操作forEach
时,该管道被作为一个整体处理[2 ].
可以将管道视为一系列操作,这些操作一个接一个地在整个流上执行(例如过滤所有元素,将剩余元素映射到数字,对所有数字求和)。 但这是误导! 一个更好的比喻是终端操作通过每个操作提取单个元素[3](例如,获取下一个未过滤的元素,映射它,将其添加到 sum,请求下一个元素)。一些中间操作可能需要遍历几个(例如skip
)甚至所有(例如sort
)元素才能返回请求的下一个元素,这是操作中状态的来源之一。
每个操作都用这些StreamOpFlag
s 表示其特征:
DISTINCT
SORTED
ORDERED
SIZED
SHORT_CIRCUIT
它们在流源、中间操作和终端操作之间组合在一起,构成了管道(作为一个整体)的特征,然后用于优化[4]。同样,一个管道是否并行执行是整个管道的属性[5]。
因此,每当您对这些特征进行假设时,您都必须仔细查看构建管道的所有操作,无论它们的应用顺序如何,以及它们做出什么保证。这样做时,请记住终端操作如何通过管道拉动每个单独的元素。
示例
让我们看看这个特殊情况:
BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
.skip(1L)
.parallel()
.forEach(System.out::println);
高级
无论您的流源是否已订购(已订购),通过调用 forEach
(而不是 forEachOrdered
),您声明顺序对您来说并不重要[6] ,这有效地将skip
从“跳过前 n 个元素”减少到“跳过任何 n 个元素”[7](因为没有顺序,前者变得毫无意义)。
因此,如果可以保证加速,则您授予管道忽略顺序的权利。对于并行执行,它显然是这样认为的,这就是你得到观察到的输出的原因。因此您观察到的是预期的行为,没有错误。
请注意,这 不冲突 与 skip
是有状态的!如上所述,有状态并不意味着它以某种方式缓存整个流(减去跳过的元素)并且随后的所有内容都在这些元素上执行。这只是意味着该操作具有某种状态 - 即跳过元素的数量(嗯,它实际上不是 that easy,但由于我对正在发生的事情的理解有限,我会说这是一个公平的简化)。
低级
让我们更详细地看一下:
BufferedReader.lines
创建Stream
,我们称之为_lines
:
creates an ordered Spliterator
将其交给StreamSupport.stream
,后者会创建一个ReferencePipeline.Head
,并将拆分器标志转换为流操作标志
.skip
创建一个新的Stream
,我们称之为_skip
:
致电ReferencePipeline.skip
用SliceOps.makeRef
构造一个“切片”操作(跳过和限制的泛化)
这会创建ReferencePipeline.StatefulOp
的匿名实例,它引用_lines
作为其源
.parallel
如上所述为整个管道设置并行标志
.forEach
实际开始执行
那么让我们看看管道是如何执行的:
-
调用
_skip.forEach
会创建一个ForEachOp
(我们称之为_forEach
)并将其交给_skip.evaluate
,它会做两件事:
-
调用
sourceSpliterator
为这个管道阶段围绕源创建一个拆分器:
自己调用opEvaluateParallelLazy
(事实证明)
这确定流是无序的,creates an UnorderedSliceSpliterator
(我们称之为_sliceSpliterator
)和skip = 1
,没有限制。
调用_forEach.evaluateParallel
,它创建了一个ForEachTask
(因为它是无序的;我们称之为_forEachTask
)并调用它
_forEachTask.compute
中,任务拆分前 1024 行,为它创建一个新任务(我们称之为 _forEachTask2
),意识到没有剩余行并完成。
在分叉连接池中,_forEachTask2.compute
被调用,但徒劳地尝试再次拆分并通过调用 _skip.copyInto
finally starts copying its elements into the sink(System.out.println
周围的流感知包装器)。
这实际上将任务委托给指定的拆分器。 这是上面创建的_sliceSpliterator
! 所以_sliceSpliterator.forEachRemaining
负责将未跳过的元素交给 println-sink:
-
它将一大块(在本例中为所有)行放入缓冲区并计算它们的数量
它尝试通过
acquirePermits
请求尽可能多的许可(我假设是由于并行化)
在源中有两个元素和一个要跳过的元素,它只获得一个许可证(通常假设 n)
它让缓冲区将前 n 个元素(因此在这种情况下只有第一个)放入接收器
所以UnorderedSliceSpliterator.OfRef.forEachRemaining
是最终真正忽略订单的地方。我没有将其与有序变体进行比较,但这是我的假设,为什么要这样做:
有什么问题吗? ;) 抱歉这么久。也许我应该省略细节并写一篇博客文章....
来源
[1]java.util.stream
- Stream operations and pipelines:
流操作分为中间和终端操作,组合成流管道。
[2]java.util.stream
- Stream operations and pipelines:
直到管道的终端操作执行完毕,管道源的遍历才开始。
[3] 这个比喻代表了我对流的理解。除了代码之外,主要来源是来自java.util.stream
- Stream operations and pipelines 的引用(突出显示我的):
延迟处理流可以显着提高效率;在诸如上面的 filter-map-sum 示例的管道中,过滤、映射和求和可以融合到数据的单次传递中,具有最小的中间状态。懒惰还允许在没有必要时避免检查所有数据。对于诸如“查找第一个超过 1000 个字符的字符串”之类的操作,只需检查足够多的字符串即可找到具有所需特征的字符串,而无需检查源中可用的所有字符串。
[4]java.util.stream.StreamOpFlag
:
在管道的每个阶段,可以计算组合的流和操作标志 [... 关于如何在源操作、中间操作和终端操作中组合标志 ... ] 以产生从管道输出的标志。然后可以使用这些标志来应用优化。
在代码中,您可以在 AbstractPipeline.combinedFlags
中看到这一点,它是在构造过程中(以及在其他一些情况下)通过结合前一个操作和新操作的标志来设置的。
[5] java.util.stream
- Parallelism(我无法直接链接到 - 向下滚动一点):
当启动终端操作时,流管道会根据调用它的流的方向顺序或并行执行。
在代码中,您可以看到这是在 AbstractPipeline.sequential
, parallel
和 isParallel
中,它们设置/检查流源上的布尔标志,使其在构造流时调用设置器时无关紧要。
[6]java.util.stream.Stream.forEach:
为此流的每个元素执行一个操作。 [...] 此操作的行为是明确的不确定性。
对比java.util.stream.Stream.forEachOrdered:
如果流具有定义的遇到顺序,则按照流的遇到顺序对该流的每个元素执行一个操作。
[7] 这也没有明确记录,但我对Stream.skip
的评论的解释(我大大缩短了):
[...] skip() [...] 在有序并行管道上可能会非常昂贵 [...] 因为 skip(n) 被限制为不仅跳过任何 n 个元素,而且跳过其中的前 n 个元素相遇顺序。 [...] [R]移除排序约束 [...] 可能会导致并行管道中的 skip() 显着加速
【讨论】:
哇!这是一些很棒的挖掘......总而言之,你正在研究为什么forEach()
基本上忽略了javadoc所说的“中间状态操作”(比如.skip()
);现在,不确定这是否是有意的行为,但无论如何,+1 用于研究和(源代码!)链接撒
不,我不认为forEach
忽略了它不应该忽略的任何内容。我重写了一些部分以使其更清晰。 (对不起,文字墙;我希望结构使它有点容易理解)。
最后添加了forEach
内部发生的所有细节。现在您可以看到订单最终搞砸的那一行。
Stateful Skip. 在我看来,skip()
是有状态的被解释为它的意思是这样的:操作看到有序流源,跳过第一个 @ 987654423@ 元素,并将其余部分提供给后面的任何内容。 事实并非如此。 这只是意味着 skip 具有 some 状态 - 在这种情况下,它或多或少只是跳过元素的数量,仅此而已。它仍然只是流中的一些操作,并且与任何其他操作一样被 ORDERED
标志怀疑。
跳过并订购。 skip()
将考虑遇到订单如果你创建了一个有序的管道但是 使用forEach
创建一个无序的。 这会影响整个管道,包括skip()
,然后允许忽略顺序。无论管道是否并行处理都是如此(这是一个实现细节,即顺序执行和并行执行之间的行为不同)。这种行为是有意的,文档也这么说 - 只是不是很清楚。【参考方案3】:
问题是您将并行流与 forEach 一起使用,并且您期望跳过操作依赖于正确的元素顺序,而这里不是这种情况。摘自forEach 文档:
对于并行流管道,此操作不保证 尊重流的相遇顺序,因为这样做会牺牲 并行的好处。
我想基本上会发生什么是跳过操作首先在第二行执行,而不是在第一行。如果您使流顺序化或使用forEachOrdered,您可以看到它会产生预期的结果。另一种方法是使用Collectors。
【讨论】:
我只是在写 :-) 删除并行确实可以解决问题(并且并行化 I/O 操作可能不会对性能产生太大影响)。 @assylias:删除parallel
可能会解决问题,但只能使用forEachOrdered
保证 来做预期的事情。
嗯,我会试试,但另一方面,Files.lines()
生成的 Stream
几乎是按定义排序的,不是吗?
forEach
忽略这个顺序没关系,因为skip
shouldn't,这是一个有状态的操作。正如@fge 所指出的,Files.lines()
产生一个有序流。将其设为.skip(1).limit(...)
,您将不再获得第一行。根据你的解释你还是会的。
@Holger 保证只获得第二行是绝对存在的。检查我对相关规范报价的回答。【参考方案4】:
让我引用一些相关的东西——skip
的 Javadoc:
虽然 skip() 在顺序流管道上通常是一种便宜的操作,但在有序并行管道上可能会非常昂贵,尤其是对于较大的 n 值,因为 skip(n) 不仅限于跳过任何 n 个元素,而且相遇顺序中的前 n 个元素。
现在,很确定Files.lines()
具有一个明确定义的遭遇顺序并且是一个ORDERED
流(如果不是,即使在顺序操作中也不能保证遇到顺序匹配文件顺序),因此可以保证生成的流将确定性地仅包含示例中的第二行。
不管有没有其他的东西,保证肯定是有的。
【讨论】:
问题是调用forEach
(而不是forEachOrdered
)的流是否形成“有序并行管道”(注意措辞“管道”而不是“流”) .但我同意,如果它真的打算定义这种违反直觉的行为,这还不是一个充分的规范。顺便说一句,它似乎在两个方向都有效,例如调用.unordered().forEachOrdered(…)
似乎强加了一种有序行为。
从语义方面来说,forEach
只能允许流中定义好的内容的任意遇到顺序。不允许将有序流转换为无序流。如果实现不尊重这一点,那么我会认为它已损坏。
我不确定。毕竟,有forEachOrdered
,它维护了顺序,而像“为跳过但不为未跳过的元素保持顺序”之类的操作没有意义,即使对于并行性能也是如此。我想,如果将方法命名为 forEach
和 forEachUnordered
而不是 forEachOrdered
和 forEach
会好得多,因为这两种方法中的后一种是您在使用前应该三思而后行的方法。但我真的很期待看到 API 设计者的最后一句话,一旦他们出现……
请注意,您对forEach
的期望(“forEach
应该是不会改变流定义的任何内容(您可能会或可能不会得到排序)”正是为forEachOrdered
指定:“如果流具有定义的遇到顺序,则按照流的遇到顺序执行此流的每个元素的操作。”恐怕,我们能得到的最好的结果是“你'是的,应该反过来,但现在我们必须保持兼容性'
然而,用户可见的效果是forEach
对流做了一些事情(改变它的语义),而forEachOrdered
“让它不管”。基本上就是你说的。因此,我们在 FJ 内部(forEach
是“侵入性较小”的操作)和用户的角度(其中 forEachOrdered
侵入性较小)之间存在冲突。显然,API 应该遵循用户的观点。【参考方案5】:
我知道如何解决这个问题,这在之前的讨论中是看不到的。您可以重新创建将管道拆分为两个管道的流,同时保持整个事情的惰性。
public static <T> Stream<T> recreate(Stream<T> stream)
return StreamSupport.stream(stream.spliterator(), stream.isParallel())
.onClose(stream::close);
public static void main(String[] args)
recreate(new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")).lines()
.skip(1).parallel()).forEach(System.out::println);
当您从初始流拆分器重新创建流时,您实际上是创建了一个新管道。在大多数情况下recreate
将作为no-op
工作,但问题是第一和第二管道不共享parallel
和unordered
状态。因此,即使您使用forEach
(或任何其他无序终端操作),也只有第二个流变为无序。
内部非常相似的事情是将您的流与空流连接:
Stream.concat(Stream.empty(),
new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5"))
.lines().skip(1).parallel()).forEach(System.out::println);
虽然它有更多的开销。
【讨论】:
Tagir,我已经测试了你的解决方法,它确实有效。然而,虽然我觉得它很神奇,但我不明白为什么它真的有效。我的意思是,使它起作用的机制是什么?为什么使用原始流的拆分器创建另一个管道,并具有与原始流相同的并行状态,使得第二个管道实际上跳过了第一个流的第一个 sequential 元素? @FedericoPeraltaSchaffner,因为bug 原因是当“无序”之类的流标志错误地反向传播到先前的操作时,反向传播不正确。然而,反向传播不能通过流娱乐进行。所以我只是通过这个技巧强行阻止了反向传播。另请注意 Holger 的回答:它已在 8u60 中修复。以上是关于这是 Files.lines() 中的错误,还是我对并行流有误解?的主要内容,如果未能解决你的问题,请参考以下文章
为啥 Files.lines(和类似的 Streams)不会自动关闭?