连续从流中读取?
Posted
技术标签:
【中文标题】连续从流中读取?【英文标题】:Continuously reading from a stream? 【发布时间】:2010-05-06 22:13:24 【问题描述】:我有一个 Stream 对象,它偶尔会在上面获取一些数据,但时间间隔不可预测。出现在 Stream 上的消息是经过良好定义的,并提前声明了其有效负载的大小(大小是包含在每条消息的前两个字节中的 16 位整数)。
我想要一个 StreamWatcher 类来检测 Stream 何时有一些数据。一旦完成,我希望引发一个事件,以便订阅的 StreamProcessor 实例可以处理新消息。
这可以在不直接使用线程的情况下通过 C# 事件来完成吗?看起来应该很简单,但我无法完全理解设计它的正确方法。
【问题讨论】:
【参考方案1】:当你说不直接使用线程时,我假设你仍然想通过异步调用间接使用它们,否则这不会很有用。
您需要做的就是包装Stream
的异步方法并将结果存储在缓冲区中。首先,让我们定义规范的事件部分:
public delegate void MessageAvailableEventHandler(object sender,
MessageAvailableEventArgs e);
public class MessageAvailableEventArgs : EventArgs
public MessageAvailableEventArgs(int messageSize) : base()
this.MessageSize = messageSize;
public int MessageSize get; private set;
现在,从流中异步读取一个 16 位整数,并在它准备好时报告:
public class StreamWatcher
private readonly Stream stream;
private byte[] sizeBuffer = new byte[2];
public StreamWatcher(Stream stream)
if (stream == null)
throw new ArgumentNullException("stream");
this.stream = stream;
WatchNext();
protected void OnMessageAvailable(MessageAvailableEventArgs e)
var handler = MessageAvailable;
if (handler != null)
handler(this, e);
protected void WatchNext()
stream.BeginRead(sizeBuffer, 0, 2, new AsyncCallback(ReadCallback),
null);
private void ReadCallback(IAsyncResult ar)
int bytesRead = stream.EndRead(ar);
if (bytesRead != 2)
throw new InvalidOperationException("Invalid message header.");
int messageSize = sizeBuffer[1] << 8 + sizeBuffer[0];
OnMessageAvailable(new MessageAvailableEventArgs(messageSize));
WatchNext();
public event MessageAvailableEventHandler MessageAvailable;
我想就是这样。这假定处理消息的任何类也可以访问Stream
,并准备根据事件中的消息大小同步或异步读取它。如果您希望观察者类真正阅读整个消息,那么您必须添加更多代码才能做到这一点。
【讨论】:
+1 实现证明我的 BeginRead 理论不是正确的解决方案。您的实施消除了这个疑问。 光荣!这看起来可以解决问题,我明天试试。 +1。吹毛求疵:AFAIK Stream.Read 允许返回少于 count 个字节,只要它返回至少一个字节(如果未到达末尾)。因此,如果(bytesRead != 2)
不应该抛出异常,而是再次 BeginRead 直到读取了两个字节。
@dtb:这是一个很好的观点——它有点取决于流。有些会阻塞直到数据可用,有些会提前返回,有些仍然会抛出异常。要使其与 any 流一起使用,我认为您需要在构造函数中选择行为。
您不想在ReadCallback
中将OnMessageAvailable
提升到WatchNext
之前吗?【参考方案2】:
是的,这可以做到。使用带有AsyncCallback 的非阻塞Stream.BeginRead 方法。当数据可用时,回调被异步调用。在回调中调用Stream.EndRead获取数据,再次调用Stream.BeginRead获取下一块数据。在一个字节数组中缓冲传入数据,该数组足以容纳消息。一旦字节数组已满(可能需要多次回调调用),引发事件。然后读取下一条消息大小,创建一个新缓冲区,重复,完成。
【讨论】:
@Nayan:异步是多线程。这个问题本质上是多线程之一。我怀疑 OP 的意思是他不想自己显式创建线程。 @Nayan:我假设 OP 正在寻找一种不显式创建新线程并使用阻塞读取方法的解决方案,而是寻找非阻塞异步解决方案。没有线程就不能实现异步。 我明白你在说什么,但OP没有提到是使用Thread
还是threading! =) 让他理解和选择。
@Nayan:他确实说过“不直接使用 Threads”,用大写的“T”启动!【参考方案3】:
正常的做法是使用Stream
暴露的.NET asynchronous programming pattern。本质上,您通过调用Stream.BeginRead
开始异步读取,向其传递byte[]
缓冲区和从流中读取数据时将调用的回调方法。在回调方法中,您调用Stream.EndRead
,将IAsncResult
参数传递给您的回调。 EndRead
的返回值告诉你有多少字节被读入缓冲区。
以这种方式收到前几个字节后,您可以再次调用BeginRead
来等待消息的其余部分(如果您第一次没有获得足够的数据)。获得完整信息后,您可以发起活动。
【讨论】:
【参考方案4】:使用 Stream.BeginRead() 和在单独的线程中使用同步 Stream.Read() 方法不一样吗?
【讨论】:
以上是关于连续从流中读取?的主要内容,如果未能解决你的问题,请参考以下文章
用于从流中读取多个 protobuf 消息的 python 示例