NamedPipeServerStream/NamedPipeClientStream 包装器
Posted
技术标签:
【中文标题】NamedPipeServerStream/NamedPipeClientStream 包装器【英文标题】:NamedPipeServerStream/NamedPipeClientStream wrapper 【发布时间】:2017-11-23 12:55:49 【问题描述】:我目前正在为NamedPipeServerStream
/NamedPipeClientStream
编写一个小包装器,它完全基于Event
,而不是使用AsyncCallbacks
。
我公开了几乎所有可能的同步和异步方法(连接/等待连接、写入等),所以如果消费者想要,例如,启动服务器实例并在客户端连接时发送消息,他可以走完全同步路线并执行类似...
var server = new NamedPipeServer("helloWorld");
server.StartAndWait();
server.Write("Welcome!");
或异步方式...
var server = new NamedPipeServer("helloWorld);
server.ClientConnected += x => x.WriteAsync("Welcome!");
server.Start(); //Start() returns immediately
但是,我正在努力寻找一种阅读消息的好方法。目前,当阅读消息时,我会触发 MessageAvailable
事件并将消息作为参数之一传递。
我只是想不出实现同步读取的正确方法。
我考虑过的是:
具有获取消息的GetNextMessage()
同步方法。在内部,这可以通过两种不同的方式处理:
我可以保留一个IEnumerable<Message>
,其中包含所有尚未使用的消息。因此,一旦对方发送了一条消息,我就会从流中读取它并将其存储在内存中,以便以后GetNextMessage()
可以使用它们。优点是它几乎在消息写入后立即释放流,因此它不会阻止另一方发送其他消息。缺点是我完全无法控制我将持有多少条消息或它们的大小。我的IEnumerable<Message>
最终可能会收到 10GB 的未消费消息,而我对此无能为力,因为我无法强制消费者检索消息。
我可以认为我只将一条消息存储在内部缓冲区中,并且只有在通过GetNextMessage()
使用该消息后才开始再次读取。如果我这样做,另一方将被阻止写入其他消息,直到前一个消息被消耗。更准确地说,对方可以一直写到流满为止。这可能是多个小的完整消息或单个不完整的消息。在单个消息不完整的情况下,我认为这是一种更糟糕的方法,因为在发送消息的第 1 部分和后续部分之间,另一端可能最终断开连接,整个消息将丢失。
为了让事情变得更难,在上述任何一种方法中,消费者总是有可能使用事件来接收消息(记住事件包含接收到的消息),因此不需要GetNextMessage()
。我要么需要完全停止在事件中发送消息,要么找到一种不将事件推送到内部缓冲区的方法,如果消息是通过事件消耗的。虽然我可以很容易地判断是否有事件处理程序,但无法知道消息是否真的在那里被处理(即,考虑一个实现这个的类并监听那个事件,但什么都不做)。我可以在这里看到的唯一真正的方法是从事件中删除消息,强制消费者始终调用GetNextMessage()
,但对其他想法持开放态度。
这两种方法还有另一个问题,即如果使用WriteAsync()
(或从不同线程使用Write()
),我无法控制发送消息的顺序。
谁能想到解决这个问题的更好方法?
【问题讨论】:
在有人订阅您的MessageAvailable
之前收到的消息会怎样?他们只是迷路了吗?
目前没有GetNextMessage()
的地方,是的。如果有GetNextMessage()
,那么没有。但这也是客户端和服务器都有Start()
方法的原因。因此,您甚至可以在尝试连接之前订阅事件。尽管Connected
、Disconnected
等,所有其他事件也可以这样说......我们的想法是,如果你关心它,你在调用Start()
之前订阅它
也许使用 GetMessageStream 而不是 GetNextMessage,它将返回 IEnumerable。迭代该可枚举将阻塞,直到有新消息可用。
这仍然让我处于最终可能在本地存储 10GB 消息的情况,因为我无法控制客户端何时使用它们。
是的,但这是不可避免的,这是客户的问题。或者你的意思是你担心你应该总是存储它们,即使客户端没有调用 GetMessageStream?
【参考方案1】:
我建议采用以下方法。创建接口:
public interface ISubscription : IDisposable
Message NextMessage(TimeSpan? timeout);
public class Message
然后像这样实现:
public class NamedPipeServer
public void StartAndWait()
public ISubscription StartAndSubscribe()
// prevent race condition before Start and subscribing to MessageAvailable
var subscription = new Subscription(this);
StartAndWait();
return subscription;
public ISubscription Subscribe()
// if user wants to subscribe and some point after start - why not
return new Subscription(this);
public event Action<Message> MessageAvailable;
private class Subscription : ISubscription
// buffer
private readonly BlockingCollection<Message> _queue = new BlockingCollection<Message>(
new ConcurrentQueue<Message>());
private readonly NamedPipeServer _server;
public Subscription(NamedPipeServer server)
// subscribe to event
_server = server;
_server.MessageAvailable += OnMessageAvailable;
public Message NextMessage(TimeSpan? timeout)
// this is blocking call
if (timeout == null)
return _queue.Take();
else
Message tmp;
if (_queue.TryTake(out tmp, timeout.Value))
return tmp;
return null;
private void OnMessageAvailable(Message msg)
// add to buffer
_queue.Add(msg);
public void Dispose()
// clean up
_server.MessageAvailable -= OnMessageAvailable;
_queue.CompleteAdding();
_queue.Dispose();
然后客户端调用Subscribe
或StartAndSubscribe
。
var sub = server.StartAndSubscribe();
var message = sub.NextMessage();
var messageOrNull = sub.NextMessage(TimeSpan.FromSeconds(1));
sub.Dispose();
这样,如果没有人订阅 - 你就不会缓冲任何消息。如果有人订阅然后不消费 - 这是他们的问题,而不是你的问题,因为缓冲发生在他们现在拥有的订阅中。您还可以限制_queue
阻塞集合的大小,然后添加到它会阻塞,如果达到限制,阻塞您的MessageAvailable
事件,但我不建议这样做。
【讨论】:
嗯...似乎唯一的区别是您添加了订阅机制,并且仅在有人订阅时才缓冲。我有点同意这一点,但是... A) 它似乎在一定程度上偏离了 NamedPipes 默认的工作方式。我认为 NamedPipes 的级别足够低,根本没有它,我也想将我的包装器保持在低级别。 B)对我来说,当您创建具有双向通信的 NamedPipe 时,隐含订阅。 但是您已经想要GetNextMessage
阻塞调用,那么为什么它会影响命名管道的工作方式呢?正如您所说,这是同一个调用,但仅在必要时才执行缓冲。
当您创建双向通信 NamedPipe 时,对我来说隐含订阅时,您似乎正在添加另一层(订阅)。可以争辩说,它并不暗示客户端,它不知道它加入的管道“会话”是两种方式还是一种方式,直到它加入之后,但我不知道我是否担心老实说:P
如果是关于名字的——你可以用不同的名字来命名。因此,当您开始会话时(或以后的任何时候) - 您可以选择返回一些可用于以“同步”方式使用传入消息的对象。并且需要另一层来解决您的问题中提到的问题。
与名称无关,但我无法确切看到它解决了哪些具体问题。也许我遗漏了一些东西,但对我来说,这一切并不是含蓄地说,只要你打电话给Connect()
(这是我在问题中提出的),我就开始将收到的消息添加到内部队列中,我没有控制您何时(或是否)使用它们,我明确表示我仅在您订阅时(在调用 Connect()
之前或之后)开始将消息添加到内部队列,但我仍然无法控制何时(或如果)你曾经消费过它们。我错过了什么吗?以上是关于NamedPipeServerStream/NamedPipeClientStream 包装器的主要内容,如果未能解决你的问题,请参考以下文章