这是 Files.lines() 中的错误,还是我对并行流有误解?

Posted

技术标签:

【中文标题】这是 Files.lines() 中的错误,还是我对并行流有误解?【英文标题】:Is this a bug in Files.lines(), or am I misunderstanding something about parallel streams? 【发布时间】:2015-03-31 07:55:51 【问题描述】:

环境:Ubuntu x86_64 (14.10),Oracle JDK 1.8u25

我尝试使用Files.lines() 的并行流,但我想.skip() 第一行(这是一个带有标题的CSV 文件)。因此我尝试这样做:

try (
    final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
        .skip(1L).parallel();
) 
    // etc

但随后一列未能解析为 int...

所以我尝试了一些简单的代码。该文件的问题很简单:

$ cat info.csv 
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$

而且代码同样简单:

public static void main(final String... args)

    final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
    Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
        .forEach(System.out::println);

系统地得到以下结果(好吧,我只运行了大约 20 次):

startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes

我在这里错过了什么?


编辑 问题或误解似乎比这更根深蒂固(下面的两个示例是由 FreeNode 的 ##java 上的一个人编造的):

public static void main(final String... args)

    new BufferedReader(new StringReader("Hello\nWorld")).lines()
        .skip(1L).parallel()
        .forEach(System.out::println);

    final Iterator<String> iter
        = Arrays.asList("Hello", "World").iterator();
    final Spliterator<String> spliterator
        = Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
    final Stream<String> s
        = StreamSupport.stream(spliterator, true);

    s.skip(1L).forEach(System.out::println);

打印出来:

Hello
Hello

呃。

@Holger 建议对于 ORDERED 而不是 SIZED 的任何流都会发生这种情况:

Stream.of("Hello", "World")
    .filter(x -> true)
    .parallel()
    .skip(1L)
    .forEach(System.out::println);

此外,它源于已经发生的所有讨论,即问题(如果是一个?)与.forEach()(如@SotiriosDelimanolis first pointed out)有关。

【问题讨论】:

我认为问题出在forEach 没有误解的余地——我简直不敢相信 JDK 中有这么明显的错误。我用十个元素尝试了你的代码,你猜怎么着,tenth 元素被跳过了。 您可以将第二个示例简化为Stream.of("Hello", "World").filter(x-&gt;true) .parallel().skip(1).forEach(System.out::println);,因为任何没有可预测结果大小的流都可以。 Nicolai 和 Magnamag 的回答很好。我将添加那些被忽略的一些细节:Stream 实现将在可能的地方将 UNORDERED 特性反向传播到管道上。这使得像 orderedStream().sorted().forEach() 这样的计算能够优化排序。 (换句话说,想象.forEach() 真的是.unordered().forEach() 的宏)。理解并行上下文中的排序是很困难的。我们都有多年的顺序偏见,导致我们对排序做出隐含的假设。 @Brian Goetz:它可能优化掉sort,所以很遗憾它没有。即使使用.parallel().sorted().forEach(),终端消费者也会以任意顺序处理项目,但只有在执行后才会浪费资源先对项目进行排序。由于排序在顺序上下文中仍然很明显,因此在使用.sorted().forEach() 时,难怪开发人员在被告知“.forEach() 使 整个管道 无序”时感到惊讶。如果它真的一直如此,那就更好了。 【参考方案1】:

由于问题的当前状态与此处先前的陈述完全相反,因此应该注意,现在有一个 explicit statement by Brian Goetz 关于通过 skip 操作的无序特征的反向传播是被认为是一个错误。 It’s also stated 现在认为它根本没有反向传播终端操作的有序性。

还有一个related bug report, JDK-8129120,其状态为“已在Java 9 中修复”,它是backported to Java 8, update 60

我对@9​​87654326@ 进行了一些测试,似乎现在的实现确实表现出更直观的行为。

【讨论】:

感谢您的提醒。我完全忘了测试 8u60。实际上非常有趣的是,所有赞成的答案都坚持认为它不是错误,而它是:-) 这是一个错误”在这个答案中应该用大写字母写!每个人都应该为它投票! @DidierL 在重新阅读所有内容之后(我相信)我已经完全理解了所有这些,我同意你的看法。我已将接受的答案更改为这个。【参考方案2】:

此答案已过时 - 请阅读 THIS ONE INSTEAD!


快速回答问题:观察到的行为是有意的!没有错误,一切都按照文档进行。但是可以说,应该更好地记录和传达这种行为。 forEach 忽略排序的方式应该更加明显。

我将首先介绍允许观察到的行为的概念。这为剖析问题中给出的示例之一提供了背景。我将在高级别上执行此操作,然后在非常低级别上再次执行此操作。

[TL;DR:自行阅读,高级解释会给出一个粗略的答案。]

概念

我们不谈Streams,它是流相关方法操作或返回的类型,我们来谈谈流操作流管道。该方法调用linesskipparallel 是构建流管道[1] 的流操作,并且-正如其他人所指出的-当调用终端操作forEach 时,该管道被作为一个整体处理[2 ].

可以将管道视为一系列操作,这些操作一个接一个地在整个流上执行(例如过滤所有元素,将剩余元素映射到数字,对所有数字求和)。 但这是误导! 一个更好的比喻是终端操作通过每个操作提取单个元素[3](例如,获取下一个未过滤的元素,映射它,将其添加到 sum,请求下一个元素)。一些中间操作可能需要遍历几个(例如skip)甚至所有(例如sort)元素才能返回请求的下一个元素,这是操作中状态的来源之一。

每个操作都用这些StreamOpFlags 表示其特征:

DISTINCT SORTED ORDERED SIZED SHORT_CIRCUIT

它们在流源、中间操作和终端操作之间组合在一起,构成了管道(作为一个整体)的特征,然后用于优化[4]。同样,一个管道是否并行执行是整个管道的属性[5]。

因此,每当您对这些特征进行假设时,您都必须仔细查看构建管道的所有操作,无论它们的应用顺序如何,以及它们做出什么保证。这样做时,请记住终端操作如何通过管道拉动每个单独的元素。

示例

让我们看看这个特殊情况:

BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
        .skip(1L)
        .parallel()
        .forEach(System.out::println);

高级

无论您的流源是否已订购(已订购),通过调用 forEach(而不是 forEachOrdered),您声明顺序对您来说并不重要[6] ,这有效地将skip 从“跳过前 n 个元素”减少到“跳过任何 n 个元素”[7](因为没有顺序,前者变得毫无意义)。

因此,如果可以保证加速,则您授予管道忽略顺序的权利。对于并行执行,它显然是这样认为的,这就是你得到观察到的输出的原因。因此您观察到的是预期的行为,没有错误。

请注意,这 不冲突skip 是有状态的!如上所述,有状态并不意味着它以某种方式缓存整个流(减去跳过的元素)并且随后的所有内容都在这些元素上执行。这只是意味着该操作具有某种状态 - 即跳过元素的数量(嗯,它实际上不是 that easy,但由于我对正在发生的事情的理解有限,我会说这是一个公平的简化)。

低级

让我们更详细地看一下:

    BufferedReader.lines 创建Stream,我们称之为_lines: creates an ordered Spliterator 将其交给StreamSupport.stream,后者会创建一个ReferencePipeline.Head,并将拆分器标志转换为流操作标志 .skip 创建一个新的Stream,我们称之为_skip: 致电ReferencePipeline.skipSliceOps.makeRef 构造一个“切片”操作(跳过和限制的泛化) 这会创建ReferencePipeline.StatefulOp 的匿名实例,它引用_lines 作为其源 .parallel 如上所述为整个管道设置并行标志 .forEach 实际开始执行

那么让我们看看管道是如何执行的:

    调用_skip.forEach 会创建一个ForEachOp(我们称之为_forEach)并将其交给_skip.evaluate,它会做两件事:
      调用sourceSpliterator 为这个管道阶段围绕源创建一个拆分器: 自己调用opEvaluateParallelLazy(事实证明) 这确定流是无序的,creates an UnorderedSliceSpliterator(我们称之为_sliceSpliterator)和skip = 1,没有限制。 调用_forEach.evaluateParallel,它创建了一个ForEachTask(因为它是无序的;我们称之为_forEachTask)并调用它
    _forEachTask.compute 中,任务拆分前 1024 行,为它创建一个新任务(我们称之为 _forEachTask2),意识到没有剩余行并完成。 在分叉连接池中,_forEachTask2.compute 被调用,但徒劳地尝试再次拆分并通过调用 _skip.copyIntofinally starts copying its elements into the sink(System.out.println 周围的流感知包装器)。 这实际上将任务委托给指定的拆分器。 这是上面创建的_sliceSpliterator 所以_sliceSpliterator.forEachRemaining 负责将未跳过的元素交给 println-sink:
      它将一大块(在本例中为所有)行放入缓冲区并计算它们的数量 它尝试通过acquirePermits 请求尽可能多的许可(我假设是由于并行化) 在源中有两个元素和一个要跳过的元素,它只获得一个许可证(通常假设 n) 它让缓冲区将前 n 个元素(因此在这种情况下只有第一个)放入接收器

所以UnorderedSliceSpliterator.OfRef.forEachRemaining 是最终真正忽略订单的地方。我没有将其与有序变体进行比较,但这是我的假设,为什么要这样做:

在并行化下,将拆分器的元素铲入缓冲区可能会与执行相同操作的其他任务交错 这将使跟踪他们的订单变得非常困难 这样做或防止交错会降低性能,如果顺序无关紧要则毫无意义 如果订单丢失,除了处理前 n 个允许的元素外,别无他法

有什么问题吗? ;) 抱歉这么久。也许我应该省略细节并写一篇博客文章....

来源

[1]java.util.stream - Stream operations and pipelines:

流操作分为中间终端操作,组合成流管道

[2]java.util.stream - Stream operations and pipelines:

直到管道的终端操作执行完毕,管道源的遍历才开始。

[3] 这个比喻代表了我对流的理解。除了代码之外,主要来源是来自java.util.stream - Stream operations and pipelines 的引用(突出显示我的):

延迟处理流可以显着提高效率;在诸如上面的 filter-map-sum 示例的管道中,过滤、映射和求和可以融合到数据的单次传递中,具有最小的中间状态。懒惰还允许在没有必要时避免检查所有数据。对于诸如“查找第一个超过 1000 个字符的字符串”之类的操作,只需检查足够多的字符串即可找到具有所需特征的字符串,而无需检查源中可用的所有字符串。

[4]java.util.stream.StreamOpFlag:

在管道的每个阶段,可以计算组合的流和操作标志 [... 关于如何在源操作、中间操作和终端操作中组合标志 ... ] 以产生从管道输出的标志。然后可以使用这些标志来应用优化。

在代码中,您可以在 AbstractPipeline.combinedFlags 中看到这一点,它是在构造过程中(以及在其他一些情况下)通过结合前一个操作和新操作的标志来设置的。

[5] java.util.stream - Parallelism(我无法直接链接到 - 向下滚动一点):

当启动终端操作时,流管道会根据调用它的流的方向顺序或并行执行。

在代码中,您可以看到这是在 AbstractPipeline.sequential, parallelisParallel 中,它们设置/检查流源上的布尔标志,使其在构造流时调用设置器时无关紧要。

[6]java.util.stream.Stream.forEach:

为此流的每个元素执行一个操作。 [...] 此操作的行为是明确的不确定性。

对比java.util.stream.Stream.forEachOrdered:

如果流具有定义的遇到顺序,则按照流的遇到顺序对该流的每个元素执行一个操作。

[7] 这也没有明确记录,但我对Stream.skip 的评论的解释(我大大缩短了):

[...] skip() [...] 在有序并行管道上可能会非常昂贵 [...] 因为 skip(n) 被限制为不仅跳过任何 n 个元素,而且跳过其中的前 n 个元素相遇顺序。 [...] [R]移除排序约束 [...] 可能会导致并行管道中的 skip() 显着加速

【讨论】:

哇!这是一些很棒的挖掘......总而言之,你正在研究为什么forEach()基本上忽略了javadoc所说的“中间状态操作”(比如.skip());现在,不确定这是否是有意的行为,但无论如何,+1 用于研究和(源代码!)链接撒 不,我不认为forEach 忽略了它不应该忽略的任何内容。我重写了一些部分以使其更清晰。 (对不起,文字墙;我希望结构使它有点容易理解)。 最后添加了forEach内部发生的所有细节。现在您可以看到订单最终搞砸的那一行。 Stateful Skip. 在我看来,skip() 是有状态的被解释为它的意思是这样的:操作看到有序流源,跳过第一个 @ 987654423@ 元素,并将其余部分提供给后面的任何内容。 事实并非如此。 这只是意味着 skip 具有 some 状态 - 在这种情况下,它或多或少只是跳过元素的数量,仅此而已。它仍然只是流中的一些操作,并且与任何其他操作一样被 ORDERED 标志怀疑。 跳过并订购。 skip() 考虑遇到订单如果你创建了一个有序的管道但是 使用forEach 创建一个无序的。 这会影响整个管道,包括skip(),然后允许忽略顺序。无论管道是否并行处理都是如此(这是一个实现细节,即顺序执行和并行执行之间的行为不同)。这种行为是有意的,文档也这么说 - 只是不是很清楚。【参考方案3】:

问题是您将并行流与 forEach 一起使用,并且您期望跳过操作依赖于正确的元素顺序,而这里不是这种情况。摘自forEach 文档:

对于并行流管道,此操作不保证 尊重流的相遇顺序,因为这样做会牺牲 并行的好处。

我想基本上会发生什么是跳过操作首先在第二行执行,而不是在第一行。如果您使流顺序化或使用forEachOrdered,您可以看到它会产生预期的结果。另一种方法是使用Collectors。

【讨论】:

我只是在写 :-) 删除并行确实可以解决问题(并且并行化 I/O 操作可能不会对性能产生太大影响)。 @assylias:删除parallel 可能会解决问题,但只能使用forEachOrdered 保证 来做预期的事情。 嗯,我会试试,但另一方面,Files.lines() 生成的 Stream 几乎是按定义排序的,不是吗? forEach忽略这个顺序没关系,因为skipshouldn't,这是一个有状态的操作。正如@fge 所指出的,Files.lines() 产生一个有序流。将其设为.skip(1).limit(...),您将不再获得第一行。根据你的解释你还是会的。 @Holger 保证只获得第二行是绝对存在的。检查我对相关规范报价的回答。【参考方案4】:

让我引用一些相关的东西——skip 的 Javadoc:

虽然 skip() 在顺序流管道上通常是一种便宜的操作,但在有序并行管道上可能会非常昂贵,尤其是对于较大的 n 值,因为 skip(n) 不仅限于跳过任何 n 个元素,而且相遇顺序中的前 n 个元素。

现在,很确定Files.lines() 具有一个明确定义的遭遇顺序并且是一个ORDERED 流(如果不是,即使在顺序操作中也不能保证遇到顺序匹配文件顺序),因此可以保证生成的流将确定性地仅包含示例中的第二行。

不管有没有其他的东西,保证肯定是有的。

【讨论】:

问题是调用forEach(而不是forEachOrdered)的流是否形成“有序并行管道”(注意措辞“管道”而不是“流”) .但我同意,如果它真的打算定义这种违反直觉的行为,这还不是一个充分的规范。顺便说一句,它似乎在两个方向都有效,例如调用.unordered().forEachOrdered(…) 似乎强加了一种有序行为。 从语义方面来说,forEach 只能允许流中定义好的内容的任意遇到顺序。不允许将有序流转换为无序流。如果实现不尊重这一点,那么我会认为它已损坏。 我不确定。毕竟,有forEachOrdered,它维护了顺序,而像“为跳过但不为未跳过的元素保持顺序”之类的操作没有意义,即使对于并行性能也是如此。我想,如果将方法命名为 forEachforEachUnordered 而不是 forEachOrderedforEach 会好得多,因为这两种方法中的后一种是您在使用前应该三思而后行的方法。但我真的很期待看到 API 设计者的最后一句话,一旦他们出现…… 请注意,您对forEach 的期望(“forEach 应该是不会改变流定义的任何内容(您可能会或可能不会得到排序)”正是为forEachOrdered 指定:“如果流具有定义的遇到顺序,则按照流的遇到顺序执行此流的每个元素的操作。”恐怕,我们能得到的最好的结果是“你'是的,应该反过来,但现在我们必须保持兼容性' 然而,用户可见的效果是forEach 对流做了一些事情(改变它的语义),而forEachOrdered“让它不管”。基本上就是你说的。因此,我们在 FJ 内部(forEach 是“侵入性较小”的操作)和用户的角度(其中 forEachOrdered 侵入性较小)之间存在冲突。显然,API 应该遵循用户的观点。【参考方案5】:

我知道如何解决这个问题,这在之前的讨论中是看不到的。您可以重新创建将管道拆分为两个管道的流,同时保持整个事情的惰性。

public static <T> Stream<T> recreate(Stream<T> stream) 
    return StreamSupport.stream(stream.spliterator(), stream.isParallel())
                        .onClose(stream::close);


public static void main(String[] args) 
    recreate(new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5")).lines()
        .skip(1).parallel()).forEach(System.out::println);

当您从初始流拆分器重新创建流时,您实际上是创建了一个新管道。在大多数情况下recreate 将作为no-op 工作,但问题是第一和第二管道不共享parallelunordered 状态。因此,即使您使用forEach(或任何其他无序终端操作),也只有第二个流变为无序。

内部非常相似的事情是将您的流与空流连接:

Stream.concat(Stream.empty(), 
    new BufferedReader(new StringReader("JUNK\n1\n2\n3\n4\n5"))
          .lines().skip(1).parallel()).forEach(System.out::println);

虽然它有更多的开销。

【讨论】:

Tagir,我已经测试了你的解决方法,它确实有效。然而,虽然我觉得它很神奇,但我不明白为什么它真的有效。我的意思是,使它起作用的机制是什么?为什么使用原始流的拆分器创建另一个管道,并具有与原始流相同的并行状态,使得第二个管道实际上跳过了第一个流的第一个 sequential 元素? @FedericoPeraltaSchaffner,因为bug 原因是当“无序”之类的流标志错误地反向传播到先前的操作时,反向传播不正确。然而,反向传播不能通过流娱乐进行。所以我只是通过这个技巧强行阻止了反向传播。另请注意 Holger 的回答:它已在 8u60 中修复。

以上是关于这是 Files.lines() 中的错误,还是我对并行流有误解?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 Files.lines(和类似的 Streams)不会自动关闭?

这是 g++ for 循环实现还是我的代码中的错误

jupyter notebook 中 python 的奇怪行为;这是一个错误还是我应该接受它?

它是 ReduceVocab() 中的错误还是遗漏了啥?

Java 8流到文件[重复]

这是一个可能的 Oracle 错误还是我遗漏了啥?