Parallel function to perform element-wise operations on a collection in Scala
Posted: 2020-01-10 16:34:44
Question: Using higher-order methods in Scala, I can perform element-wise operations on a given collection as follows:
def fun1(l1 :List[Double], l2 :List[Double]) :List[Double] = (l1,l2).zipped.map((x,y) => x + y)
Using an imperative style, I can perform the same operation faster than fun1:
def fun2(a1: Array[Double], a2: Array[Double]): Array[Double] = {
  val res = new Array[Double](a1.length)
  var i = 0
  while (i < a1.length) {
    res(i) = a1(i) + a2(i)
    i += 1
  }
  res
}
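I want to write a parallel function that performs the same operation. Can I parallelize either of the functions above in Scala? If not, how can I write a parallel function that achieves purely functional parallelism for element-wise operations on collections?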
Comments:
For the first one you can use parallel collections; for the second one you can start many Futures, each responsible for a subset of the array.
Answer 1:
sergey$ SBT_OPTS="-Xmx2G -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=2G -Xss2M -Duser.timezone=GMT" sbt console
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=2G; support was removed in 8.0
[info] Loading global plugins from /Users/rsergey/.sbt/1.0/plugins
[info] Loading project definition from /Users/rsergey/project
[info] Set current project to rsergey (in build file:/Users/rsergey/)
[info] Starting scala interpreter...
Welcome to Scala 2.12.7 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_231).
Type in expressions for evaluation. Or try :help.
scala> def fun1(l1 :List[Double], l2 :List[Double]) :List[Double] = (l1,l2).zipped.map((x,y) => x + y)
fun1: (l1: List[Double], l2: List[Double])List[Double]
scala> import scala.collection.parallel.immutable.ParSeq
import scala.collection.parallel.immutable.ParSeq
scala> def parFun1(l1: ParSeq[Double], l2: ParSeq[Double]) = l1.zip(l2).map { case (x, y) => x + y }
parFun1: (l1: scala.collection.parallel.immutable.ParSeq[Double], l2: scala.collection.parallel.immutable.ParSeq[Double])scala.collection.parallel.immutable.ParSeq[Double]
scala> val l1 = Range(0,5000000).map(_.toDouble).toList
l1: List[Double] = List(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0, 82.0, 83.0, 84.0, 85.0, 86.0, 87.0, 88.0, 89.0, 90.0, 91.0, 92.0, 93.0, 94.0, 95.0, 96.0, 97.0, 98.0, 99.0, 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0, 110.0, 111.0, 112.0, 113.0, 114.0, 115.0, 116.0, 117.0, 118.0, 119.0, 120.0, 121.0, 122...
scala> val l2 = Range(-5000000, 0).map(_.toDouble).toList
l2: List[Double] = List(-5000000.0, -4999999.0, -4999998.0, -4999997.0, -4999996.0, -4999995.0, -4999994.0, -4999993.0, -4999992.0, -4999991.0, -4999990.0, -4999989.0, -4999988.0, -4999987.0, -4999986.0, -4999985.0, -4999984.0, -4999983.0, -4999982.0, -4999981.0, -4999980.0, -4999979.0, -4999978.0, -4999977.0, -4999976.0, -4999975.0, -4999974.0, -4999973.0, -4999972.0, -4999971.0, -4999970.0, -4999969.0, -4999968.0, -4999967.0, -4999966.0, -4999965.0, -4999964.0, -4999963.0, -4999962.0, -4999961.0, -4999960.0, -4999959.0, -4999958.0, -4999957.0, -4999956.0, -4999955.0, -4999954.0, -4999953.0, -4999952.0, -4999951.0, -4999950.0, -4999949.0, -4999948.0, -4999947.0, -4999946.0, -4999945.0, -4999944.0, -4999943.0, -4999942.0, -4999941.0, -4999940.0, -4999939.0, -49...
scala> val l1par = l1.par
l1par: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, 40.0, 41.0, 42.0, 43.0, 44.0, 45.0, 46.0, 47.0, 48.0, 49.0, 50.0, 51.0, 52.0, 53.0, 54.0, 55.0, 56.0, 57.0, 58.0, 59.0, 60.0, 61.0, 62.0, 63.0, 64.0, 65.0, 66.0, 67.0, 68.0, 69.0, 70.0, 71.0, 72.0, 73.0, 74.0, 75.0, 76.0, 77.0, 78.0, 79.0, 80.0, 81.0, 82.0, 83.0, 84.0, 85.0, 86.0, 87.0, 88.0, 89.0, 90.0, 91.0, 92.0, 93.0, 94.0, 95.0, 96.0, 97.0, 98.0, 99.0, 100.0, 101.0, 102.0, 103.0, 104.0, 105.0, 106.0, 107.0, 108.0, 109.0, 110.0, 111.0, 112.0, 113.0, 114.0, 115.0,...
scala> val l2par = l2.par
l2par: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(-5000000.0, -4999999.0, -4999998.0, -4999997.0, -4999996.0, -4999995.0, -4999994.0, -4999993.0, -4999992.0, -4999991.0, -4999990.0, -4999989.0, -4999988.0, -4999987.0, -4999986.0, -4999985.0, -4999984.0, -4999983.0, -4999982.0, -4999981.0, -4999980.0, -4999979.0, -4999978.0, -4999977.0, -4999976.0, -4999975.0, -4999974.0, -4999973.0, -4999972.0, -4999971.0, -4999970.0, -4999969.0, -4999968.0, -4999967.0, -4999966.0, -4999965.0, -4999964.0, -4999963.0, -4999962.0, -4999961.0, -4999960.0, -4999959.0, -4999958.0, -4999957.0, -4999956.0, -4999955.0, -4999954.0, -4999953.0, -4999952.0, -4999951.0, -4999950.0, -4999949.0, -4999948.0, -4999947.0, -4999946.0, -4999945.0, -4999944.0, -4999943.0, -4999...
scala> def time[R](block: => R): R = { val t0 = System.nanoTime(); val result = block; val t1 = System.nanoTime(); println("Elapsed time: " + (t1 - t0) + "ns"); result }
time: [R](block: => R)R
scala> time { fun1(l1, l2) }
Elapsed time: 3928108671ns
res2: List[Double] = List(-5000000.0, -4999998.0, -4999996.0, -4999994.0, -4999992.0, -4999990.0, -4999988.0, -4999986.0, -4999984.0, -4999982.0, -4999980.0, -4999978.0, -4999976.0, -4999974.0, -4999972.0, -4999970.0, -4999968.0, -4999966.0, -4999964.0, -4999962.0, -4999960.0, -4999958.0, -4999956.0, -4999954.0, -4999952.0, -4999950.0, -4999948.0, -4999946.0, -4999944.0, -4999942.0, -4999940.0, -4999938.0, -4999936.0, -4999934.0, -4999932.0, -4999930.0, -4999928.0, -4999926.0, -4999924.0, -4999922.0, -4999920.0, -4999918.0, -4999916.0, -4999914.0, -4999912.0, -4999910.0, -4999908.0, -4999906.0, -4999904.0, -4999902.0, -4999900.0, -4999898.0, -4999896.0, -4999894.0, -4999892.0, -4999890.0, -4999888.0, -4999886.0, -4999884.0, -4999882.0, -4999880.0, -4999878.0, -...
scala> time { parFun1(l1par, l2par) }
Elapsed time: 292256058ns
res5: scala.collection.parallel.immutable.ParSeq[Double] = ParVector(-5000000.0, -4999998.0, -4999996.0, -4999994.0, -4999992.0, -4999990.0, -4999988.0, -4999986.0, -4999984.0, -4999982.0, -4999980.0, -4999978.0, -4999976.0, -4999974.0, -4999972.0, -4999970.0, -4999968.0, -4999966.0, -4999964.0, -4999962.0, -4999960.0, -4999958.0, -4999956.0, -4999954.0, -4999952.0, -4999950.0, -4999948.0, -4999946.0, -4999944.0, -4999942.0, -4999940.0, -4999938.0, -4999936.0, -4999934.0, -4999932.0, -4999930.0, -4999928.0, -4999926.0, -4999924.0, -4999922.0, -4999920.0, -4999918.0, -4999916.0, -4999914.0, -4999912.0, -4999910.0, -4999908.0, -4999906.0, -4999904.0, -4999902.0, -4999900.0, -4999898.0, -4999896.0, -4999894.0, -4999892.0, -4999890.0, -4999888.0, -4999886.0, -49998...
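parFun1 above zips the two sequences into a collection of (x, y) tuples and then maps over it, so an intermediate collection is built first. For sequential collections, Scala 2.12's (l1, l2).zipped.map (as in fun1) and Scala 2.13's l1.lazyZip(l2).map avoid materializing that intermediate collection. The sketch below, assuming only the standard library so it compiles on both 2.12 and 2.13, contrasts zip + map with an iterator-based zip on Lists; the object ZipVsIterator and its method names are hypothetical.

```scala
object ZipVsIterator {
  // zip + map: first builds an intermediate List[(Double, Double)], then maps over it.
  def viaZip(l1: List[Double], l2: List[Double]): List[Double] =
    l1.zip(l2).map { case (x, y) => x + y }

  // Iterator-based: pairs are produced and consumed one at a time, so no
  // intermediate list of tuples is materialized (one tuple per pair is still allocated).
  def viaIterator(l1: List[Double], l2: List[Double]): List[Double] =
    l1.iterator.zip(l2.iterator).map { case (x, y) => x + y }.toList

  def main(args: Array[String]): Unit = {
    val l1 = List(1.0, 2.0, 3.0)
    val l2 = List(10.0, 20.0, 30.0)
    println(viaZip(l1, l2))      // List(11.0, 22.0, 33.0)
    println(viaIterator(l1, l2)) // List(11.0, 22.0, 33.0)
  }
}
```

Both versions are sequential; this only illustrates where the extra allocation in zip + map comes from, not a parallel speedup.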
Comments:
Why do you use zip instead of zipped? zipped is faster than zip.
@user12140540 I didn't know that zipped is available for a tuple of ParVectors. Can you share a code snippet that demonstrates this?
Answer 2:
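For parallelism to pay off, the per-element workload has to be heavyweight enough. Below are benchmarks of a few alternatives based on sequential collections, parallel collections, and Futures. We compare a lightweight operation (adding two numbers) with a heavy operation simulated using Thread.sleep(1):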
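sequentialArray: processes an Array sequentially (based on Travis)
futureArray: logically splits the Array into chunks and processes each chunk in a separate Future
parallelArray: processes an Array using parallel collections (based on Luis, axel22)
parallelListZip: processes a List using a parallel-collection zip (based on axel22)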
Implementation:
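package bench

import org.openjdk.jmh.annotations._
// Scala 2.13: .par requires the scala-parallel-collections module and this import
import scala.collection.parallel.CollectionConverters._
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.Throughput))
class So59685582 {
  // Placeholders: set these to the values listed under each test below
  val simulateHeavyWorkload: Boolean = ???
  val length: Int = ???
  val as: Array[Double] = Array.fill(length)(math.random())
  val bs: Array[Double] = Array.fill(length)(math.random())

  // Single-threaded while loop over the arrays
  def sequentialArray(as: Array[Double], bs: Array[Double]): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    var i = 0
    while (i < length) {
      if (simulateHeavyWorkload) Thread.sleep(1)
      out(i) = as(i) + bs(i)
      i += 1
    }
    out
  }

  // Split the indices into numThreads contiguous chunks, one Future per chunk
  def futureArray(as: Array[Double], bs: Array[Double], numThreads: Int): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    val chunkSize = length / numThreads
    val fs =
      (0 until numThreads).map { t =>
        val from = t * chunkSize
        // The last chunk also takes the remainder, so no element is skipped
        val to = if (t == numThreads - 1) length else (t + 1) * chunkSize
        Future {
          var i = from
          while (i < to) { // exclusive bound: chunks do not overlap
            if (simulateHeavyWorkload) Thread.sleep(1)
            out(i) = as(i) + bs(i)
            i += 1
          }
        }
      }
    Await.result(Future.sequence(fs), Duration.Inf) // rethrows any failure in a chunk
    out
  }

  // Parallel foreach over the index range, writing into a preallocated array
  def parallelArray(as: Array[Double], bs: Array[Double]): Array[Double] = {
    val length = as.length
    val out = new Array[Double](length)
    (0 until length).par.foreach { i =>
      if (simulateHeavyWorkload) Thread.sleep(1)
      out(i) = as(i) + bs(i)
    }
    out
  }

  // Parallel zip + map on Lists, converted back to a List
  def parallelListZip(as: List[Double], bs: List[Double]): List[Double] =
    as.par.zip(bs.par).map { case (a, b) =>
      if (simulateHeavyWorkload) Thread.sleep(1)
      a + b
    }.to(List)

  @Benchmark def _sequentialArray: Array[Double] = sequentialArray(as, bs)
  @Benchmark def _futureArray: Array[Double] = futureArray(as, bs, numThreads = 12)
  @Benchmark def _parallelArray: Array[Double] = parallelArray(as, bs)
  // Note: the toList conversions are included in the measured time
  @Benchmark def _parallelListZip: List[Double] = parallelListZip(as.toList, bs.toList)
}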
Results of running:
sbt "jmh:run -i 10 -wi 10 -f 2 -t 1 bench.So59685582"
Test 1
val simulateHeavyWorkload = true
val length = 1000
[info] Benchmark Mode Cnt Score Error Units
[info] So59685582._futureArray thrpt 20 9.251 ± 0.034 ops/s
[info] So59685582._parallelArray thrpt 20 6.493 ± 0.175 ops/s
[info] So59685582._parallelListZip thrpt 20 6.379 ± 0.117 ops/s
[info] So59685582._sequentialArray thrpt 20 0.790 ± 0.007 ops/s
Test 2
val simulateHeavyWorkload = false
val length = 1000
[info] So59685582._futureArray thrpt 20 27097.347 ± 369.995 ops/s
[info] So59685582._parallelArray thrpt 20 17864.004 ± 163.846 ops/s
[info] So59685582._parallelListZip thrpt 20 2942.416 ± 108.180 ops/s
[info] So59685582._sequentialArray thrpt 20 1773303.066 ± 55856.225 ops/s
Test 3
val simulateHeavyWorkload = false
val length = 10000000
[info] Benchmark Mode Cnt Score Error Units
[info] So59685582._futureArray thrpt 20 50.271 ± 1.444 ops/s
[info] So59685582._parallelArray thrpt 20 53.998 ± 1.397 ops/s
[info] So59685582._parallelListZip thrpt 20 0.167 ± 0.040 ops/s
[info] So59685582._sequentialArray thrpt 20 55.183 ± 1.025 ops/s
Findings
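When the operation is lightweight, sequential processing performs better than or roughly equal to parallel processing, even at the large size of 10000000 elements.
When the size is small (1000) and the operation is lightweight, parallel processing is dramatically slower: in Test 2, roughly 65x slower for futureArray and about 600x slower for parallelListZip than sequentialArray.
When the operation is heavyweight, parallel processing beats sequential processing (about 12x for futureArray in Test 1).
futureArray performs best with 12 threads, which is the number of cores on my machine according to availableProcessors. Going beyond this degrades performance.
parallelListZip, which uses List, zip, and map, performs similarly to parallelArray, which writes into a mutable array, when the operation is heavyweight and the size is modest (1000).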
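Since 12 threads matched the core count reported by availableProcessors, one option is to derive the chunk count from Runtime.getRuntime.availableProcessors() instead of hard-coding it. The sketch below is a hypothetical, standard-library-only helper (the names ParZip and parZipWith are illustrative, not from the answer) that applies an arbitrary element-wise function to two equal-length arrays with one Future per chunk; it is a sketch under these assumptions, not a tuned implementation.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ParZip {
  // Hypothetical helper: element-wise f over two equal-length arrays,
  // split into one contiguous chunk per available core.
  def parZipWith(as: Array[Double], bs: Array[Double])(f: (Double, Double) => Double): Array[Double] = {
    require(as.length == bs.length, "arrays must have the same length")
    val length = as.length
    val out = new Array[Double](length)
    // Never create more chunks than elements, and always at least one.
    val numChunks = math.max(1, math.min(Runtime.getRuntime.availableProcessors(), length))
    val chunkSize = length / numChunks
    val fs = (0 until numChunks).map { t =>
      val from = t * chunkSize
      // The last chunk also covers the remainder of length / numChunks.
      val to = if (t == numChunks - 1) length else from + chunkSize
      Future {
        var i = from
        while (i < to) {
          out(i) = f(as(i), bs(i))
          i += 1
        }
      }
    }
    // Await.result rethrows any exception raised inside a chunk.
    Await.result(Future.sequence(fs), Duration.Inf)
    out
  }

  def main(args: Array[String]): Unit = {
    val as = Array.tabulate(10)(_.toDouble)
    val bs = Array.fill(10)(1.0)
    println(parZipWith(as, bs)(_ + _).mkString(", ")) // 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0
  }
}
```

Per the findings above, this only pays off when f is expensive relative to the cost of scheduling the Futures; for a plain addition on small arrays the sequential loop is expected to win.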
Comments:
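I tried different approaches and different collections, including parallel ones, and also noticed that for smaller collections the sequential version is faster than the parallel one. Your benchmarks show that as well. In my scenario I have two arrays and must compute their element-wise sum for 1 million iterations, and I found that the parallel version is not a good fit for this problem.
@user12140540 That is probably because element-wise summation is too lightweight an operation.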