并行 Repa 代码不会产生火花

Posted

技术标签:

【中文标题】并行 Repa 代码不会产生火花【英文标题】:Parallel Repa code doesn't create sparks 【发布时间】:2013-04-19 04:52:05 【问题描述】:

我正在编写代码来做一个子集产品:它需要一个元素列表和一个指标变量列表(长度相同)。产品是在树中计算的,这对我们的应用程序至关重要。每个产品都很昂贵,所以我的目标是并行计算树的每个级别,按顺序评估连续的级别。因此不存在任何嵌套并行。

我在 ONE 函数中只有 repa 代码,靠近我的整体代码的顶层。请注意,subsetProd 不是 monadic。

步骤:

    将列表分块成对(无并行性) 压缩分块列表(无并行性) 在这个列表上映射乘积函数(使用 Repa 映射),创建一个延迟数组 调用 computeP 并行评估地图 将 Repa 结果转换回列表 进行递归调用(在输入大小一半的列表上)

代码:

-# LANGUAGE TypeOperators, FlexibleContexts, BangPatterns #-

import System.Random
import System.Environment (getArgs)
import Control.Monad.State
import Control.Monad.Identity (runIdentity)

import Data.Array.Repa as Repa
import Data.Array.Repa.Eval as Eval
import Data.Array.Repa.Repr.Vector

force :: (Shape sh) => Array D sh e -> Array V sh e
force = runIdentity . computeP

chunk :: [a] -> [(a,a)]
chunk [] = []
chunk (x1:x2:xs) = (x1,x2):(chunk xs)

slow_fib :: Int -> Integer
slow_fib 0 = 0
slow_fib 1 = 1
slow_fib n = slow_fib (n-2) + slow_fib (n-1) 

testSubsetProd :: Int -> Int -> IO ()
testSubsetProd size seed = do
    let work = do
            !flags <- replicateM size (state random)
            !values <- replicateM size (state $ randomR (1,10))
            return $ subsetProd values flags
        value = evalState work (mkStdGen seed)
    print value

subsetProd :: [Int] -> [Bool] -> Int
subsetProd [!x] _ = x
subsetProd !vals !flags = 
    let len = (length vals) `div` 2
        !valpairs = Eval.fromList (Z :. len) $ chunk vals :: (Array V (Z :. Int) (Int, Int))
        !flagpairs = Eval.fromList (Z :. len) $ chunk flags :: (Array V (Z :. Int) (Bool, Bool))
        !prods = force $ Repa.zipWith mul valpairs flagpairs
        mul (!v0,!v1) (!f0,!f1)
            | (not f0) && (not f1) = 1
            | (not f0) = v0+1
            | (not f1) = v1+1
            | otherwise = fromInteger $ slow_fib ((v0*v1) `mod` 35)
    in subsetProd (toList prods) (Prelude.map (uncurry (||)) (toList flagpairs))

main :: IO ()
main = do
  args <- getArgs
  let [numleaves, seed] = Prelude.map read args :: [Int]
  testSubsetProd numleaves seed

整个程序编译用

ghc -Odph -rtsopts -threaded -fno-liberate-case -funfolding-use-threshold1000 -funfolding-keeness-factor1000 -fllvm -optlo-O3

根据these instructions,在 GHC 7.6.2 x64 上。

我使用

运行我的程序(子集)
$> time ./Test 4096 4 +RTS -sstderr -N4

8 秒后:

672,725,819,784 bytes allocated in the heap
 11,312,267,200 bytes copied during GC
   866,787,872 bytes maximum residency (49 sample(s))
   433,225,376 bytes maximum slop
        2360 MB total memory in use (0 MB lost due to fragmentation)

                                Tot time (elapsed)  Avg pause  Max pause


  Gen  0     1284212 colls, 1284212 par   174.17s   53.20s     0.0000s    0.0116s
  Gen  1        49 colls,    48 par   13.76s    4.63s     0.0946s    0.6412s

  Parallel GC work balance: 16.88% (serial 0%, perfect 100%)

  TASKS: 6 (1 bound, 5 peak workers (5 total), using -N4)

  SPARKS: 0 (0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

  INIT    time    0.00s  (  0.00s elapsed)
  MUT     time  497.80s  (448.38s elapsed)
  GC      time  187.93s  ( 57.84s elapsed)
  EXIT    time    0.00s  (  0.00s elapsed)
  Total   time  685.73s  (506.21s elapsed)

  Alloc rate    1,351,400,138 bytes per MUT second

  Productivity  72.6% of total user, 98.3% of total elapsed

gc_alloc_block_sync: 8670031
whitehole_spin: 0
gen[0].sync: 0
gen[1].sync: 571398

当我增加 -N 参数时,我的代码确实变慢了(-N1 为 7.628 秒,-N2 为 7.891 秒,-N4 为 8.659 秒)但我创建了 0 个火花,这似乎是主要嫌疑人至于为什么我没有得到任何并行性。此外,使用大量优化进行编译有助于运行时,但对并行性没有帮助。

Threadscope 确认没有对三个 HEC 进行任何认真的工作,但垃圾收集器似乎正在使用所有 4 个 HEC。

那么,为什么 Repa 没有产生任何火花呢?我的产品树有 64 个叶子,所以即使 Repa 为 每个 内部节点生成了火花,也应该有大约 63 个火花。我觉得这可能与我使用封装并行性的 ST monad 有关,尽管我不太确定为什么这会导致问题。也许火花只能在 IO monad 中创建?

如果是这种情况,是否有人知道我如何执行这个树产品,其中每个级别都并行完成(不会导致嵌套并行,这对我的任务来说似乎是不必要的)。一般来说,也许有更好的方法来并行化树产品或更好地利用 Repa。

解释为什么当我增加 -N 参数时运行时间会增加,即使没有创建火花也是如此。

编辑 我将上面的代码示例更改为我的问题的编译示例。程序流程几乎与我的真实代码完美匹配:我随机选择一些输入,然后对它们进行子集乘积。我现在使用identity monad。我已经尝试对我的代码进行许多小的更改:是否内联,是否使用爆炸模式,使用两个 Repa 列表和一个 Repa zipWith 与按顺序压缩列表并使用 Repa 映射等的变化,等等,这些都没有帮助。

即使我在示例代码中遇到this 问题,我的实际程序也要大得多

【问题讨论】:

g 定义在哪里? 要对性能问题进行更详细的分析,拥有一个稍微精简但可编译的代码版本确实会有所帮助。一些小评论:我不确定是否值得首先建立一个清单数组pairs。那也可以推迟。为什么使用长度为2 的列表而不是对?结果数组可以取消装箱,因为它包含Int。您应该尝试生成事件日志并运行线程范围,以查看程序的某些阶段是否发生并行。 哦,我现在才看到subsetProd 是递归的。您确定要将数组转换为列表,只是为了在每个步骤中重新计算一个数组吗? @JohnL 这只是一个更大程序的一个 sn-p。上面的代码中有几个“外部”调用,但它们都是纯的、顺序的、(昂贵的)函数。 @kosmikus Manifest 数组不应该被延迟,对吧?我认为这就是重点。我使用列表是因为有一个很好的函数(在 Data.List.Split 中)可以为我进行分块。正如我在问题中所说,在执行的最后一毫秒之前,Threadscope 显示程序代码之间的并行度绝对为零。 GC 在所有四个线程上运行。我可以做一个小例子,做一个整数子集产品之类的,但恐怕它太快了,无法实现任何并行化。 【参考方案1】:

为什么没有并行性?

没有并行性的主要原因(至少对于您现在简化和工作的程序)是您在V 表示的数组上使用computeP,并且法线向量的元素并不严格类型。所以你实际上并没有并行地做任何真正的工作。最简单的解决方法是使用未装箱的 U 数组作为结果,方法是将 force 更改为以下定义:

force :: (Shape sh, Unbox e) => Array D sh e -> Array U sh e
force a = runIdentity (computeP a) 

我确实记得在您的原始代码中,您声称您正在使用一种未拆箱的复杂数据类型。但真的不可能做到吗?也许您可以将您实际需要的数据提取到一些不可装箱的表示中?或者使该类型成为Unbox 类的实例?如果没有,那么您还可以使用适用于V-array 的force 的以下变体:

import Control.DeepSeq (NFData(..))

...

force :: (Shape sh, NFData e) => Array D sh e -> Array V sh e
force a = runIdentity $ do
  r  <- computeP a
  !b <- computeUnboxedP (Repa.map rnf r)
  return r

这里的想法是我们首先计算V-array 结构,然后我们通过将rnf 映射到数组上来计算() 类型的U-array。结果数组是无趣的,但是V-array 的每个元素都将在进程中强制执行1

这些更改中的任何一个都将4096 的问题大小的运行时间从 ~9 减少到 ~3 秒,而 -N4 在我的机器上运行。

另外,我觉得每一步都在列表和数组之间进行转换很奇怪。为什么不让subsetProd 取两个数组呢?此外,至少对于值而言,似乎没有必要使用中间的 V 数组作为对,您也可以使用 D 数组。但在我的实验中,这些更改并没有对运行时产生显着的有益影响。

为什么没有火花?

Repa 从不产生火花。 Haskell 有许多不同的并行方法,而 spark 是一种在运行时系统中具有特殊支持的特殊机制。但是,只有一些库,例如parallel 包和monad-par 包的一个特定调度程序,实际上使用了该机制。然而,Repa 没有。它在内部使用forkIO,即线程,但向外部提供纯接口。所以没有火花本身就没什么好担心的。


1. 我本来不知道怎么做,所以我问了 Repa 的作者 Ben Lippmeier。非常感谢 Ben 指出映射rnf 以生成不同数组的选项,以及()Unbox 实例对我来说。

【讨论】:

我对使用未装箱数组使并行性起作用感到非常震惊。我肯定会付出更多努力来制作我的类型 Unbox,但这可能会很困难。至于为什么我一开始不使用数组,请参阅我对原始问题的评论,即无法像使用列表一样使用数组。感谢您的帮助! @Eric 对数组进行分块非常简单。例如,您可以像这样创建一个延迟数组:fromFunction (Z :. len) $ \ (Z :. i) -&gt; (vals ! (Z :. 2 * i), vals ! (Z :. 2 * i + 1)). @Eric 我再次编辑了答案,因为 Ben Lippmeier 向我解释了如何使它适用于 V-arrays,如果需要的话。 感谢您的更新。我确实从您的建议中得到了一些并行性,但是并行策略(为每个内部节点创造火花)轻松击败了 Repa。

以上是关于并行 Repa 代码不会产生火花的主要内容,如果未能解决你的问题,请参考以下文章

如何在并行火花中运行转换

如何在haskell快速傅里叶变换中应用数据并行?

Spark Streaming:微批处理并行执行

如何在 AWS Glue PySpark 中运行并行线程?

为啥 OpenMP 不并行化 vtk IntersectWithLine 代码

分库的数据用java怎么并行查询