为啥我修改后的(真实世界的haskell)Mapreduce 实现失败并显示“打开的文件太多”

Posted

技术标签:

【中文标题】为啥我修改后的(真实世界的haskell)Mapreduce 实现失败并显示“打开的文件太多”【英文标题】:Why does my modified (real world haskell) Mapreduce implementation fails with "Too many open files"为什么我修改后的(真实世界的haskell)Mapreduce 实现失败并显示“打开的文件太多” 【发布时间】:2011-04-04 16:56:55 【问题描述】:

我正在实现一个 haskell 程序,它将文件的每一行与文件中的每一行进行比较。为简单起见,我们假设一行表示的数据结构只是一个 Int,而我的算法是平方距离。我将按如下方式实现:

--My operation
distance :: Int -> Int -> Int
distance a b = (a-b)*(a-b)

combineDistances :: [Int] -> Int
combineDistances = sum

--Applying my operation simply on a file
sumOfDistancesOnSmallFile :: FilePath -> IO Int
sumOfDistancesOnSmallFile path = do
              fileContents <- readFile path
              return $ allDistances $ map read $ lines $ fileContents
              where
                  allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs)
                  allDistances _ = 0

--Test file generation
createTestFile :: Int -> FilePath -> IO ()
createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1
    where infiniteList :: Int->Int-> [Int]
          infiniteList i j = (i + j) : infiniteList j (i+j)

很遗憾,完整的文件将保存在内存中。为了防止在非常大的文件上可能出现内存不足异常,我想在每次递归“allDistances”时将文件游标返回到文件的开头。

在“Real World Haskell”一书中,给出了 mapreduce 的实现,具有将文件分割成块的功能(第 24 章,可用 here)。我修改了分块功能,而不是将整个文件分成块,而是返回与行一样多的块,每个块代表一个元素

tails . lines. readFile

完整的实现是(加上前面的代码区域)

import qualified Data.ByteString.Lazy.Char8 as Lazy
import Control.Exception (bracket,finally)
import Control.Monad(forM,liftM)
import Control.Parallel.Strategies
import Control.Parallel
import Control.DeepSeq (NFData)
import Data.Int (Int64)
import System.IO

--Applying my operation using mapreduce on a very big file
sumOfDistancesOnFile :: FilePath -> IO Int
sumOfDistancesOnFile path = chunkedFileOperation chunkByLinesTails (distancesUsingMapReduce) path

distancesUsingMapReduce :: [Lazy.ByteString] -> Int
distancesUsingMapReduce = mapReduce rpar (distancesFirstToTail . lexer)
                                rpar combineDistances
              where lexer :: Lazy.ByteString -> [Int]
                    lexer chunk = map (read . Lazy.unpack) (Lazy.lines chunk)

distancesOneToMany :: Int -> [Int] -> Int
distancesOneToMany one many = combineDistances $ map (distance one) many

distancesFirstToTail :: [Int] -> Int
distancesFirstToTail s = 
              if not (null s)
              then distancesOneToMany (head s) (tail s)
              else 0
--The mapreduce algorithm
mapReduce :: Strategy b -- evaluation strategy for mapping
      -> (a -> b)   -- map function
      -> Strategy c -- evaluation strategy for reduction
      -> ([b] -> c) -- reduce function
      -> [a]        -- list to map over
      -> c
mapReduce mapStrat mapFunc reduceStrat reduceFunc input =
      mapResult `pseq` reduceResult
      where mapResult    = parMap mapStrat mapFunc input
            reduceResult = reduceFunc mapResult `using` reduceStrat


--Working with (file)chunks:
data ChunkSpec = CS
    chunkOffset :: !Int64
    , chunkLength :: !Int64
     deriving (Eq,Show)

chunkedFileOperation ::   (NFData a)=>
            (FilePath-> IO [ChunkSpec])
       ->   ([Lazy.ByteString]-> a)
       ->   FilePath
       ->   IO a
chunkedFileOperation chunkCreator funcOnChunks path = do
    (chunks, handles)<- chunkedRead chunkCreator path
    let r = funcOnChunks chunks
    (rdeepseq r `seq` return r) `finally` mapM_ hClose handles

chunkedRead ::  (FilePath -> IO [ChunkSpec])
        ->  FilePath
        ->  IO ([Lazy.ByteString], [Handle])
chunkedRead chunkCreator path = do
    chunks <- chunkCreator path
    liftM unzip . forM chunks $ \spec -> do
    h <- openFile path ReadMode
    hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec))
    chunk <- Lazy.take (chunkLength spec) `liftM` Lazy.hGetContents h
    return (chunk,h)

-- returns set of chunks representing  tails . lines . readFile 
chunkByLinesTails :: FilePath -> IO[ChunkSpec]
chunkByLinesTails path = do
    bracket (openFile path ReadMode) hClose $ \h-> do
        totalSize <- fromIntegral `liftM` hFileSize h
        let chunkSize = 1
            findChunks offset = do
            let newOffset = offset + chunkSize
            hSeek h AbsoluteSeek (fromIntegral newOffset)
            let findNewline lineSeekOffset = do
                eof <- hIsEOF h
                if eof
                    then return [CS offset (totalSize - offset)]
                    else do
                        bytes <- Lazy.hGet h 4096
                        case Lazy.elemIndex '\n' bytes of
                            Just n -> do
                                nextChunks <- findChunks (lineSeekOffset + n + 1)
                                return (CS offset (totalSize-offset):nextChunks)
                            Nothing -> findNewline (lineSeekOffset + Lazy.length bytes)
            findNewline newOffset
        findChunks 0

不幸的是,在更大的文件(例如 2000 行)上,mapreduce 版本会引发异常:* 异常:getCurrentDirectory:资源耗尽(打开的文件过多)

我有点惭愧不能自己调试程序,但我只知道如何调试java/c#代码。而且我也不知道如何正确测试文件分块和读取。我希望问题不是 mapreduce 函数本身的一部分,因为没有 mapreduce 的类似版本也会引发异常。在那次尝试中,我让 chunkedFileOperation 接受一个块的操作和它直接应用的“reduce”函数。

顺便说一句,我正在跑步 Mac OS X 10.6.7(雪豹)上的 HaskellPlatform 2011.2.0 带有以下软件包: 字节串 0.9.1.10 并行 3.1.0.1 我有资格成为一名自学成才的初学者/新的 Haskell 程序员

【问题讨论】:

堆栈溢出时,您不应发布签名。您的帐户名称显示在问题正下方的框中。 使用以下导入:将合格的 Data.ByteString.Char8 导入为 Lazy 和以下块:data ChunkSpec = CS chunkOffset :: !Int , chunkLength :: !Int 派生 (Eq,Show)该程序适用于大文件。 我最初尝试使用 iteratee IO 不幸失败了,我在this question寻求帮助 【参考方案1】:

您正在使用惰性 IO,因此使用 readFile 打开的文件不会及时关闭。您需要考虑一种定期明确关闭文件的解决方案(例如,通过严格 IO 或迭代 IO)。

【讨论】:

啊,所以我正在应用两种降低内存的解决方案 但是是惰性 IO 导致 mapReduce 的 pseq 产生了太多线程,还是延迟了 chunkedFileOperation 中的 finally - hClose。本书示例的这一部分对我来说并不是很清楚,因为我阅读它是为了最终“释放所有句柄”而不是释放它刚刚 rdeepseq 进入的每个句柄。 不,没有太多线程。打开的文件太多(因为它们被懒惰地关闭,当 GC 决定不需要它们时)。【参考方案2】:

这个错误的意思正是它所说的:您的进程打开了太多文件。操作系统对进程可以同时读取的文件(或目录)的数量施加了任意限制。请参阅您的 ulimit(1) 联机帮助页和/或限制映射器的数量。

【讨论】:

我知道文件句柄的数量是有限制的,但我希望我的算法最多使用 ghc 创建的线程数,这应该非常低(至少,这是本意实施)

以上是关于为啥我修改后的(真实世界的haskell)Mapreduce 实现失败并显示“打开的文件太多”的主要内容,如果未能解决你的问题,请参考以下文章

为啥这个 Haskell 代码可以成功地处理无限列表?

为啥我的haskell程序这么慢? Haskell 编程,人生游戏

为啥我的 Haskell 函数参数必须是 Bool 类型?

为什么会出现解析错误,如何在haskell中解决此错误?

为啥这个 Haskell 程序表现如此糟糕?

将“为啥函数式编程很重要”翻译成 Haskell