Parallel function to perform element wise operations on a collection in Scala

【Title】Parallel function to perform element wise operations on a collection in Scala 【Posted】2020-01-10 16:34:44 【Question】:

Using higher-order methods in Scala, I can perform element-wise operations on a given collection as follows:

def fun1(l1 :List[Double], l2 :List[Double]) :List[Double] = (l1,l2).zipped.map((x,y) => x + y)

And using an imperative approach, I can do the same thing faster than fun1:

def fun2(a1: Array[Double], a2: Array[Double]): Array[Double] = {
  val res = new Array[Double](a1.length)
  var i = 0
  while (i < a1.length) {
    res(i) = a1(i) + a2(i)
    i += 1
  }
  res
}

I want to write a parallel function that performs the same operation. Using Scala, can I parallelize either of the functions above? If not, how can I write a parallel function that achieves purely functional parallelism for element-wise operations on a collection?

【Comments】:

For the first one you can use parallel collections; for the second you can start many Futures, each responsible for some subset of the array. 【Answer 1】:
sergey$ SBT_OPTS="-Xmx2G -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xss2M  -Duser.timezone=GMT" sbt console
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=2G; support was removed in 8.0
[info] Loading global plugins from /Users/rsergey/.sbt/1.0/plugins
[info] Loading project definition from /Users/rsergey/project
[info] Set current project to rsergey (in build file:/Users/rsergey/)
[info] Starting scala interpreter...
Welcome to Scala 2.12.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231).
Type in expressions for evaluation. Or try :help.

scala> def fun1(l1 :List[Double], l2 :List[Double]) :List[Double] = (l1,l2).zipped.map((x,y) => x + y)
fun1: (l1: List[Double], l2: List[Double])List[Double]

scala> import scala.collection.parallel.immutable.ParSeq
import scala.collection.parallel.immutable.ParSeq

scala> def parFun1(l1: ParSeq[Double], l2: ParSeq[Double]) = l1.zip(l2).map { case (x, y) => x + y }
parFun1: (l1: scala.collection.parallel.immutable.ParSeq[Double], l2: scala.collection.parallel.immutable.ParSeq[Double])scala.collection.parallel.immutable.ParSeq[Double]


scala> val l1 = Range(0,5000000).map(_.toDouble).toList
l1: List[Double] = List(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0, 82.0, 83.0, 84.0, 85.0, 86.0, 87.0, 88.0, 89.0, 90.0, 91.0, 92.0, 93.0, 94.0, 95.0, 96.0, 97.0, 98.0, 99.0, 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0, 110.0, 111.0, 112.0, 113.0, 114.0, 115.0, 116.0, 117.0, 118.0, 119.0, 120.0, 121.0, 122...

scala> val l2 = Range(-5000000, 0).map(_.toDouble).toList
l2: List[Double] = List(-5000000.0, -4999999.0, -4999998.0, -4999997.0, -4999996.0, -4999995.0, -4999994.0, -4999993.0, -4999992.0, -4999991.0, -4999990.0, -4999989.0, -4999988.0, -4999987.0, -4999986.0, -4999985.0, -4999984.0, -4999983.0, -4999982.0, -4999981.0, -4999980.0, -4999979.0, -4999978.0, -4999977.0, -4999976.0, -4999975.0, -4999974.0, -4999973.0, -4999972.0, -4999971.0, -4999970.0, -4999969.0, -4999968.0, -4999967.0, -4999966.0, -4999965.0, -4999964.0, -4999963.0, -4999962.0, -4999961.0, -4999960.0, -4999959.0, -4999958.0, -4999957.0, -4999956.0, -4999955.0, -4999954.0, -4999953.0, -4999952.0, -4999951.0, -4999950.0, -4999949.0, -4999948.0, -4999947.0, -4999946.0, -4999945.0, -4999944.0, -4999943.0, -4999942.0, -4999941.0, -4999940.0, -4999939.0, -49...

scala> val l1par = l1.par
l1par: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0, 82.0, 83.0, 84.0, 85.0, 86.0, 87.0, 88.0, 89.0, 90.0, 91.0, 92.0, 93.0, 94.0, 95.0, 96.0, 97.0, 98.0, 99.0, 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0, 110.0, 111.0, 112.0, 113.0, 114.0, 115.0,...

scala> val l2par = l2.par
l2par: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(-5000000.0, -4999999.0, -4999998.0, -4999997.0, -4999996.0, -4999995.0, -4999994.0, -4999993.0, -4999992.0, -4999991.0, -4999990.0, -4999989.0, -4999988.0, -4999987.0, -4999986.0, -4999985.0, -4999984.0, -4999983.0, -4999982.0, -4999981.0, -4999980.0, -4999979.0, -4999978.0, -4999977.0, -4999976.0, -4999975.0, -4999974.0, -4999973.0, -4999972.0, -4999971.0, -4999970.0, -4999969.0, -4999968.0, -4999967.0, -4999966.0, -4999965.0, -4999964.0, -4999963.0, -4999962.0, -4999961.0, -4999960.0, -4999959.0, -4999958.0, -4999957.0, -4999956.0, -4999955.0, -4999954.0, -4999953.0, -4999952.0, -4999951.0, -4999950.0, -4999949.0, -4999948.0, -4999947.0, -4999946.0, -4999945.0, -4999944.0, -4999943.0, -4999...

scala> def time[R](block: => R): R = { val t0 = System.nanoTime(); val result = block; val t1 = System.nanoTime(); println("Elapsed time: " + (t1 - t0) + "ns"); result }
time: [R](block: => R)R

scala> time { fun1(l1, l2) }
Elapsed time: 3928108671ns
res2: List[Double] = List(-5000000.0, -4999998.0, -4999996.0, -4999994.0, -4999992.0, -4999990.0, -4999988.0, -4999986.0, -4999984.0, -4999982.0, -4999980.0, -4999978.0, -4999976.0, -4999974.0, -4999972.0, -4999970.0, -4999968.0, -4999966.0, -4999964.0, -4999962.0, -4999960.0, -4999958.0, -4999956.0, -4999954.0, -4999952.0, -4999950.0, -4999948.0, -4999946.0, -4999944.0, -4999942.0, -4999940.0, -4999938.0, -4999936.0, -4999934.0, -4999932.0, -4999930.0, -4999928.0, -4999926.0, -4999924.0, -4999922.0, -4999920.0, -4999918.0, -4999916.0, -4999914.0, -4999912.0, -4999910.0, -4999908.0, -4999906.0, -4999904.0, -4999902.0, -4999900.0, -4999898.0, -4999896.0, -4999894.0, -4999892.0, -4999890.0, -4999888.0, -4999886.0, -4999884.0, -4999882.0, -4999880.0, -4999878.0, -...


scala> time { parFun1(l1par, l2par) }
Elapsed time: 292256058ns
res5: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(-5000000.0, -4999998.0, -4999996.0, -4999994.0, -4999992.0, -4999990.0, -4999988.0, -4999986.0, -4999984.0, -4999982.0, -4999980.0, -4999978.0, -4999976.0, -4999974.0, -4999972.0, -4999970.0, -4999968.0, -4999966.0, -4999964.0, -4999962.0, -4999960.0, -4999958.0, -4999956.0, -4999954.0, -4999952.0, -4999950.0, -4999948.0, -4999946.0, -4999944.0, -4999942.0, -4999940.0, -4999938.0, -4999936.0, -4999934.0, -4999932.0, -4999930.0, -4999928.0, -4999926.0, -4999924.0, -4999922.0, -4999920.0, -4999918.0, -4999916.0, -4999914.0, -4999912.0, -4999910.0, -4999908.0, -4999906.0, -4999904.0, -4999902.0, -4999900.0, -4999898.0, -4999896.0, -4999894.0, -4999892.0, -4999890.0, -4999888.0, -4999886.0, -49998...
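
Note: the transcript above runs on Scala 2.12, where parallel collections ship with the standard library. On Scala 2.13+ they live in the separate scala-parallel-collections module; a minimal sketch of the same approach there (module version is an illustrative assumption):

// build.sbt: libraryDependencies += "org.scala-lang.modules" %% "scala-parallel-collections" % "1.0.4"

import scala.collection.parallel.CollectionConverters._ // adds .par to standard collections on 2.13+
import scala.collection.parallel.immutable.ParSeq

def parFun1(l1: ParSeq[Double], l2: ParSeq[Double]): ParSeq[Double] =
  l1.zip(l2).map { case (x, y) => x + y }

// usage with the lists defined above
val sums: ParSeq[Double] = parFun1(l1.par, l2.par)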

【Discussion】:

Why did you use zip instead of zipped? Because zipped is faster than zip. @user12140540 I didn't know that zipped was available on tuples of ParVector. Could you share a code snippet that demonstrates this? 【Answer 2】:

For parallelism to pay off, the per-element workload should be sufficiently heavyweight. Below is a benchmark of a few alternatives based on sequential collections, parallel collections, and Futures. We compare a lightweight operation (adding two numbers) against the case where a heavy operation is simulated with Thread.sleep(1):

- sequentialArray: processes the Array sequentially (based on Travis)
- futureArray: logically splits the Array into chunks and processes each chunk in a separate Future
- parallelArray: processes the Array with parallel collections (based on Luis, axel22)
- parallelListZip: uses parallel collections with zip and processes a List (based on axel22)

Implementation:

package bench

import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
// On Scala 2.13+ the .par calls below additionally need:
// import scala.collection.parallel.CollectionConverters._

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
class So59685582 {
  // Placeholders, set per test run (see Test 1-3 below)
  val simulateHeavyWorkload: Boolean = ???
  val length: Int = ???
  val as = Array.fill(length)(math.random)
  val bs = Array.fill(length)(math.random)

  // Sequential baseline: a plain while loop over the arrays
  def sequentialArray(as: Array[Double], bs: Array[Double]): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    var i = 0
    while (i < length) {
      if (simulateHeavyWorkload) Thread.sleep(1)
      out(i) = as(i) * bs(i)
      i += 1
    }
    out
  }

  // Splits the index range into chunks and processes each chunk in its own Future
  def futureArray(as: Array[Double], bs: Array[Double], numThreads: Int): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    val chunkSize = length / numThreads
    val fs =
      (0 until numThreads).map { t =>
        var i = t * chunkSize
        val to = (t + 1) * chunkSize
        Future {
          while (i <= to) {
            if (simulateHeavyWorkload) Thread.sleep(1)
            out(i) = as(i) * bs(i)
            i += 1
          }
        }
      }
    Await.ready(Future.sequence(fs), Duration.Inf)
    out
  }

  // Parallel collections over the index range, writing into a preallocated array
  def parallelArray(as: Array[Double], bs: Array[Double]): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    (0 until length).par.foreach { i =>
      if (simulateHeavyWorkload) Thread.sleep(1)
      out(i) = as(i) + bs(i)
    }
    out
  }

  // Parallel collections with zip/map over immutable Lists
  def parallelListZip(as: List[Double], bs: List[Double]): List[Double] = {
    as.par.zip(bs.par).map { case (a, b) =>
      if (simulateHeavyWorkload) Thread.sleep(1)
      a + b
    }.to(List)
  }

  @Benchmark def _sequentialArray: Array[Double] = sequentialArray(as, bs)
  @Benchmark def _futureArray: Array[Double] = futureArray(as, bs, numThreads = 12)
  @Benchmark def _parallelArray: Array[Double] = parallelArray(as, bs)
  @Benchmark def _parallelListZip: List[Double] = parallelListZip(as.toList, bs.toList)
}


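Running the benchmark above assumes the sbt-jmh plugin is wired into the build; a minimal sketch of that setup (plugin version and file layout are illustrative assumptions, not part of the original answer):

// project/plugins.sbt
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.7")

// build.sbt
enablePlugins(JmhPlugin)
// the benchmark class above would live at src/main/scala/bench/So59685582.scala
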
sbt "jmh:run -i 10 -wi 10 -f 2 -t 1 bench.So59685582"的结果:

Test 1

val simulateHeavyWorkload = true
val length = 1000

[info] Benchmark                     Mode  Cnt  Score   Error  Units
[info] So59685582._futureArray      thrpt   20  9.251 ± 0.034  ops/s
[info] So59685582._parallelArray    thrpt   20  6.493 ± 0.175  ops/s
[info] So59685582._parallelListZip  thrpt   20  6.379 ± 0.117  ops/s
[info] So59685582._sequentialArray  thrpt   20  0.790 ± 0.007  ops/s

Test 2

val simulateHeavyWorkload = false
val length = 1000

[info] So59685582._futureArray      thrpt   20    27097.347 ±   369.995  ops/s
[info] So59685582._parallelArray    thrpt   20    17864.004 ±   163.846  ops/s
[info] So59685582._parallelListZip  thrpt   20     2942.416 ±   108.180  ops/s
[info] So59685582._sequentialArray  thrpt   20  1773303.066 ± 55856.225  ops/s

Test 3

val simulateHeavyWorkload = false
val length = 10000000

[info] Benchmark                     Mode  Cnt   Score   Error  Units
[info] So59685582._futureArray      thrpt   20  50.271 ± 1.444  ops/s
[info] So59685582._parallelArray    thrpt   20  53.998 ± 1.397  ops/s
[info] So59685582._parallelListZip  thrpt   20   0.167 ± 0.040  ops/s
[info] So59685582._sequentialArray  thrpt   20  55.183 ± 1.025  ops/s

Findings

- When the operation is lightweight, sequential processing performs better than, or roughly on par with, parallel processing, even at a large size of 10,000,000 elements.
- When the size is small (1000), parallel processing is orders of magnitude slower than sequential for lightweight operations.
- When the operation is heavyweight, parallel processing performs significantly better than sequential execution.
- futureArray performs best with 12 threads, which is the number of cores on my machine according to availableProcessors (queried as in the snippet below); going beyond that degrades performance.
- parallelListZip, which uses List with zip and map, performs similarly to parallelArray, which uses mutability and a while loop, when the operation is heavyweight and the size is not too large (1000).
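
For reference, the core count used as numThreads for futureArray can be queried directly; a minimal snippet:

// Number of cores reported by the JVM; the benchmark above uses this value (12) as numThreads
val cores: Int = Runtime.getRuntime.availableProcessors()
println(s"availableProcessors = $cores")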

【Discussion】:

I tried different approaches and different collections, including parallel ones, and I also noticed that for smaller collections the sequential version is faster than the parallel one. You have proven this with benchmarks as well.
In my scenario I have two arrays on which I must perform an element-wise sum over 1 million iterations, and I found that the parallel version is not well suited to this problem.
@user12140540 That is probably because an element-wise sum is too lightweight an operation.
