Java并行流-调用parallel()方法的顺序
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并行流-调用parallel()方法的顺序相关的知识,希望对你有一定的参考价值。
AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
[我写这篇文章时,我假设线程将仅在map调用中产生,因为在map之后放置了parallel。但是文件中的某些行每次执行都会获得不同的记录号。
我阅读了官方的Java stream documentation和一些网站,以了解流在幕后的工作方式。
几个问题:
- Java并行流基于SplitIterator工作,它由ArrayList,LinkedList等每个集合实现。当我们从这些集合构造并行流时,将使用相应的拆分迭代器来拆分和迭代该集合。这解释了为什么并行性发生在原始输入源(文件行)级别而不是映射结果(即Record pojo)。我的理解正确吗?
- 就我而言,输入是文件IO流。将使用哪个吐痰迭代器?
我们在管道中放置
parallel()
的位置都没关系。原始输入源将始终被分割,其余中间操作将被应用。在这种情况下,Java不应允许用户在管道中的任何地方(除了原始来源之外)进行并行操作。因为,它为那些不知道java流内部工作方式的人提供了错误的理解。我知道
parallel()
操作将已经为Stream对象类型定义,因此,它是以这种方式工作的。但是,最好提供一些替代解决方案。- 在上面的代码片段中,我试图为输入文件中的每个记录添加一个行号,因此应该对其进行排序。但是,我想并行应用
doSomeOperation()
,因为它很重。一种实现方式是编写自己的自定义拆分迭代器。还有其他办法吗?
这说明了为什么并行性发生在原始输入源(文件行)级别而不是映射结果(即Record pojo)。
整个流是并行的或顺序的。我们没有选择要顺序或并行运行的操作子集。
当启动终端操作时,根据调用流的方向,顺序或并行执行流管道。当终端操作启动时,流管线根据被调用的流的模式顺序或并行执行。 source
正如您提到的,并行流使用拆分迭代器。显然,这是在操作开始运行之前对数据进行分区。
就我而言,输入是文件IO流。将使用哪个吐痰迭代器?
查看源代码,我看到它使用了java.nio.file.FileChannelLinesSpliterator
我们在管道中的哪里放置parallel()都没有关系。原始输入源将始终被分割,其余中间操作将被应用。
对。您甚至可以多次调用parallel()
和sequential()
。最后调用的那个将获胜。调用parallel()
时,将其设置为返回的流。并且如上所述,所有操作依次或并行运行。
在这种情况下,Java不应允许用户在管道中的任何位置(除了原始源之外都进行并行操作...
这成为意见问题。我认为Zabuza有充分的理由支持JDK设计师的选择。
一种实现方法是编写自己的自定义拆分迭代器。还有其他办法吗?
这取决于您的操作
- 如果
findFirst()
是您真正的终端操作,那么您甚至不必担心并行执行,因为无论如何,doSomething()
的调用不会太多(findFirst()
处于短路状态)。实际上,.parallel()
可能会导致处理多个元素,而顺序流上的findFirst()
可能会阻止该元素。 如果终端操作不会创建太多数据,则可以使用顺序流创建
Record
对象,然后并行处理结果:List<Record> smallData = Files.lines(inputFile.toPath(), StandardCharsets.UTF_8) .map(record -> new Record(recordNumber.incrementAndGet(), record)) .collect(Collectors.toList()) .parallelStream() .filter(record -> doSomeOperation()) .collect(Collectors.toList());
如果您的管道会在内存中加载大量数据(这可能是您使用
Files.lines()
的原因),那么可能需要自定义拆分迭代器。不过,在我去那里之前,我会研究其他选项(例如,以id列开头的行保存-这只是我的看法)。我也将尝试以较小的批次处理记录,例如:AtomicInteger recordNumber = new AtomicInteger(); final int batchSize = 10; try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), StandardCharsets.UTF_8)).toPath(), StandardCharsets.UTF_8);) Supplier<List<Record>> batchSupplier = () -> List<Record> batch = new ArrayList<>(); for (int i = 0; i < batchSize; i++) String nextLine; try nextLine = reader.readLine(); catch (IOException e) //hanlde exception throw new RuntimeException(e); if(null == nextLine) return batch; batch.add(new Record(recordNumber.getAndIncrement(), nextLine)); System.out.println("next batch"); return batch; ; Stream.generate(batchSupplier) .takeWhile(list -> list.size() >= batchSize) .map(list -> list.parallelStream() .filter(record -> doSomeOperation()) .collect(Collectors.toList())) .flatMap(List::stream) .forEach(System.out::println);
这将并行执行
doSomeOperation()
,而不将所有数据加载到内存中。但是请注意,需要考虑batchSize
。
以上是关于Java并行流-调用parallel()方法的顺序的主要内容,如果未能解决你的问题,请参考以下文章