Failed to write large amount of data to stream

Posted: 2017-02-11 13:50:41

[Question]:

When I try to write a large amount of data (a list with 300,000 rows and more) to a memory stream using CsvHelper, it throws the exception "System.IO.IOException: Stream was too long."

The data class is rather big and has about 30 properties, so each record in the file would have about 30 columns.

Here is the actual writing code where the exception is thrown (by the way, this code is based on that answer from the author of the CsvHelper lib):

using (var memoryStream = new MemoryStream())
{
    using (var streamWriter = new StreamWriter(memoryStream, encoding ?? Encoding.ASCII))
    {
        var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
        csvWriter.WriteRecords(data); // data is IEnumerable<T> and has more than 300k records

        streamWriter.Flush();
        return memoryStream.ToArray();
    }
}

Then I save the resulting byte array to a file:

File.WriteAllBytes(filePath, resultedBytesArray); 

Note that the same code works fine when I write 100,000 records to the file (in that case the file is about 1GB in size). By the way, my goal is to write more than 600,000 data records.

Here is the relevant part of the stack trace for this issue:

Stream was too long.|System.IO.IOException: Stream was too long.
at System.IO.MemoryStream.Write(Byte[] buffer, Int32 offset, Int32 count) 
at System.IO.StreamWriter.Flush(Boolean flushStream, Boolean flushEncoder) 
at System.IO.StreamWriter.Write(Char[] buffer, Int32 index, Int32 count) 
at CsvHelper.CsvWriter.NextRecord() in C:\Users\Josh\Projects\CsvHelper\src\CsvHelper\CsvWriter.cs:line 290 
at CsvHelper.CsvWriter.WriteRecords(IEnumerable records) in C:\Users\Josh\Projects\CsvHelper\src\CsvHelper\CsvWriter.cs:line 490 
at FileExport.Csv.CsvDocument.Create[T](IEnumerable`1 data, String delimiter, Encoding encoding, Type mappingClassType, IDictionary`2 mappingActions) in d:\Dev\DrugDevExport\FileExport\Csv\CsvDocument.cs:line 33 

In my case, the basic way to achieve my goal and avoid the problem is to split the list of written data into several parts and concatenate them together afterwards, but is there perhaps any other fairly obvious and easy solution without significant code refactoring (like increasing the default stream/buffer size, etc.)?
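To make that splitting idea concrete, here is roughly what I mean - a sketch only: the Chunk helper is hypothetical (not part of CsvHelper), and Create stands for my export method shown above, run once per batch.

// Hypothetical sketch of the splitting workaround. Chunk is my own helper,
// not a CsvHelper API; Create is the export method from above.
static IEnumerable<List<T>> Chunk<T>(IEnumerable<T> source, int size)
{
    var batch = new List<T>(size);
    foreach (var item in source)
    {
        batch.Add(item);
        if (batch.Count == size)
        {
            yield return batch;
            batch = new List<T>(size);
        }
    }

    if (batch.Count > 0)
        yield return batch;
}

// Usage: append each part's bytes to the target file in turn,
// so no single stream ever has to hold all 600k records.
using (var output = File.Create(filePath))
{
    foreach (var batch in Chunk(data, 100000))
    {
        byte[] part = Create(batch, delimiter, encoding, mappingClassType, mappingActions);
        output.Write(part, 0, part.Length);
    }
}

(One caveat with this sketch: WriteRecords emits a header row on each call, so every part would start with its own header unless that is disabled in GetConfiguration.)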

Also keep in mind that I have already applied two possible fixes in order to prevent "out of memory" exceptions:

- got rid of the 2GB limit for objects (from here https://***.com/a/20912869); and yes, I am running on an x64 OS with 32GB of RAM
- set the "Platform target" to x64 in the build settings (from here https://***.com/a/22592876)

Thanks in advance.

[Comments]:

Why write to a MemoryStream at all? Did you need to have the stream entirely in memory? You talk about files, but you use a MemoryStream... Replace it with a FileStream and see what happens...

Have you tried reading a limited amount of data and writing it to the stream in a loop, i.e. not all at once? You could perhaps try a chunking approach similar to this post ***.com/questions/2819081/…

@PaulZahra, I mentioned that in my question: this way (splitting the whole set of data) would very likely work, and it already works for 100k data records, but is there any other solution that doesn't involve splitting?

@ArtyomPranovich I think chunking is more logical/safe/future-proof, otherwise you rely too much on the machine... you could try to define your own buffer (give it a size), but you would most likely run into problems where memory must be contiguous. P.S. as per "that" post: the streamWriter is automatically flushed when you leave its using statement, which is fine because you return inside the using (so remove your Flush call).

I agree with @spender - you seem to be jumping through a lot of hoops for no reason. You write the list to one stream, then read the entire stream into an array, then write that array into a second stream. Just write it directly to the second stream to begin with. As it is, you are creating three different representations of the same data in memory (the list, the underlying storage of the MemoryStream, and the byte[] which is not just a reference to the MemoryStream's buffer). I think the pretty obvious solution is not to store big data in memory three times.

[Answer 1]:

Many thanks to Spender - as he mentioned in a comment below the question, the problem was fixed by replacing the MemoryStream with a FileStream and writing the data directly into the file.

In my case it was absolutely useless to write the data to a MemoryStream and then copy it into a file again for no reason. Thanks to him again for opening my eyes to that fact.

My fixed code is below.

using (var fileStream = File.Create(path))
{
    using (var streamWriter = new StreamWriter(fileStream, encoding ?? Encoding.ASCII))
    {
        var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
        csvWriter.WriteRecords(data);
    }
}
Now it works with any amount of input data.
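As a side effect, the caller no longer needs the File.WriteAllBytes step. A hedged sketch of the before/after calling code - the exact Create signatures here are my guess based on the stack trace, not the real API:

// Before: three copies of the data in memory (the list, the MemoryStream's
// internal buffer, and the byte[] returned by ToArray).
byte[] resultedBytesArray = CsvDocument.Create(data, delimiter, encoding, mappingClassType, mappingActions);
File.WriteAllBytes(filePath, resultedBytesArray);

// After: records are streamed straight to disk through the FileStream.
CsvDocument.Create(data, filePath, delimiter, encoding, mappingClassType, mappingActions);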

[Discussion]:

[Answer 2]:

You can work around the 2GB limitation by writing your own MemoryStream:

// A Stream backed by an array of fixed-size pages, so no single allocation
// ever approaches the 2GB object size limit of a plain byte[].
class HugeMemoryStream : Stream
{
    #region Fields

    private const int PAGE_SIZE = 1024000; // bytes per page
    private const int ALLOC_STEP = 1024;   // pages added per page-table extension

    private byte[][] _streamBuffers;

    private int _pageCount = 0;
    private long _allocatedBytes = 0;

    private long _position = 0;
    private long _length = 0;

    #endregion Fields

    #region Internals

    private int GetPageCount(long length)
    {
        int pageCount = (int)(length / PAGE_SIZE) + 1;

        if ((length % PAGE_SIZE) == 0)
            pageCount--;

        return pageCount;
    }

    private void ExtendPages()
    {
        if (_streamBuffers == null)
        {
            _streamBuffers = new byte[ALLOC_STEP][];
        }
        else
        {
            byte[][] streamBuffers = new byte[_streamBuffers.Length + ALLOC_STEP][];

            Array.Copy(_streamBuffers, streamBuffers, _streamBuffers.Length);

            _streamBuffers = streamBuffers;
        }

        _pageCount = _streamBuffers.Length;
    }

    private void AllocSpaceIfNeeded(long value)
    {
        if (value < 0)
            throw new InvalidOperationException("AllocSpaceIfNeeded < 0");

        if (value == 0)
            return;

        int currentPageCount = GetPageCount(_allocatedBytes);
        int neededPageCount = GetPageCount(value);

        while (currentPageCount < neededPageCount)
        {
            if (currentPageCount == _pageCount)
                ExtendPages();

            _streamBuffers[currentPageCount++] = new byte[PAGE_SIZE];
        }

        _allocatedBytes = (long)currentPageCount * PAGE_SIZE;

        value = Math.Max(value, _length);

        if (_position > (_length = value))
            _position = _length;
    }

    #endregion Internals

    #region Stream

    public override bool CanRead => true;

    public override bool CanSeek => true;

    public override bool CanWrite => true;

    public override long Length => _length;

    public override long Position
    {
        get { return _position; }
        set
        {
            if (value > _length)
                throw new InvalidOperationException("Position > Length");
            else if (value < 0)
                throw new InvalidOperationException("Position < 0");
            else
                _position = value;
        }
    }

    public override void Flush() { }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int currentPage = (int)(_position / PAGE_SIZE);
        int currentOffset = (int)(_position % PAGE_SIZE);
        int currentLength = PAGE_SIZE - currentOffset;

        long startPosition = _position;

        if (startPosition + count > _length)
            count = (int)(_length - startPosition);

        // Copy page by page until the requested count is satisfied.
        while (count != 0 && _position < _length)
        {
            if (currentLength > count)
                currentLength = count;

            Array.Copy(_streamBuffers[currentPage++], currentOffset, buffer, offset, currentLength);

            offset += currentLength;
            _position += currentLength;
            count -= currentLength;

            currentOffset = 0;
            currentLength = PAGE_SIZE;
        }

        return (int)(_position - startPosition);
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        switch (origin)
        {
            case SeekOrigin.Begin:
                break;

            case SeekOrigin.Current:
                offset += _position;
                break;

            case SeekOrigin.End:
                offset += _length; // per Stream semantics, offset is normally negative here
                break;

            default:
                throw new ArgumentOutOfRangeException("origin");
        }

        return Position = offset;
    }

    public override void SetLength(long value)
    {
        if (value < 0)
            throw new InvalidOperationException("SetLength < 0");

        if (value == 0)
        {
            _streamBuffers = null;
            _allocatedBytes = _position = _length = 0;
            _pageCount = 0;
            return;
        }

        int currentPageCount = GetPageCount(_allocatedBytes);
        int neededPageCount = GetPageCount(value);

        // Removes unused buffers if decreasing stream length
        while (currentPageCount > neededPageCount)
            _streamBuffers[--currentPageCount] = null;

        // Keep the allocation bookkeeping in sync with the pages just released.
        _allocatedBytes = (long)currentPageCount * PAGE_SIZE;

        AllocSpaceIfNeeded(value);

        if (_position > (_length = value))
            _position = _length;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        int currentPage = (int)(_position / PAGE_SIZE);
        int currentOffset = (int)(_position % PAGE_SIZE);
        int currentLength = PAGE_SIZE - currentOffset;

        long startPosition = _position;

        AllocSpaceIfNeeded(_position + count);

        // Copy page by page, spilling into the next page as each one fills.
        while (count != 0)
        {
            if (currentLength > count)
                currentLength = count;

            Array.Copy(buffer, offset, _streamBuffers[currentPage++], currentOffset, currentLength);

            offset += currentLength;
            _position += currentLength;
            count -= currentLength;

            currentOffset = 0;
            currentLength = PAGE_SIZE;
        }
    }

    #endregion Stream
}
using ICSharpCode.SharpZipLib.GZip;
using System;
using System.IO;
using System.Text;

// HugeMemoryStream test: round-trip a gzipped text file through the stream
// and verify that every line comes back out unchanged.

string filename = @"gzip-filename.gz";

HugeMemoryStream ms = new HugeMemoryStream();

using (StreamWriter sw = new StreamWriter(ms, Encoding.UTF8, 16384, true))
using (FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read))
using (GZipInputStream gzipStream = new GZipInputStream(fs))
using (StreamReader sr = new StreamReader(gzipStream, Encoding.UTF8, false, 16384, true))
{
    for (string line = sr.ReadLine(); line != null; line = sr.ReadLine())
        sw.WriteLine(line);
}

ms.Seek(0, SeekOrigin.Begin);

using (StreamReader srm = new StreamReader(ms, Encoding.UTF8, false, 16384, true))
using (FileStream fs = new FileStream(filename, FileMode.Open, FileAccess.Read, FileShare.Read))
using (GZipInputStream gzipStream = new GZipInputStream(fs))
using (StreamReader sr = new StreamReader(gzipStream, Encoding.UTF8, false, 16384, true))
{
    for (string line1 = sr.ReadLine(), line2 = srm.ReadLine(); line1 != null; line1 = sr.ReadLine(), line2 = srm.ReadLine())
    {
        if (line1 != line2)
            throw new InvalidDataException();
    }
}
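And to connect it back to the question, here is a hedged sketch of how HugeMemoryStream could replace the MemoryStream in the original Create method. GetConfiguration and the surrounding parameters are the ones from the question; since a single byte[] cannot exceed 2GB anyway, the result is streamed to the file with CopyTo instead of ToArray():

using (var hugeStream = new HugeMemoryStream())
{
    using (var streamWriter = new StreamWriter(hugeStream, encoding ?? Encoding.ASCII, 16384, true))
    {
        var csvWriter = new CsvWriter(streamWriter, GetConfiguration(delimiter, mappingClassType, mappingActions));
        csvWriter.WriteRecords(data);
    } // disposing the writer flushes it; leaveOpen keeps hugeStream alive

    // Stream the contents out to disk instead of materializing one huge array.
    hugeStream.Seek(0, SeekOrigin.Begin);
    using (var fileStream = File.Create(filePath))
        hugeStream.CopyTo(fileStream);
}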

[Discussion]:
