我应该尽可能使用并行流吗?
Posted
技术标签:
【中文标题】我应该尽可能使用并行流吗?【英文标题】:Should I always use a parallel stream when possible? 【发布时间】:2013-12-20 22:16:45 【问题描述】:使用 Java 8 和 lambdas,可以很容易地将集合作为流进行迭代,并且使用并行流同样容易。来自the docs的两个例子,第二个使用parallelStream:
myShapesCollection.stream()
.filter(e -> e.getColor() == Color.RED)
.forEach(e -> System.out.println(e.getName()));
myShapesCollection.parallelStream() // <-- This one uses parallel
.filter(e -> e.getColor() == Color.RED)
.forEach(e -> System.out.println(e.getName()));
只要我不关心顺序,使用并行总是有益的吗?人们会认为在更多内核上划分工作会更快。
还有其他注意事项吗?什么时候应该使用并行流,什么时候应该使用非并行?
(问这个问题是为了引发关于如何以及何时使用并行流的讨论,并不是因为我认为总是使用它们是一个好主意。)
【问题讨论】:
【参考方案1】:与顺序流相比,并行流的开销要高得多。协调线程需要大量时间。我会默认使用顺序流,并且只考虑并行流
我有大量的项目要处理(或者每个项目的处理都需要时间并且是可并行的)
我首先遇到了性能问题
我还没有在多线程环境中运行进程(例如:在 Web 容器中,如果我已经有许多请求要并行处理,那么在每个请求中添加额外的并行层可以负面影响大于正面影响)
在您的示例中,性能无论如何都会受到对System.out.println()
的同步访问的驱动,使这个过程并行不会产生任何影响,甚至会产生负面影响。
此外,请记住,并行流并不能神奇地解决所有同步问题。如果进程中使用的谓词和函数使用共享资源,则必须确保一切都是线程安全的。特别是,如果并行运行,副作用是您真正需要担心的事情。
无论如何,衡量,不要猜测!只有测量才能告诉您并行性是否值得。
【讨论】:
好答案。我要补充一点,如果您要处理大量项目,那只会增加线程协调问题;只有当每个项目的处理都需要时间并且可以并行化时,并行化才可能有用。 @WarrenDew 我不同意。 Fork/Join 系统将简单地将 N 个项目拆分为例如 4 个部分,并依次处理这 4 个部分。然后将减少 4 个结果。如果海量真的是海量,即使对于快速的单元处理,并行化也是有效的。但与往常一样,您必须衡量。 我有一组实现Runnable
的对象,我称之为start()
将它们用作Threads
,是否可以将其更改为在.forEach()
并行化中使用java 8 流?然后我就可以将线程代码从课程中剥离出来。但是有什么缺点吗?
@JBNizet 如果 4 个部分按顺序处理,那么它是并行处理还是按顺序处理没有区别吗?请澄清
@Harshana 他显然的意思是这4个部分中的每一个的元素将被顺序处理。然而,零件本身可以同时加工。换句话说,如果您有多个可用的 CPU 内核,每个部分都可以独立于其他部分在自己的内核上运行,同时按顺序处理自己的元素。 (注意:我不知道,如果这就是并行 Java 流的工作方式,我只是想澄清一下 JBNizet 的含义。)【参考方案2】:
Stream API 旨在简化以一种从执行方式中抽象出来的方式编写计算,从而轻松地在顺序和并行之间切换。
然而,仅仅因为它很简单,并不意味着它总是一个好主意,事实上,仅仅因为你可以.
首先,请注意,除了可以在更多内核可用时更快地执行之外,并行性并没有带来任何好处。并行执行总是比顺序执行涉及更多的工作,因为除了解决问题之外,它还必须执行子任务的调度和协调。希望通过在多个处理器上拆分工作,您能够更快地得到答案;这是否真的发生取决于很多事情,包括数据集的大小、对每个元素进行多少计算、计算的性质(具体来说,一个元素的处理是否与其他元素的处理相互作用?) 、可用处理器的数量以及竞争这些处理器的其他任务的数量。
此外,请注意并行性还经常暴露计算中的不确定性,而这种不确定性通常被顺序实现所隐藏;有时这并不重要,或者可以通过限制所涉及的操作来缓解(即归约运算符必须是无状态的和关联的。)
实际上,有时并行性会加快计算速度,有时不会,有时甚至会减慢计算速度。最好先使用顺序执行进行开发,然后在哪里应用并行性
(A)您知道提高性能确实有好处,并且
(B),它实际上会提供更高的性能。
(A) 是业务问题,而不是技术问题。如果您是性能专家,通常可以查看代码并确定 (B),但明智的做法是衡量。 (而且,在您确信 (A) 之前,甚至不要打扰;如果代码足够快,最好将您的大脑周期应用到其他地方。)
最简单的并行性能模型是“NQ”模型,其中N
是元素的数量,Q
是每个元素的计算量。通常,您需要产品 NQ 超过某个阈值才能开始获得性能优势。对于像“将数字从1
加到N
”这样的低Q 问题,您通常会看到N=1000
和N=10000
之间的盈亏平衡。对于 Q 值较高的问题,您会看到较低阈值的盈亏平衡点。
但实际情况相当复杂。因此,在您达到专业水平之前,请首先确定顺序处理何时会真正让您付出代价,然后衡量并行性是否会有所帮助。
【讨论】:
这篇文章提供了有关 NQ 模型的更多详细信息:gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html @specializt:将流从顺序切换到并行确实会更改算法(在大多数情况下)。这里提到的确定性是关于您的(任意)运算符可能依赖的属性(Stream 实现无法知道这一点),但当然不应该依赖。这就是这个答案的那部分试图说的。如果您关心规则,您可以获得确定性结果,就像您说的那样,(否则并行流非常无用),但也有可能故意允许不确定性,例如使用findAny
而不是 @987654330 @…
“首先,请注意,除了可以在更多内核可用时更快执行之外,并行性没有任何好处”——或者如果您正在应用涉及 IO 的操作(例如myListOfURLs.stream().map((url) -> downloadPage(url))...
)。
@Pacerier 这是一个很好的理论,但很幼稚(请参阅尝试构建自动并行编译器的 30 年历史)。由于当我们不可避免地弄错时,要在足够多的时间猜对而不惹恼用户是不切实际的,负责任的事情就是让用户说出他们想要的东西。在大多数情况下,默认(顺序)是正确的,并且更可预测。
@Jules:永远不要对 IO 使用并行流。它们仅适用于 CPU 密集型操作。并行流使用ForkJoinPool.commonPool()
,你不想阻塞任务去那里。【参考方案3】:
我观看了 Brian Goetz 的 presentations 之一(Java 语言架构师和 Lambda 表达式规范负责人)。他详细解释了在进行并行化之前需要考虑的以下 4 点:
拆分/分解成本 – 有时拆分比只做工作更昂贵!任务调度/管理成本 – 可以在将工作交给另一个线程所需的时间内完成大量工作。结果组合成本 – 有时组合涉及复制大量数据。例如,添加数字很便宜,而合并集合很昂贵。局部性 – 房间里的大象。这是每个人都可能错过的重要一点。您应该考虑缓存未命中,如果 CPU 由于缓存未命中而等待数据,那么您将不会通过并行化获得任何收益。这就是为什么基于数组的源在缓存下一个索引(当前索引附近)时并行化效果最好,并且 CPU 遇到缓存未命中的机会更少。
他还提到了一个相对简单的公式来确定并行加速的机会。
NQ 模型:
N x Q > 10000
在哪里, N = 数据项数 Q = 每个项目的工作量
【讨论】:
“每个项目的工作量”以什么单位衡量? 10000 代表什么?【参考方案4】:JB 一针见血。我唯一可以补充的是 Java 8 不做纯并行处理,它做paraquential。是的,我写了这篇文章,而且我从事 F/J 已有 30 年了,所以我确实理解这个问题。
【讨论】:
流不可迭代,因为流进行内部迭代而不是外部迭代。无论如何,这就是流的全部原因。如果您对学术工作有疑问,那么函数式编程可能不适合您。函数式编程 === 数学 === 学术。不,J8-FJ 没有坏,只是大多数人没有阅读 f****** 手册。 java 文档说得很清楚,它不是一个并行执行框架。这就是所有拆分器的全部原因。是的,它是学术性的,是的,如果您知道如何使用它,它就会起作用。是的,使用自定义执行器应该更容易 Stream 确实有一个 iterator() 方法,因此您可以根据需要在外部迭代它们。我的理解是,他们没有实现 Iterable,因为您只能使用该迭代器一次,没有人可以决定这是否可以。 说实话:你的整篇论文读起来就像是一篇大篇幅、精心设计的咆哮——这几乎否定了它的可信度……我建议用 much重新做一遍> 不那么激进的底色,否则没有多少人会真正费心去完整地阅读它……我只是说 关于您的文章的几个问题...首先,为什么您显然将平衡树结构等同于有向无环图?是的,平衡树是 DAG,但链表和除数组之外的几乎所有面向对象的数据结构也是如此。此外,当您说递归分解仅适用于平衡树结构并且因此与商业无关时,您如何证明该断言的合理性?在我看来(诚然没有真正深入研究这个问题)它应该同样在基于数组的数据结构上工作,例如ArrayList
/HashMap
.
这个帖子是从 2013 年开始的,从那以后发生了很多变化。这部分是给 cmets 的,不是详细的答案。【参考方案5】:
其他答案已经涵盖了分析,以避免在并行处理中过早优化和开销成本。这个答案解释了并行流数据结构的理想选择。
通常,并行性的性能提升在
ArrayList
、HashMap
、HashSet
和ConcurrentHashMap
实例上的流上效果最佳;数组;int
范围;和long
范围。这些数据结构的共同点是它们都可以准确且廉价地拆分为任意大小的子范围,这使得在并行线程之间划分工作变得容易。流库用于执行此任务的抽象是 spliterator ,它由Stream
和Iterable
上的spliterator
方法返回。所有这些数据结构共有的另一个重要因素是,它们在顺序处理时提供了从良好到优秀的引用局部性:顺序元素引用一起存储在内存中。这些引用所引用的对象在内存中可能不会彼此靠近,这会减少引用的局部性。事实证明,引用局部性对于并行化批量操作至关重要:没有它,线程大部分时间都处于空闲状态,等待数据从内存传输到处理器的缓存中。具有最佳引用局部性的数据结构是原始数组,因为数据本身是连续存储在内存中的。
来源:Item #48 使用 Joshua Bloch 编写的并行有效 Java 3e 时要小心
【讨论】:
【参考方案6】:永远不要并行化有限制的无限流。这是发生了什么:
public static void main(String[] args)
// let's count to 1 in parallel
System.out.println(
IntStream.iterate(0, i -> i + 1)
.parallel()
.skip(1)
.findFirst()
.getAsInt());
结果
Exception in thread "main" java.lang.OutOfMemoryError
at ...
at java.base/java.util.stream.IntPipeline.findFirst(IntPipeline.java:528)
at InfiniteTest.main(InfiniteTest.java:24)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.stream.SpinedBuffer$OfInt.newArray(SpinedBuffer.java:750)
at ...
如果你使用.limit(...)
也一样
这里的解释: Java 8, using .parallel in a stream causes OOM error
同样,如果流是有序的并且包含比您想要处理的元素多得多的元素,例如
,请不要使用并行public static void main(String[] args)
// let's count to 1 in parallel
System.out.println(
IntStream.range(1, 1000_000_000)
.parallel()
.skip(100)
.findFirst()
.getAsInt());
这可能会运行更长的时间,因为并行线程可能会在大量数字范围而不是关键的 0-100 范围内工作,从而导致这需要很长时间。
【讨论】:
【参考方案7】:Collection.parallelStream()
是并行工作的好方法。但是您需要记住,这有效地使用了一个公共线程池,内部只有几个工作线程(默认情况下,线程数等于 cpu 核心数),请参阅ForkJoinPool.commonPool()
。如果池的某些任务是长时间运行的 I/O 密集型工作,那么其他可能很快的 parallelStream
调用将在等待空闲池线程时卡住。这显然导致要求 fork-join 任务是非阻塞和短的,或者换句话说,cpu-bound。为了更好地理解细节,我强烈建议仔细阅读java.util.concurrent.ForkJoinTask
javadoc,这里有一些相关的引用:
ForkJoinTasks 的效率源于......它们主要用作计算任务,计算纯函数或对纯孤立对象进行操作。
理想情况下,计算应避免同步方法或块,并应尽量减少其他阻塞同步
可细分任务也不应该执行阻塞 I/O
这些表明parallelStream()
任务的主要目的是对孤立的内存结构进行简短计算。也推荐查看文章Common parallel stream pitfalls
【讨论】:
以上是关于我应该尽可能使用并行流吗?的主要内容,如果未能解决你的问题,请参考以下文章
我应该在 Ant Media Server 中以 HLS 或 mp4 格式录制我的流吗?