Haskell 中字节流的高效流式传输和操作
Posted
技术标签:
【中文标题】Haskell 中字节流的高效流式传输和操作【英文标题】:Efficient streaming and manipulation of a byte stream in Haskell 【发布时间】:2016-10-08 13:05:19 【问题描述】:在为大型(<bloblength><blob>)*
编码二进制文件编写反序列化器时,我被各种 Haskell 生产-转换-消费库困住了。到目前为止,我知道有四个流媒体库:
conduit
(Haskell Cast #6很好地揭示了conduit
和pipes
之间的区别)
Data.Binary.Get:提供了getWord32be等有用的函数,但是流式示例很尴尬
System.IO.Streams: 好像是最容易用的一个
这是一个简化的示例,说明当我尝试使用conduit
进行Word32
流式传输时出现问题。一个稍微现实一点的例子是首先读取一个确定 blob 长度的Word32
,然后产生一个该长度的惰性ByteString
(然后进一步反序列化)。
但在这里我只是尝试从二进制文件中以流方式提取 Word32:
module Main where
-- build-depends: bytestring, conduit, conduit-extra, resourcet, binary
import Control.Monad.Trans.Resource (MonadResource, runResourceT)
import qualified Data.Binary.Get as G
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as BL
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import Data.Word (Word32)
import System.Environment (getArgs)
-- gets a Word32 from a ByteString.
getWord32 :: C.ByteString -> Word32
getWord32 bs = do
G.runGet G.getWord32be $ BL.fromStrict bs
-- should read BytesString and return Word32
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = do
mbs <- await
case mbs of
Just bs -> do
case C.null bs of
False -> do
yield $ getWord32 bs
leftover $ BS.drop 4 bs
transform
True -> return ()
Nothing -> return ()
main :: IO ()
main = do
filename <- fmap (!!0) getArgs -- should check length getArgs
result <- runResourceT $ (CB.sourceFile filename) $$ transform =$ CL.consume
print $ length result -- is always 8188 for files larger than 32752 bytes
程序的输出只是读取的 Word32 的数量。事实证明,流在读取第一个块(大约 32KiB)后终止。出于某种原因,mbs
永远不是Nothing
,所以我必须检查null bs
,它会在块被消耗时停止流。显然,我的导管transform
有问题。我看到了两种解决方案:
await
不想去ByteStream
的第二个块,那么还有另一个函数可以拉下一个块吗?在我见过的例子中(例如Conduit 101),这不是它的做法
这只是设置transform
的错误方式。
这是如何正确完成的?这是正确的方法吗? (性能确实很重要。)
更新:这是使用Systems.IO.Streams
的BAD方法:
module Main where
import Data.Word (Word32)
import System.Environment (getArgs)
import System.IO (IOMode (ReadMode), openFile)
import qualified System.IO.Streams as S
import System.IO.Streams.Binary (binaryInputStream)
import System.IO.Streams.List (outputToList)
main :: IO ()
main = do
filename : _ <- getArgs
h <- openFile filename ReadMode
s <- S.handleToInputStream h
i <- binaryInputStream s :: IO (S.InputStream Word32)
r <- outputToList $ S.connect i
print $ last r
'Bad'表示:对时间和空间要求很高,不处理Decode异常。
【问题讨论】:
演示程序是否应该将整个输入分成 4 字节块并产生 word32s? 是的。更广泛的目标是读取 Word32 和可变大小的 blob(惰性字节字符串)。 不相关,但对于简单的 arg 解析,我会写filename : _ <- getArgs
而不是 filename <- fmap (!!0) getArgs
。
@danidiaz 确实如此。或者head <$> getArgs
【参考方案1】:
您的直接问题是由您使用leftover
的方式引起的。该函数用于“提供单个剩余输入以供当前单子绑定中的下一个组件使用”,因此当您在使用 transform
循环之前给它 bs
时,您实际上是在扔掉其余的字节串(即bs
之后的内容)。
基于您的代码的正确解决方案将使用 the incremental input interface 或 Data.Binary.Get
将您的 yield
/leftover
组合替换为完全消耗每个块的东西。不过,更实用的方法是使用 binary-conduit 包,它以conduitGet
的形式提供(它的source 很好地了解了“手动”实现的外观喜欢):
import Data.Conduit.Serialization.Binary
-- etc.
transform :: (Monad m, MonadResource m) => Conduit BS.ByteString m Word32
transform = conduitGet G.getWord32be
需要注意的是,如果字节总数不是 4 的倍数(即最后一个 Word32
不完整),这将引发解析错误。在不太可能的情况下,这不是您想要的,一个懒惰的出路是简单地在输入字节串上使用\bs -> C.take (4 * truncate (C.length bs / 4)) bs
。
【讨论】:
哦,我明白了,“......在当前的单子绑定中”,这解释了它。谢谢你。我按照您的建议插入了transform
,它可以工作,但它会消耗大量内存(35MB 文件约 500MB 内存)。使用Data.Conduit.List
时似乎失去了懒惰。
@mcmayer 差不多了。经验法则是,当使用流媒体库时,您不应该在列表中收集输出(就像您在编辑中添加的错误 io-streams 示例中的 outputToList
一样),因为您真的如果有很多输出,不希望发生这种情况。相反,您应该使用适当的流式消费者(或“接收器”,在 conduit 术语中)。 Michael 的回答显示了使用 pipes 的要点(在他的演示中,消费者是 P.print
); conduit 解决方案是类似的。【参考方案2】:
使用pipes
(以及pipes-group
和pipes-bytestring
),演示问题简化为组合子。首先,我们将传入的未区分字节流解析为 4 字节的小块:
chunksOfStrict :: (Monad m) => Int -> Producer ByteString m r -> Producer ByteString m r
chunksOfStrict n = folds mappend mempty id . view (Bytes.chunksOf n)
然后我们将这些映射到Word32
s 并(在这里)计算它们。
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
n <- P.length $ chunksOfStrict 4 (Bytes.fromHandle h) >-> P.map getWord32
print n
如果我们的字节数少于 4 或无法解析,这将失败,但我们也可以使用
进行映射getMaybeWord32 :: ByteString -> Maybe Word32
getMaybeWord32 bs = case G.runGetOrFail G.getWord32be $ BL.fromStrict bs of
Left r -> Nothing
Right (_, off, w32) -> Just w32
下面的程序将打印出有效的 4 字节序列的解析
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
runEffect $ chunksOfStrict 4 (Bytes.fromHandle h)
>-> P.map getMaybeWord32
>-> P.concat -- here `concat` eliminates maybes
>-> P.print
当然,还有其他处理失败解析的方法。
不过,这里更接近您要求的程序。它从字节流 (Producer ByteString m r
) 中获取一个四字节段,如果它足够长,则将其读取为 Word32
;然后它需要 那么多 的传入字节并将它们累积成一个惰性字节串,从而产生它。它只是重复这个直到它用完字节。在下面的main
中,我打印了每个产生的惰性字节串:
module Main (main) where
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Group (folds)
import qualified Pipes.ByteString as Bytes ( splitAt, fromHandle, chunksOf )
import Control.Lens ( view ) -- or Lens.Simple (view) -- or Lens.Micro ((.^))
import qualified System.IO as IO ( IOMode(ReadMode), withFile )
import qualified Data.Binary.Get as G ( runGet, getWord32be )
import Data.ByteString ( ByteString )
import qualified Data.ByteString.Lazy.Char8 as BL
import System.Environment ( getArgs )
splitLazy :: (Monad m, Integral n) =>
n -> Producer ByteString m r -> m (BL.ByteString, Producer ByteString m r)
splitLazy n bs = do
(bss, rest) <- P.toListM' $ view (Bytes.splitAt n) bs
return (BL.fromChunks bss, rest)
measureChunks :: Monad m => Producer ByteString m r -> Producer BL.ByteString m r
measureChunks bs = do
(lbs, rest) <- lift $ splitLazy 4 bs
if BL.length lbs /= 4
then rest >-> P.drain -- in fact it will be empty
else do
let w32 = G.runGet G.getWord32be lbs
(lbs', rest') <- lift $ splitLazy w32 bs
yield lbs
measureChunks rest
main :: IO ()
main = do
filename:_ <- getArgs
IO.withFile filename IO.ReadMode $ \h -> do
runEffect $ measureChunks (Bytes.fromHandle h) >-> P.print
这又是粗略的,因为它使用runGet
而不是runGetOrFail
,但这很容易修复。管道标准过程是在解析失败时停止流转换并返回未解析的字节流。
如果您预计Word32s
用于大数字,因此您不想将相应的字节流累积为惰性字节串,而是说将它们写入不同的文件而不累积,我们可以更改程序很容易做到这一点。这需要复杂地使用管道,但这是pipes
和streaming
的首选方法。
【讨论】:
【参考方案3】:这是一个相对简单的解决方案,我想把它扔进擂台。这是对splitAt
的重复使用,它被包装到State
monad 中,它提供了与Data.Binary.Get
(的子集)相同的接口。生成的[ByteString]
在main
中获得,whileJust
超过getBlob
。
module Main (main) where
import Control.Monad.Loops
import Control.Monad.State
import qualified Data.Binary.Get as G (getWord32be, runGet)
import qualified Data.ByteString.Lazy as BL
import Data.Int (Int64)
import Data.Word (Word32)
import System.Environment (getArgs)
-- this is going to mimic the Data.Binary.Get.Get Monad
type Get = State BL.ByteString
getWord32be :: Get (Maybe Word32)
getWord32be = state $ \bs -> do
let (w, rest) = BL.splitAt 4 bs
case BL.length w of
4 -> (Just w', rest) where
w' = G.runGet G.getWord32be w
_ -> (Nothing, BL.empty)
getLazyByteString :: Int64 -> Get BL.ByteString
getLazyByteString n = state $ \bs -> BL.splitAt n bs
getBlob :: Get (Maybe BL.ByteString)
getBlob = do
ml <- getWord32be
case ml of
Nothing -> return Nothing
Just l -> do
blob <- getLazyByteString (fromIntegral l :: Int64)
return $ Just blob
runGet :: Get a -> BL.ByteString -> a
runGet g bs = fst $ runState g bs
main :: IO ()
main = do
fname <- head <$> getArgs
bs <- BL.readFile fname
let ls = runGet loop bs where
loop = whileJust getBlob return
print $ length ls
getBlob
中没有错误处理,但很容易扩展。时间和空间复杂度相当不错,只要谨慎使用结果列表即可。 (上面创建一些随机数据供上面消费的python脚本是here)。
【讨论】:
对,这在概念上非常接近于您使用pipes-parse
执行的操作,后者使用 StateT (Producer ByteString m x) m r
逐位消耗字节流 (= Producer ByteString m r
)。如果你稍微改变一下,你最终可能会再次累积惰性字节串列表。
还请注意,您正在重写unfoldr
,它采用的参数接近您隐藏在State
、(s -> Maybe (a, s)) -> s -> [a])
中的参数。 unfoldr
对应于 whileMaybe
和 State
的组合。这是一个明确使用Pipes.Prelude.unfoldr
的管道等价物(它使用 Either 而不是 Maybe 来保持最终返回值) sprunge.us/GcjM 您可以看到 step 函数的功能与您正在编写的功能相匹配,以各种管道功能为模。
这是展开器方法的一种变体,它使用streaming
+ streaming-bytestring
库,它只是隔离了管道的这个非管道方面并使其更易于使用。 sprunge.us/QKCQ streaming-bytestring
应该尽可能接近惰性字节串,同时正确流式传输,内部块之间有 IO。改变这一点很容易,这样段就不会作为惰性字节串累积,而是可以按块单独写入单独的文件或以其他方式发送。
真正让我感到困惑的是——对于所有方法——为什么它不能在磁盘 I/O 的限制附近执行。磁盘是瓶颈,计算本身相当琐碎。到目前为止,我的解释是 GC 在大量 RAM 周围进行洗牌......(分析器这么说。)但我觉得这个结论很奇怪。在 GPU 之外,我从来没有将 RAM 速度作为瓶颈。
你传递给 ghc 的参数是什么?您正在阅读的文件是什么?以上是关于Haskell 中字节流的高效流式传输和操作的主要内容,如果未能解决你的问题,请参考以下文章