为啥我修改后的(真实世界的haskell)Mapreduce 实现失败并显示“打开的文件太多”
Posted
技术标签:
【中文标题】为啥我修改后的(真实世界的haskell)Mapreduce 实现失败并显示“打开的文件太多”【英文标题】:Why does my modified (real world haskell) Mapreduce implementation fails with "Too many open files"为什么我修改后的(真实世界的haskell)Mapreduce 实现失败并显示“打开的文件太多” 【发布时间】:2011-04-04 16:56:55 【问题描述】:我正在实现一个 haskell 程序,它将文件的每一行与文件中的每一行进行比较。为简单起见,我们假设一行表示的数据结构只是一个 Int,而我的算法是平方距离。我将按如下方式实现:
--My operation
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)
combineDistances :: [Int] -> Int
combineDistances = sum
--Applying my operation simply on a file
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
fileContents <- readFile path
return $ allDistances $ map read $ lines $ fileContents
where
allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
allDistances _ = 0
--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
where infiniteList :: Int->Int-> [Int]
infiniteList i j = (i + j) : infiniteList j (i+j)
很遗憾,完整的文件将保存在内存中。为了防止在非常大的文件上可能出现内存不足异常,我想在每次递归“allDistances”时将文件游标返回到文件的开头。
在“Real World Haskell”一书中,给出了 mapreduce 的实现,具有将文件分割成块的功能(第 24 章,可用 here)。我修改了分块功能,而不是将整个文件分成块,而是返回与行一样多的块,每个块代表一个元素
tails . lines. readFile
完整的实现是(加上前面的代码区域)
import qualified Data.ByteString.Lazy.Char8 as Lazy
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
import System.IO
--Applying my operation using mapreduce on a very big file
sumOfDistancesOnFile :: FilePath -> IO Int
sumOfDistancesOnFile path = chunkedFileOperation chunkByLinesTails (distancesUsingMapReduce) path
distancesUsingMapReduce :: [Lazy.ByteString] -> Int
distancesUsingMapReduce = mapReduce rpar (distancesFirstToTail . lexer)
rpar combineDistances
where lexer :: Lazy.ByteString -> [Int]
lexer chunk = map (read . Lazy.unpack) (Lazy.lines chunk)
distancesOneToMany :: Int -> [Int] -> Int
distancesOneToMany one many = combineDistances $ map (distance one) many
distancesFirstToTail :: [Int] -> Int
distancesFirstToTail s =
if not (null s)
then distancesOneToMany (head s) (tail s)
else 0
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
-> (a -> b) -- map function
-> Strategy c -- evaluation strategy for reduction
-> ([b] -> c) -- reduce function
-> [a] -- list to map over
-> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
mapResult `pseq` reduceResult
where mapResult = parMap mapStrat mapFunc input
reduceResult = reduceFunc mapResult `using` reduceStrat
--Working with (file)chunks:
data ChunkSpec = CS
chunkOffset :: !Int64
, chunkLength :: !Int64
deriving (Eq,Show)
chunkedFileOperation :: (NFData a)=>
(FilePath-> IO [ChunkSpec])
-> ([Lazy.ByteString]-> a)
-> FilePath
-> IO a
chunkedFileOperation chunkCreator funcOnChunks path = do
(chunks, handles)<- chunkedRead chunkCreator path
let r = funcOnChunks chunks
(rdeepseq r `seq` return r) `finally` mapM_ hClose handles
chunkedRead :: (FilePath -> IO [ChunkSpec])
-> FilePath
-> IO ([Lazy.ByteString], [Handle])
chunkedRead chunkCreator path = do
chunks <- chunkCreator path
liftM unzip . forM chunks $ \spec -> do
h <- openFile path ReadMode
hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
chunk <- Lazy.take (chunkLength spec) `liftM` Lazy.hGetContents h
return (chunk,h)
-- returns set of chunks representing tails . lines . readFile
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
bracket (openFile path ReadMode) hClose $ \h-> do
totalSize <- fromIntegral `liftM` hFileSize h
let chunkSize = 1
findChunks offset = do
let newOffset = offset + chunkSize
hSeek h AbsoluteSeek (fromIntegral newOffset)
let findNewline lineSeekOffset = do
eof <- hIsEOF h
if eof
then return [CS offset (totalSize - offset)]
else do
bytes <- Lazy.hGet h 4096
case Lazy.elemIndex '\n' bytes of
Just n -> do
nextChunks <- findChunks (lineSeekOffset + n + 1)
return (CS offset (totalSize-offset):nextChunks)
Nothing -> findNewline (lineSeekOffset + Lazy.length bytes)
findNewline newOffset
findChunks 0
不幸的是,在更大的文件(例如 2000 行)上,mapreduce 版本会引发异常:* 异常:getCurrentDirectory:资源耗尽(打开的文件过多)
我有点惭愧不能自己调试程序,但我只知道如何调试java/c#代码。而且我也不知道如何正确测试文件分块和读取。我希望问题不是 mapreduce 函数本身的一部分,因为没有 mapreduce 的类似版本也会引发异常。在那次尝试中,我让 chunkedFileOperation 接受一个块的操作和它直接应用的“reduce”函数。
顺便说一句,我正在跑步 Mac OS X 10.6.7(雪豹)上的 HaskellPlatform 2011.2.0 带有以下软件包: 字节串 0.9.1.10 并行 3.1.0.1 我有资格成为一名自学成才的初学者/新的 Haskell 程序员
【问题讨论】:
堆栈溢出时,您不应发布签名。您的帐户名称显示在问题正下方的框中。 使用以下导入:将合格的 Data.ByteString.Char8 导入为 Lazy 和以下块:data ChunkSpec = CS chunkOffset :: !Int , chunkLength :: !Int 派生 (Eq,Show)该程序适用于大文件。 我最初尝试使用 iteratee IO 不幸失败了,我在this question寻求帮助 【参考方案1】:您正在使用惰性 IO,因此使用 readFile
打开的文件不会及时关闭。您需要考虑一种定期明确关闭文件的解决方案(例如,通过严格 IO 或迭代 IO)。
【讨论】:
啊,所以我正在应用两种降低内存的解决方案 但是是惰性 IO 导致 mapReduce 的 pseq 产生了太多线程,还是延迟了 chunkedFileOperation 中的 finally - hClose。本书示例的这一部分对我来说并不是很清楚,因为我阅读它是为了最终“释放所有句柄”而不是释放它刚刚 rdeepseq 进入的每个句柄。 不,没有太多线程。打开的文件太多(因为它们被懒惰地关闭,当 GC 决定不需要它们时)。【参考方案2】:这个错误的意思正是它所说的:您的进程打开了太多文件。操作系统对进程可以同时读取的文件(或目录)的数量施加了任意限制。请参阅您的 ulimit(1)
联机帮助页和/或限制映射器的数量。
【讨论】:
我知道文件句柄的数量是有限制的,但我希望我的算法最多使用 ghc 创建的线程数,这应该非常低(至少,这是本意实施)以上是关于为啥我修改后的(真实世界的haskell)Mapreduce 实现失败并显示“打开的文件太多”的主要内容,如果未能解决你的问题,请参考以下文章