连续从流中读取?

Posted

技术标签:

【中文标题】连续从流中读取?【英文标题】:Continuously reading from a stream? 【发布时间】:2010-05-06 22:13:24 【问题描述】:

我有一个 Stream 对象,它偶尔会在上面获取一些数据,但时间间隔不可预测。出现在 Stream 上的消息是经过良好定义的,并提前声明了其有效负载的大小(大小是包含在每条消息的前两个字节中的 16 位整数)。

我想要一个 StreamWatcher 类来检测 Stream 何时有一些数据。一旦完成,我希望引发一个事件,以便订阅的 StreamProcessor 实例可以处理新消息。

这可以在不直接使用线程的情况下通过 C# 事件来完成吗?看起来应该很简单,但我无法完全理解设计它的正确方法。

【问题讨论】:

【参考方案1】:

当你说不直接使用线程时,我假设你仍然想通过异步调用间接使用它们,否则这不会很有用。

您需要做的就是包装Stream 的异步方法并将结果存储在缓冲区中。首先,让我们定义规范的事件部分:

public delegate void MessageAvailableEventHandler(object sender,
    MessageAvailableEventArgs e);

public class MessageAvailableEventArgs : EventArgs

    public MessageAvailableEventArgs(int messageSize) : base()
    
        this.MessageSize = messageSize;
    

    public int MessageSize  get; private set; 

现在,从流中异步读取一个 16 位整数,并在它准备好时报告:

public class StreamWatcher

    private readonly Stream stream;

    private byte[] sizeBuffer = new byte[2];

    public StreamWatcher(Stream stream)
    
        if (stream == null)
            throw new ArgumentNullException("stream");
        this.stream = stream;
        WatchNext();
    

    protected void OnMessageAvailable(MessageAvailableEventArgs e)
    
        var handler = MessageAvailable;
        if (handler != null)
            handler(this, e);
    

    protected void WatchNext()
    
        stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback),
            null);
    

    private void ReadCallback(IAsyncResult ar)
    
        int bytesRead = stream.EndRead(ar);
        if (bytesRead != 2)
            throw new InvalidOperationException("Invalid message header.");
        int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0];
        OnMessageAvailable(new MessageAvailableEventArgs(messageSize));
        WatchNext();
    

    public event MessageAvailableEventHandler MessageAvailable;

我想就是这样。这假定处理消息的任何类也可以访问Stream,并准备根据事件中的消息大小同步或异步读取它。如果您希望观察者类真正阅读整个消息,那么您必须添加更多代码才能做到这一点。

【讨论】:

+1 实现证明我的 BeginRead 理论不是正确的解决方案。您的实施消除了这个疑问。 光荣!这看起来可以解决问题,我明天试试。 +1。吹毛求疵:AFAIK Stream.Read 允许返回少于 count 个字节,只要它返回至少一个字节(如果未到达末尾)。因此,如果 (bytesRead != 2) 不应该抛出异常,而是再次 BeginRead 直到读取了两个字节。 @dtb:这是一个很好的观点——它有点取决于流。有些会阻塞直到数据可用,有些会提前返回,有些仍然会抛出异常。要使其与 any 流一起使用,我认为您需要在构造函数中选择行为。 您不想在ReadCallback 中将OnMessageAvailable 提升到WatchNext 之前吗?【参考方案2】:

是的,这可以做到。使用带有AsyncCallback 的非阻塞Stream.BeginRead 方法。当数据可用时,回调被异步调用。在回调中调用Stream.EndRead获取数据,再次调用Stream.BeginRead获取下一块数据。在一个字节数组中缓冲传入数据,该数组足以容纳消息。一旦字节数组已满(可能需要多次回调调用),引发事件。然后读取下一条消息大小,创建一个新缓冲区,重复,完成。

【讨论】:

@Nayan:异步多线程。这个问题本质上是多线程之一。我怀疑 OP 的意思是他不想自己显式创建线程。 @Nayan:我假设 OP 正在寻找一种不显式创建新线程并使用阻塞读取方法的解决方案,而是寻找非阻塞异步解决方案。没有线程就不能实现异步。 我明白你在说什么,但OP没有提到是使用Thread还是threading! =) 让他理解和选择。 @Nayan:他确实说过“不直接使用 Threads”,用大写的“T”启动!【参考方案3】:

正常的做法是使用Stream暴露的.NET asynchronous programming pattern。本质上,您通过调用Stream.BeginRead 开始异步读取,向其传递byte[] 缓冲区和从流中读取数据时将调用的回调方法。在回调方法中,您调用Stream.EndRead,将IAsncResult 参数传递给您的回调。 EndRead 的返回值告诉你有多少字节被读入缓冲区。

以这种方式收到前几个字节后,您可以再次调用BeginRead 来等待消息的其余部分(如果您第一次没有获得足够的数据)。获得完整信息后,您可以发起活动。

【讨论】:

【参考方案4】:

使用 Stream.BeginRead() 和在单独的线程中使用同步 Stream.Read() 方法不一样吗?

【讨论】:

以上是关于连续从流中读取?的主要内容,如果未能解决你的问题,请参考以下文章

MySQL 从流中读取失败

iOS - 如何从流中读取音频并播放音频

NAudio在改变音高而不是文件时寻找一种从流中读取的方法

用于从流中读取多个 protobuf 消息的 python 示例

java如何直接从流中读取excel的文件内容?Stream

如何从流中读取 CSV 文件并在写入时处理每一行?