为啥我使用 iteratee IO 的 Mapreduce 实现(真实世界的 Haskell)也因“打开的文件太多”而失败
Posted
技术标签:
【中文标题】为啥我使用 iteratee IO 的 Mapreduce 实现(真实世界的 Haskell)也因“打开的文件太多”而失败【英文标题】:Why does my Mapreduce implementation (real world haskell) using iteratee IO also fails with "Too many open files"为什么我使用 iteratee IO 的 Mapreduce 实现(真实世界的 Haskell)也因“打开的文件太多”而失败 【发布时间】:2011-05-02 11:14:06 【问题描述】:我正在实现一个 haskell 程序,它将文件的每一行与文件中的每一行进行比较。可以按如下方式实现单线程
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
这将在 O(n^2) 时间内运行,并且必须始终将完整的整数列表保存在内存中。在我的实际程序中,该行包含更多的数字,我从中构造了一个比 Int 稍微复杂的数据类型。这让我必须处理的数据出现内存不足错误。
所以上面提到的单线程解决方案有两点改进。一是加快实际运行时间。其次,想办法不将整个列表一直保存在内存中。我知道这需要解析完整的文件 n 次。因此将进行 O(n^2) 比较,并解析 O(n^2) 行。这对我来说没问题,因为我宁愿有一个缓慢的成功程序而不是失败的程序。当输入文件足够小时,我总是可以使用更简单的版本。
为了使用多个 cpu 内核,我从 Real World Haskell 中提取了 Mapreduce 实现(第 24 章,可用 here)。
我修改了书中的分块功能,而不是将整个文件分成块,而是返回与行一样多的块,每个块代表一个元素
tails . lines . readFile
因为我希望程序的文件大小也可以扩展,所以我最初使用了 lazy IO。然而,这因“打开的文件太多”而失败,我在previous question 中询问了这个问题(GC 处理文件句柄太晚了)。完整的惰性 IO 版本发布在那里。
正如接受的答案所解释的,严格 IO 可以解决问题。这确实解决了 2k 行文件的“打开文件过多”问题,但在 50k 文件上因“内存不足”而失败。
请注意,第一个单线程实现(没有 mapreduce)能够处理 50k 文件。
另一个对我最有吸引力的解决方案是使用 iteratee IO。我希望这可以解决文件句柄和内存资源耗尽问题。然而,我的实现仍然失败,在 2k 行文件上出现“打开的文件过多”错误。
iteratee IO 版本具有与书中相同的 mapReduce 功能,但修改了 chunkedFileEnum 以使其与 Enumerator 一起使用.
因此我的问题是;以下迭代 IO 基础实现有什么问题?哪来的懒惰?
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans (MonadIO, liftIO)
import System.IO
import qualified Data.Enumerator.List as EL
import qualified Data.Enumerator.Text as ET
import Data.Enumerator hiding (map, filter, head, sequence)
import Data.Text(Text)
import Data.Text.Read
import Data.Maybe
import qualified Data.ByteString.Char8 as Str
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances
--My operation for one value pair
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
where infiniteList :: Int->Int-> [Int]
infiniteList i j = (i + j) : infiniteList j (i+j)
--Applying my operation simply on a file
--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)
--But I want to use multiple cores..
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
--Setting up an enumerator of read values from a text stream
readerEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m b
readerEnumerator reader = joinI . (EL.concatMapM transformer)
where transformer input = case reader input of
Right (val, remainder) -> return [val]
Left err -> return [0]
readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m b
readEnumerator = readerEnumerator (signed decimal)
--The iteratee version of my operation
distancesFirstToTailIt :: Monad m=> Iteratee Int m Int
distancesFirstToTailIt = do
maybeNum <- EL.head
maybe (return 0) distancesOneToManyIt maybeNum
distancesOneToManyIt :: Monad m=> Int -> Iteratee Int m Int
distancesOneToManyIt base = do
maybeNum <- EL.head
maybe (return 0) combineNextDistance maybeNum
where combineNextDistance nextNum = do
rest <- distancesOneToManyIt base
return $ combineDistances [(distance base nextNum),rest]
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
-> (a -> b) -- map function
-> Strategy c -- evaluation strategy for reduction
-> ([b] -> c) -- reduce function
-> [a] -- list to map over
-> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
mapResult `pseq` reduceResult
where mapResult = parMap mapStrat mapFunc input
reduceResult = reduceFunc mapResult `using` reduceStrat
--Applying the iteratee operation using mapreduce
sumOfDistancesOnFileWithIt :: FilePath -> IO Int
sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) path
distancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO Int
distancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc)
rpar (sumValuesAsReduceFunc)
where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int
runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt))
sumValuesAsReduceFunc :: [IO Int] -> IO Int
sumValuesAsReduceFunc = liftM sum . sequence
--Working with (file)chunk enumerators:
data ChunkSpec = CS
chunkOffset :: !Int
, chunkLength :: !Int
deriving (Eq,Show)
chunkedFileEnum :: (NFData (a)) => MonadIO m =>
(FilePath-> IO [ChunkSpec])
-> ([Enumerator Text m b]->IO a)
-> FilePath
-> IO a
chunkedFileEnum chunkCreator funcOnChunks path = do
(chunks, handles)<- chunkedEnum chunkCreator path
r <- funcOnChunks chunks
(rdeepseq r `seq` (return r)) `finally` mapM_ hClose handles
chunkedEnum :: MonadIO m=>
(FilePath -> IO [ChunkSpec])
-> FilePath
-> IO ([Enumerator Text m b], [Handle])
chunkedEnum chunkCreator path = do
chunks <- chunkCreator path
liftM unzip . forM chunks $ \spec -> do
h <- openFile path ReadMode
hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF
return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
bracket (openFile path ReadMode) hClose $ \h-> do
totalSize <- fromIntegral `liftM` hFileSize h
let chunkSize = 1
findChunks offset = do
let newOffset = offset + chunkSize
hSeek h AbsoluteSeek (fromIntegral newOffset)
let findNewline lineSeekOffset = do
eof <- hIsEOF h
if eof
then return [CS offset (totalSize - offset)]
else do
bytes <- Str.hGet h 256
case Str.elemIndex '\n' bytes of
Just n -> do
nextChunks <- findChunks (lineSeekOffset + n + 1)
return (CS offset (totalSize-offset):nextChunks)
Nothing -> findNewline (lineSeekOffset + Str.length bytes)
findNewline newOffset
findChunks 0
顺便说一句,我正在跑步 Mac OS X 10.6.7(雪豹)上的 HaskellPlatform 2011.2.0 带有以下软件包: 字节串 0.9.1.10 并行 3.1.0.1 枚举器 0.4.8 ,带手册here
【问题讨论】:
你的并行度太多了。使用 1 的块大小和 2k 行文件,您打开文件 2k 次。而且并行度越高,您使用的内存也就越多。我真的不认为像这样需要与自身交叉结构的问题适合您选择的并行化策略。您应该设置一个相当大的块大小,并在每个块内进行计算,然后跨块进行计算。 为了进一步强调这一点,您正在执行一个在空间和磁盘读取方面可能是线性的操作,并将其转换为在空间和磁盘读取方面为 n^2 的操作。读取的惰性和严格性只是在用完文件描述符或用完内存以保存所有读取结果之间进行权衡。无论哪种方式,这都是错误的方法。 不应该 pseq 足够聪明,不会立即产生所有 2k 线程吗?我认为他们更像是工作。它们都必须在某个时候完成,并且通过使用 pseq 我试图告诉 haskell 它可以通过并行运行一些来优化运行时间。 ps,我实际上是故意运行 n^2 算法,因为我需要将每个元素与其他元素进行比较。然而,任何时候所需的总空间都可以小得多,因为我不需要将整个元素列表(线性)保存在内存中。我只是遍历整个列表 n 次。因为 n^2 算法总是很慢,所以我尝试通过使其并行来充分利用它。 该算法在时间上必然是 n^2,当然,但您的实现在空间和磁盘读取方面也是 n^2,并且不必如此。您不是在激发线程,而是将表达式的评估激发为弱头范式,这可以在可用的操作系统线程上执行。但是,运行时也可以交错评估这些火花,尤其是当它们阻塞时,例如磁盘 IO。在任何情况下,如果您要求运行时立即触发 2k 次评估,当它接受您的提议时,您不应该感到惊讶。 【参考方案1】:正如错误所说,打开的文件太多。我希望 Haskell 能够按顺序运行大部分程序,但也会有一些“火花”并行运行。然而,正如 sclv 所提到的,Haskell 总是引发评估。
这在纯函数式程序中通常不是问题,但在处理 IO(资源)时会出现问题。我将 Real World Haskell 书中描述的并行度扩展得太远了。所以我的结论是在处理火花内的 IO 资源时只在有限的范围内进行并行处理。在纯函数部分,过度并行可能会成功。
因此,我的帖子的答案是,不在整个程序上使用 MapReduce,而是在内部纯功能部分中使用。
为了显示程序实际失败的地方,我使用--enable-executable-profiling -p 对其进行配置,构建它,然后使用+RTS -p -hc -L30 运行它。因为可执行文件立即失败,所以没有内存分配配置文件。 .prof 文件中生成的时间分配配置文件以以下内容开头:
individual inherited
COST CENTRE MODULE no. entries %time %alloc %time %alloc
MAIN MAIN 1 0 0.0 0.3 100.0 100.0
main Main 1648 2 0.0 0.0 50.0 98.9
sumOfDistancesOnFileWithIt MapReduceTest 1649 1 0.0 0.0 50.0 98.9
chunkedFileEnum MapReduceTest 1650 1 0.0 0.0 50.0 98.9
chunkedEnum MapReduceTest 1651 495 0.0 24.2 50.0 98.9
lineOffsets MapReduceTest 1652 1 50.0 74.6 50.0 74.6
chunkedEnum 返回 IO ([Enumerator Text m b], [Handle]),显然接收到 495 个条目。输入文件是一个 2k 行文件,因此 lineOffsets 上的单个条目返回了 2000 个偏移量的列表。 distancesUsingMapReduceIt 中没有一个条目,所以实际工作甚至没有开始!
【讨论】:
以上是关于为啥我使用 iteratee IO 的 Mapreduce 实现(真实世界的 Haskell)也因“打开的文件太多”而失败的主要内容,如果未能解决你的问题,请参考以下文章
使用Iteratees使用Play Scala将文件直接上传到S3 chunk-by-chunk
添加reactivemongo后出现错误“play-iteratees_2.10 not found”
underscorejs之_.countBy(list, iteratee, [context])