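IEnumerable to Stream

Posted: 2014-03-29 15:58:18

[Question]:

I want to do something roughly equivalent to the code example below. I want to generate and serve a stream of data without ever having the entire data set in memory at one time.

It seems I would need some implementation of Stream that accepts an IEnumerable<string> (or IEnumerable<byte>) in its constructor. Internally, this Stream would only walk the IEnumerable as the Stream is being read, or as needed. But I don't know of any Stream implementation like this.

Am I on the right track? Do you know of any way to do something like this?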

    public FileStreamResult GetResult()
    {
        IEnumerable<string> data = GetDataForStream();

        Stream dataStream = ToStringStream(Encoding.UTF8, data);

        return File(dataStream, "text/plain", "Result");
    }

    private IEnumerable<string> GetDataForStream()
    {
        for (int i = 0; i < 10000; i++)
        {
            yield return i.ToString();
            yield return "\r\n";
        }
    }

    private Stream ToStringStream(Encoding encoding, IEnumerable<string> data)
    {
        // I have to write my own implementation of stream?
        throw new NotImplementedException();
    }

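[Comments]:

Can you guarantee that the stream is read sequentially in a single pass?
@user629926 If seeking isn't supported, you can set CanSeek to false and simply throw if Seek is called.
@user629926 - I think I can guarantee that. My intention is to provide the stream to an MVC FileStreamResult, as my example shows.
Looking into it further, this may just be a bad idea. Can this work if I don't implement the Stream Length property?
@Joel That depends entirely on whether the reader relies on it.

[Answer 1]:

I created a class called ProducerConsumerStream that does this. The producer writes data to the stream and the consumer reads it. There is a buffer in the middle so that the producer can "write ahead" a little. You can define the size of the buffer.

Anyway, if it's not exactly what you're looking for, I suspect it will give you a good idea of how it's done. See Building a new type of stream.

Update

The link went stale, so I've copied my code here. The original article is still available on the Wayback Machine at https://web.archive.org/web/20151210235510/http://www.informit.com/guides/content.aspx?g=dotnet&seqNum=852

First, the ProducerConsumerStream class: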

using System;
using System.IO;
using System.Threading;
using System.Diagnostics;

namespace Mischel.IO
{
    // This class is safe for 1 producer and 1 consumer.
    public class ProducerConsumerStream : Stream
    {
        private byte[] CircleBuff;
        private int Head;
        private int Tail;

        public bool IsAddingCompleted { get; private set; }
        public bool IsCompleted { get; private set; }

        // For debugging
        private long TotalBytesRead = 0;
        private long TotalBytesWritten = 0;

        public ProducerConsumerStream(int size)
        {
            CircleBuff = new byte[size];
            Head = 1;
            Tail = 0;
        }

        [Conditional("JIM_DEBUG")]
        private void DebugOut(string msg)
        {
            Console.WriteLine(msg);
        }

        [Conditional("JIM_DEBUG")]
        private void DebugOut(string fmt, params object[] parms)
        {
            DebugOut(string.Format(fmt, parms));
        }

        private int ReadBytesAvailable
        {
            get
            {
                if (Head > Tail)
                    return Head - Tail - 1;
                else
                    return CircleBuff.Length - Tail + Head - 1;
            }
        }

        private int WriteBytesAvailable { get { return CircleBuff.Length - ReadBytesAvailable - 1; } }

        private void IncrementTail()
        {
            Tail = (Tail + 1) % CircleBuff.Length;
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            if (disposed)
            {
                throw new ObjectDisposedException("The stream has been disposed.");
            }
            if (IsCompleted)
            {
                throw new EndOfStreamException("The stream is empty and has been marked complete for adding.");
            }
            if (count == 0)
            {
                return 0;
            }

            lock (CircleBuff)
            {
                DebugOut("Read: requested {0:N0} bytes. Available = {1:N0}.", count, ReadBytesAvailable);
                while (ReadBytesAvailable == 0)
                {
                    if (IsAddingCompleted)
                    {
                        IsCompleted = true;
                        return 0;
                    }
                    Monitor.Wait(CircleBuff);
                }

                // If Head < Tail, then there are bytes available at the end of the buffer
                // and also at the front of the buffer.
                // If reading from Tail to the end doesn't fulfill the request,
                // and there are still bytes available,
                // then read from the start of the buffer.
                DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                IncrementTail();
                int bytesToRead;
                if (Tail > Head)
                {
                    // When Tail > Head, we know that there are at least
                    // (CircleBuff.Length - Tail) bytes available in the buffer.
                    bytesToRead = CircleBuff.Length - Tail;
                }
                else
                {
                    bytesToRead = Head - Tail;
                }

                // Don't read more than count bytes!
                bytesToRead = Math.Min(bytesToRead, count);

                Buffer.BlockCopy(CircleBuff, Tail, buffer, offset, bytesToRead);
                Tail += (bytesToRead - 1);
                int bytesRead = bytesToRead;

                // At this point, either we've exhausted the buffer,
                // or Tail is at the end of the buffer and has to wrap around.
                if (bytesRead < count && ReadBytesAvailable > 0)
                {
                    // We haven't fulfilled the read.
                    IncrementTail();
                    // Tail is always equal to 0 here.
                    bytesToRead = Math.Min((count - bytesRead), (Head - Tail));
                    Buffer.BlockCopy(CircleBuff, Tail, buffer, offset + bytesRead, bytesToRead);
                    bytesRead += bytesToRead;
                    Tail += (bytesToRead - 1);
                }

                TotalBytesRead += bytesRead;
                DebugOut("Read: returning {0:N0} bytes. TotalRead={1:N0}", bytesRead, TotalBytesRead);
                DebugOut("Read: Head={0}, Tail={1}, Avail={2}", Head, Tail, ReadBytesAvailable);

                Monitor.Pulse(CircleBuff);
                return bytesRead;
            }
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            if (disposed)
            {
                throw new ObjectDisposedException("The stream has been disposed.");
            }
            if (IsAddingCompleted)
            {
                throw new InvalidOperationException("The stream has been marked as complete for adding.");
            }
            lock (CircleBuff)
            {
                DebugOut("Write: requested {0:N0} bytes. Available = {1:N0}", count, WriteBytesAvailable);
                int bytesWritten = 0;
                while (bytesWritten < count)
                {
                    while (WriteBytesAvailable == 0)
                    {
                        Monitor.Wait(CircleBuff);
                    }
                    DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                    int bytesToCopy = Math.Min((count - bytesWritten), WriteBytesAvailable);
                    CopyBytes(buffer, offset + bytesWritten, bytesToCopy);
                    TotalBytesWritten += bytesToCopy;
                    DebugOut("Write: {0} bytes written. TotalWritten={1:N0}", bytesToCopy, TotalBytesWritten);
                    DebugOut("Write: Head={0}, Tail={1}, Avail={2}", Head, Tail, WriteBytesAvailable);
                    bytesWritten += bytesToCopy;
                    Monitor.Pulse(CircleBuff);
                }
            }
        }


        private void CopyBytes(byte[] buffer, int srcOffset, int count)
        {
            // Insert at head
            // The copy might require two separate operations.

            // copy as much as can fit between Head and end of the circular buffer
            int offset = srcOffset;
            int bytesCopied = 0;
            int bytesToCopy = Math.Min(CircleBuff.Length - Head, count);
            if (bytesToCopy > 0)
            {
                Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                bytesCopied = bytesToCopy;
                Head = (Head + bytesToCopy) % CircleBuff.Length;
                offset += bytesCopied;
            }

            // Copy the remainder, which will go from the beginning of the buffer.
            if (bytesCopied < count)
            {
                bytesToCopy = count - bytesCopied;
                Buffer.BlockCopy(buffer, offset, CircleBuff, Head, bytesToCopy);
                Head = (Head + bytesToCopy) % CircleBuff.Length;
            }
        }

        public void CompleteAdding()
        {
            if (disposed)
            {
                throw new ObjectDisposedException("The stream has been disposed.");
            }
            lock (CircleBuff)
            {
                DebugOut("CompleteAdding: {0:N0} bytes written.", TotalBytesWritten);
                IsAddingCompleted = true;
                Monitor.Pulse(CircleBuff);
            }
        }

        public override bool CanRead { get { return true; } }

        public override bool CanSeek { get { return false; } }

        public override bool CanWrite { get { return true; } }

        public override void Flush() { /* does nothing */ }

        public override long Length { get { throw new NotImplementedException(); } }

        public override long Position
        {
            get { throw new NotImplementedException(); }
            set { throw new NotImplementedException(); }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        private bool disposed = false;

        protected override void Dispose(bool disposing)
        {
            if (!disposed)
            {
                base.Dispose(disposing);
                disposed = true;
            }
        }
    }
}
And an example of how to use it:

using System;
using System.Text;
using System.Threading;
using Mischel.IO;

class Program
{
    static readonly string TestText = "This is a test of the emergency broadcast system.";
    static readonly byte[] TextBytes = Encoding.UTF8.GetBytes(TestText);

    const int Megabyte = 1024 * 1024;

    const int TestBufferSize = 12;

    const int ProducerBufferSize = 4;
    const int ConsumerBufferSize = 5;

    static void Main(string[] args)
    {
        Console.WriteLine("TextBytes contains {0:N0} bytes.", TextBytes.Length);
        using (var pcStream = new ProducerConsumerStream(TestBufferSize))
        {
            Thread ProducerThread = new Thread(ProducerThreadProc);
            Thread ConsumerThread = new Thread(ConsumerThreadProc);
            ProducerThread.Start(pcStream);
            Thread.Sleep(2000);
            ConsumerThread.Start(pcStream);

            ProducerThread.Join();
            ConsumerThread.Join();
        }
        Console.Write("Done. Press Enter.");
        Console.ReadLine();
    }

    static void ProducerThreadProc(object state)
    {
        Console.WriteLine("Producer: started.");
        var pcStream = (ProducerConsumerStream)state;
        int offset = 0;
        // Loop over the byte count, not the character count, so that
        // non-ASCII text would also be written completely.
        while (offset < TextBytes.Length)
        {
            int bytesToWrite = Math.Min(ProducerBufferSize, TextBytes.Length - offset);
            pcStream.Write(TextBytes, offset, bytesToWrite);
            offset += bytesToWrite;
        }
        pcStream.CompleteAdding();
        Console.WriteLine("Producer: {0:N0} total bytes written.", offset);
        Console.WriteLine("Producer: exit.");
    }

    static void ConsumerThreadProc(object state)
    {
        Console.WriteLine("Consumer: started.");
        var instream = (ProducerConsumerStream)state;
        int testOffset = 0;

        var inputBuffer = new byte[TextBytes.Length];

        int bytesRead;
        do
        {
            int bytesToRead = Math.Min(ConsumerBufferSize, inputBuffer.Length - testOffset);
            bytesRead = instream.Read(inputBuffer, testOffset, bytesToRead);
            //Console.WriteLine("Consumer: {0:N0} bytes read.", bytesRead);
            testOffset += bytesRead;
        } while (bytesRead != 0);
        Console.WriteLine("Consumer: {0:N0} total bytes read.", testOffset);

        // Compare bytes read with TextBytes
        for (int i = 0; i < TextBytes.Length; ++i)
        {
            if (inputBuffer[i] != TextBytes[i])
            {
                Console.WriteLine("Read error at position {0}", i);
                break;
            }
        }
        Console.WriteLine("Consumer: exit.");
    }
}
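The example above writes a fixed byte array, whereas the question starts from an IEnumerable<string>. As a rough sketch of how the two might be bridged, the hypothetical helper below (the name EnumerablePump and its Pump method are assumptions, not part of the answer) encodes one item at a time and writes it to any writable Stream. A MemoryStream stands in as the target only to keep the demo self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class EnumerablePump
{
    // Hypothetical helper (not part of the answer): encodes each string and
    // writes it to the target stream, so only one item is held in memory at a time.
    // Returns the total number of bytes written.
    public static long Pump(IEnumerable<string> data, Stream target, Encoding encoding)
    {
        long total = 0;
        foreach (string item in data)
        {
            byte[] bytes = encoding.GetBytes(item);
            target.Write(bytes, 0, bytes.Length);
            total += bytes.Length;
        }
        return total;
    }

    public static void Main()
    {
        // MemoryStream is used here only so the demo runs on its own.
        using (var target = new MemoryStream())
        {
            long written = Pump(new[] { "1", "\r\n", "2", "\r\n" }, target, Encoding.UTF8);
            Console.WriteLine("{0} bytes written.", written);
        }
    }
}
```

With the class from this answer, the producer thread would presumably call EnumerablePump.Pump(data, pcStream, Encoding.UTF8) followed by pcStream.CompleteAdding(), while the consumer reads from the same pcStream. Because Write blocks when the circular buffer is full, the sequence is only enumerated as fast as the consumer drains it.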
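[Comments]:

@JimMischel Let me know too, so that I can update my answer; it's still in the Wayback Machine: web.archive.org/web/20151210235510/http://www.informit.com/…
@drzaus: I copied the code from the article into the answer here.

[Answer 2]:

I had the same problem. In my case a third-party package accepted only streams, but I had an IEnumerable. I couldn't find an answer online, so I wrote my own, which I'll share: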

using System;
using System.Collections.Generic;
using System.IO;

public class IEnumerableStringReader : TextReader
{
    private readonly IEnumerator<string> _enumerator;

    private bool eof = false; // is set to true when .MoveNext tells us there is no more data.
    private char[] curLine = null;
    private int curLinePos = 0;

    private bool disposed = false;

    public IEnumerableStringReader(IEnumerable<string> input)
    {
        _enumerator = input.GetEnumerator();
    }

    private void GetNextLine()
    {
        if (eof) return;

        eof = !_enumerator.MoveNext();
        if (eof) return;

        curLine = $"{_enumerator.Current}\r\n" // IEnumerable<string> input implies newlines exist between the lines.
            .ToCharArray();

        curLinePos = 0;
    }

    public override int Peek()
    {
        if (disposed) throw new ObjectDisposedException("The stream has been disposed.");

        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return -1;

        return curLine[curLinePos];
    }

    public override int Read()
    {
        if (disposed) throw new ObjectDisposedException("The stream has been disposed.");

        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return -1;

        return curLine[curLinePos++];
    }

    public override int Read(char[] buffer, int index, int count)
    {
        if (disposed) throw new ObjectDisposedException("The stream has been disposed.");
        if (count == 0) return 0;

        int charsReturned = 0;
        int maxChars = Math.Min(count, buffer.Length - index); // Assuming we don't run out of input chars, we return count characters if we can. If the space left in the buffer is not big enough we return as many as will fit in the buffer.

        while (charsReturned < maxChars)
        {
            if (curLine == null || curLinePos == curLine.Length) GetNextLine();
            if (eof) return charsReturned;

            int maxCurrentCopy = maxChars - charsReturned;
            int charsAtTheReady = curLine.Length - curLinePos; // chars available in current line
            int copySize = Math.Min(maxCurrentCopy, charsAtTheReady); // stop at end of buffer.

            // can't use Buffer.BlockCopy because it's byte based and we're dealing with chars.
            Array.ConstrainedCopy(curLine, curLinePos, buffer, index, copySize);

            index += copySize;
            curLinePos += copySize;
            charsReturned += copySize;
        }

        return charsReturned;
    }

    public override string ReadLine()
    {
        if (curLine == null || curLinePos == curLine.Length) GetNextLine();
        if (eof) return null;

        if (curLinePos > 0) // this is necessary in case the client uses both Read() and ReadLine() calls
        {
            // Create a new string from the remainder of the char array. The -2 is because GetNextLine appends a crlf.
            // Math.Max guards against a negative length when Read() has already consumed part of that crlf.
            var tmp = new string(curLine, curLinePos, Math.Max(0, (curLine.Length - curLinePos) - 2));
            curLinePos = curLine.Length; // so next call will re-read
            return tmp;
        }

        // read full line.
        curLinePos = curLine.Length; // so next call will re-read
        return _enumerator.Current; // if all the client does is call ReadLine this (faster) code path will be taken.
    }

    protected override void Dispose(bool disposing)
    {
        if (!disposed)
        {
            _enumerator.Dispose();
            base.Dispose(disposing);
            disposed = true;
        }
    }
}
In my case, I wanted to use it as input to Datastreams.Csv:

using (var tr = new IEnumerableStringReader(input))
using (var reader = new CsvReader(tr))
{
  while (reader.ReadRecord())
  {
    // do whatever
  }
}

[Answer 3]:

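Steve Sadler wrote a perfect answer. However, he made it far more difficult than it needs to be.

According to the reference source of TextReader, you only need to override Peek and Read:

"A subclass must minimally implement the Peek() and Read() methods."

First I wrote a function that converts an IEnumerable<string> into an IEnumerable<char>, adding a new line at the end of each string:

private static IEnumerable<char> ReadCharacters(IEnumerable<string> lines)
{
    foreach (string line in lines)
    {
        foreach (char c in line + Environment.NewLine)
        {
            yield return c;
        }
    }
}

Environment.NewLine is the part that adds the new line at the end of each string.

Now the class is fairly simple:

class EnumStringReader : TextReader
{
    public EnumStringReader(IEnumerable<string> lines)
    {
        this.enumerator = ReadCharacters(lines).GetEnumerator();
        this.dataAvailable = this.enumerator.MoveNext();
    }
    private bool disposed = false;
    private bool dataAvailable;
    private readonly IEnumerator<char> enumerator;

The constructor takes the sequence of lines to read. It uses this sequence and the function written earlier to convert it into a sequence of characters, with Environment.NewLine appended to each line.

It gets the enumerator of the converted sequence and moves to the first character. It remembers in dataAvailable whether there is a first character.

Now we are ready for Peek: if no data is available, return -1; otherwise return the current character as an int. Do not advance:

public override int Peek()
{
    this.ThrowIfDisposed();
    return this.dataAvailable ? this.enumerator.Current : -1;
}

Read: if no data is available, return -1; otherwise return the current character as an int. Advance to the next character and remember whether data is available:

public override int Read()
{
    this.ThrowIfDisposed();
    if (this.dataAvailable)
    {
        char nextChar = this.enumerator.Current;
        this.dataAvailable = this.enumerator.MoveNext();
        return (int)nextChar;
    }
    else
    {
        return -1;
    }
}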
Don't forget to override Dispose(bool), which is where you dispose the enumerator.
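The answer calls ThrowIfDisposed and mentions Dispose(bool) without showing either. The sketch below assembles the pieces into one class and fills in those two members. Their bodies are assumptions about what the author intended, not code from the answer.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Consolidated sketch of the reader. ThrowIfDisposed and Dispose(bool)
// are assumed implementations; the answer names them but does not show them.
public class EnumStringReader : TextReader
{
    private readonly IEnumerator<char> enumerator;
    private bool dataAvailable;
    private bool disposed = false;

    public EnumStringReader(IEnumerable<string> lines)
    {
        this.enumerator = ReadCharacters(lines).GetEnumerator();
        this.dataAvailable = this.enumerator.MoveNext();
    }

    // Flattens the lines into characters, appending a newline to each line.
    private static IEnumerable<char> ReadCharacters(IEnumerable<string> lines)
    {
        foreach (string line in lines)
            foreach (char c in line + Environment.NewLine)
                yield return c;
    }

    public override int Peek()
    {
        this.ThrowIfDisposed();
        return this.dataAvailable ? this.enumerator.Current : -1;
    }

    public override int Read()
    {
        this.ThrowIfDisposed();
        if (!this.dataAvailable) return -1;
        char nextChar = this.enumerator.Current;
        this.dataAvailable = this.enumerator.MoveNext();
        return nextChar;
    }

    // Assumed helper: rejects any use after the reader has been disposed.
    private void ThrowIfDisposed()
    {
        if (this.disposed) throw new ObjectDisposedException(nameof(EnumStringReader));
    }

    // Assumed implementation: releases the enumerator (and with it the
    // underlying sequence) exactly once, then lets the base class clean up.
    protected override void Dispose(bool disposing)
    {
        if (!this.disposed)
        {
            if (disposing) this.enumerator.Dispose();
            this.disposed = true;
        }
        base.Dispose(disposing);
    }
}
```

Disposing the enumerator matters when the sequence comes from an iterator method holding resources such as an open file or a database cursor, since their finally blocks only run when the enumerator is disposed or runs to completion.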

That is all we need. All the other functions will use these two.

Now fill your stream with the lines:

IEnumerable<string> lines = ...
using (TextWriter writer = System.IO.File.CreateText(...))
{
    using (TextReader reader = new EnumStringReader(lines))
    {
        // The three loops below are alternatives; use only one of them,
        // because the reader is exhausted after the first.

        // either write per char:
        while (reader.Peek() != -1)
        {
            char c = (char)reader.Read();
            writer.Write(c);
        }

        // or write per line:
        string line = reader.ReadLine();
        // line is without newLine!
        while (line != null)
        {
            writer.WriteLine(line);
            line = reader.ReadLine();
        }

        // or write per block
        char[] buf = new char[4096];
        int nrRead = reader.ReadBlock(buf, 0, buf.Length);
        while (nrRead > 0)
        {
            writer.Write(buf, 0, nrRead);
            nrRead = reader.ReadBlock(buf, 0, buf.Length);
        }
    }
}
[Answer 4]:

Here is a read-only Stream implementation that takes an IEnumerable<byte> as input:

public class ByteStream : Stream
{
    private readonly IEnumerator<byte> _input;
    private bool _disposed;

    public ByteStream(IEnumerable<byte> input)
    {
        _input = input.GetEnumerator();
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => 0;
    public override long Position { get; set; } = 0;

    public override int Read(byte[] buffer, int offset, int count)
    {
        int i = 0;
        for (; i < count && _input.MoveNext(); i++)
            buffer[i + offset] = _input.Current;
        return i;
    }

    public override long Seek(long offset, SeekOrigin origin) => throw new InvalidOperationException();
    public override void SetLength(long value) => throw new InvalidOperationException();
    public override void Write(byte[] buffer, int offset, int count) => throw new InvalidOperationException();
    public override void Flush() => throw new InvalidOperationException();

    // Stream already implements IDisposable, so override Dispose(bool)
    // rather than re-implementing IDisposable.Dispose explicitly.
    protected override void Dispose(bool disposing)
    {
        if (!_disposed)
        {
            if (disposing)
                _input.Dispose();
            _disposed = true;
        }
        base.Dispose(disposing);
    }
}
You still need a function that converts an IEnumerable<string> into an IEnumerable<byte>:

public static IEnumerable<byte> Encode(IEnumerable<string> input, Encoding encoding)
{
    byte[] newLine = encoding.GetBytes(Environment.NewLine);
    foreach (string line in input)
    {
        byte[] bytes = encoding.GetBytes(line);
        foreach (byte b in bytes)
            yield return b;
        foreach (byte b in newLine)
            yield return b;
    }
}

Finally, here is how to use it in your controller:

public FileResult GetResult()
{
    IEnumerable<string> data = GetDataForStream();
    var stream = new ByteStream(Encode(data, Encoding.UTF8));
    return File(stream, "text/plain", "Result.txt");
}
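[Answer 5]:

With the EnumerableToStream NuGet package, you can implement your method like this:

using EnumerableToStream;

private Stream ToStringStream(Encoding encoding, IEnumerable<string> data)
{
    return data.ToStream(encoding);
}

I had the same requirement and ended up rolling my own implementation, which I have been using for a while. Getting all the details just right took some time and effort. For example, you want the IEnumerable to be disposed once the stream has been read to the end, and you don't want multi-byte characters to be written partially into the buffer.

After seeing this question, I decided to publish the code as a NuGet package. I hope it saves you a few hours. The source code is on GitHub.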
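To illustrate the two details mentioned above, here is a minimal sketch of a read-only stream over IEnumerable<string>. It is not the EnumerableToStream package's source; the class name StringEnumerableStream and all of its members are hypothetical. It encodes each string into an intermediate byte array, so a multi-byte character may straddle two Read calls but is never truncated, and it disposes the enumerator as soon as the sequence ends. It assumes every item is non-null and that no surrogate pair is split across two items.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical sketch; this is NOT the EnumerableToStream package's code.
public sealed class StringEnumerableStream : Stream
{
    private readonly IEnumerator<string> _source;
    private readonly Encoding _encoding;
    private byte[] _pending = new byte[0]; // encoded bytes of the current item
    private int _pendingPos;               // next unread index in _pending
    private bool _done;                    // true once the source is exhausted or disposed

    public StringEnumerableStream(IEnumerable<string> source, Encoding encoding)
    {
        _source = source.GetEnumerator();
        _encoding = encoding;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int copied = 0;
        while (copied < count)
        {
            if (_pendingPos == _pending.Length)
            {
                // Current item fully delivered: pull the next one, or finish.
                if (_done) break;
                if (!_source.MoveNext())
                {
                    _source.Dispose(); // release the sequence as soon as it ends
                    _done = true;
                    break;
                }
                _pending = _encoding.GetBytes(_source.Current);
                _pendingPos = 0;
                continue; // re-check, since the item may be an empty string
            }
            int n = Math.Min(count - copied, _pending.Length - _pendingPos);
            Buffer.BlockCopy(_pending, _pendingPos, buffer, offset + copied, n);
            _pendingPos += n;
            copied += n;
        }
        return copied; // 0 signals end of stream
    }

    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !_done)
        {
            _source.Dispose(); // the stream was closed before reaching the end
            _done = true;
        }
        base.Dispose(disposing);
    }
}
```

The intermediate array costs one allocation per item. A production implementation might instead use a stateful Encoder writing directly into the caller's buffer, which is where the partial-character problem described above arises.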
