如何设置并行集合的线程号?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何设置并行集合的线程号?相关的知识,希望对你有一定的参考价值。
我可以像这样并行运行scala的foreach:
val N = 100
(0 until N).par.foreach(i => {
// do something
})
但是如何设置线程号?我想要这样的东西:
val N = 100
val NThreads = 5
(0 until N).par.foreach(NThreads, i => {
// do something
})
答案
每个并行集合都保留一个tasksupport
对象,该对象保持对线程池实现的引用。
因此,您可以根据需要通过将tasksupport
对象的引用更改为新的线程池来设置通过该对象的并行度级别。例如:
def f(numOfThread: Int, n: Int) = {
import scala.collection.parallel._
val coll = (0 to n).par
coll.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(numOfThreads))
coll.foreach(i => {
// do something
})
}
f(2, 100)
有关配置并行集合的更多信息,请参阅http://docs.scala-lang.org/overviews/parallel-collections/configuration.html
另一答案
官方Scala文档提供了一种更改并行集合的任务支持的方法,如下所示:
import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))
还提到了这一点
默认情况下,执行上下文任务支持设置为每个并行集合,因此并行集合将重用与未来API相同的fork-join池。
这意味着您应该创建单个池并重用它。这种方法导致资源泄漏:
def calculate(collection: Seq[Int]): Seq[Int] = {
val parallel = collection.par
parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
parallel.map(_ * 2).seq
}
正确的方法是重用现有池:
val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
def calculate(collection: Seq[Int]): Seq[Int] = {
val parallel = collection.par
parallel.tasksupport = taskSupport
parallel.map(_ * 2).seq
}
以上是关于如何设置并行集合的线程号?的主要内容,如果未能解决你的问题,请参考以下文章
newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段