如何设置并行集合的线程号?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何设置并行集合的线程号?相关的知识,希望对你有一定的参考价值。

我可以像这样并行运行scala的foreach:

val N = 100
(0 until N).par.foreach(i => {
   // do something
})

但是如何设置线程号?我想要这样的东西:

val N = 100
val NThreads = 5
(0 until N).par.foreach(NThreads, i => {
   // do something
})
答案

每个并行集合都保留一个tasksupport对象,该对象保持对线程池实现的引用。

因此,您可以根据需要通过将tasksupport对象的引用更改为新的线程池来设置通过该对象的并行度级别。例如:

def f(numOfThread: Int, n: Int) = {
 import scala.collection.parallel._
 val coll = (0 to n).par
 coll.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(numOfThreads))
  coll.foreach(i => {
   // do something
  })
}

f(2, 100)

有关配置并行集合的更多信息,请参阅http://docs.scala-lang.org/overviews/parallel-collections/configuration.html

另一答案

官方Scala文档提供了一种更改并行集合的任务支持的方法,如下所示:

import scala.collection.parallel._
val pc = mutable.ParArray(1, 2, 3)
pc.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(2))

还提到了这一点

默认情况下,执行上下文任务支持设置为每个并行集合,因此并行集合将重用与未来API相同的fork-join池。

这意味着您应该创建单个池并重用它。这种方法导致资源泄漏:

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
  parallel.map(_ * 2).seq
} 

正确的方法是重用现有池:

val taskSupport = new ForkJoinTaskSupport(new ForkJoinPool(5))

def calculate(collection: Seq[Int]): Seq[Int] = {
  val parallel = collection.par
  parallel.tasksupport = taskSupport
  parallel.map(_ * 2).seq
}

以上是关于如何设置并行集合的线程号?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 python 中并行化以下代码片段?

Spark 中的并行集合

使用并行线程填充集合有啥危险吗?

多线程 Thread 线程同步 synchronized

C#并行编程-并发集合

newCacheThreadPool()newFixedThreadPool()newScheduledThreadPool()newSingleThreadExecutor()自定义线程池(代码片段