NamedPipeServerStream/NamedPipeClientStream 包装器

Posted

技术标签:

【中文标题】NamedPipeServerStream/NamedPipeClientStream 包装器【英文标题】:NamedPipeServerStream/NamedPipeClientStream wrapper 【发布时间】:2017-11-23 12:55:49 【问题描述】:

我目前正在为NamedPipeServerStream/NamedPipeClientStream 编写一个小包装器,它完全基于Event,而不是使用AsyncCallbacks

我公开了几乎所有可能的同步和异步方法(连接/等待连接、写入等),所以如果消费者想要,例如,启动服务器实例并在客户端连接时发送消息,他可以走完全同步路线并执行类似...

var server = new NamedPipeServer("helloWorld");
server.StartAndWait();
server.Write("Welcome!");

或异步方式...

var server = new NamedPipeServer("helloWorld);
server.ClientConnected += x => x.WriteAsync("Welcome!");
server.Start(); //Start() returns immediately

但是,我正在努力寻找一种阅读消息的好方法。目前,当阅读消息时,我会触发 MessageAvailable 事件并将消息作为参数之一传递。

我只是想不出实现同步读取的正确方法。

我考虑过的是:

具有获取消息的GetNextMessage() 同步方法。在内部,这可以通过两种不同的方式处理:

我可以保留一个IEnumerable<Message>,其中包含所有尚未使用的消息。因此,一旦对方发送了一条消息,我就会从流中读取它并将其存储在内存中,以便以后GetNextMessage() 可以使用它们。优点是它几乎在消息写入后立即释放流,因此它不会阻止另一方发送其他消息。缺点是我完全无法控制我将持有多少条消息或它们的大小。我的IEnumerable<Message> 最终可能会收到 10GB 的未消费消息,而我对此无能为力,因为我无法强制消费者检索消息。

我可以认为我只将一条消息存储在内部缓冲区中,并且只有在通过GetNextMessage() 使用该消息后才开始再次读取。如果我这样做,另一方将被阻止写入其他消息,直到前一个消息被消耗。更准确地说,对方可以一直写到流满为止。这可能是多个小的完整消息或单个不完整的消息。在单个消息不完整的情况下,我认为这是一种更糟糕的方法,因为在发送消息的第 1 部分和后续部分之间,另一端可能最终断开连接,整个消息将丢失。

李>

为了让事情变得更难,在上述任何一种方法中,消费者总是有可能使用事件来接收消息(记住事件包含接收到的消息),因此不需要GetNextMessage()。我要么需要完全停止在事件中发送消息,要么找到一种不将事件推送到内部缓冲区的方法,如果消息是通过事件消耗的。虽然我可以很容易地判断是否有事件处理程序,但无法知道消息是否真的在那里被处理(即,考虑一个实现这个的类并监听那个事件,但什么都不做)。我可以在这里看到的唯一真正的方法是从事件中删除消息,强制消费者始终调用GetNextMessage(),但对其他想法持开放态度。

这两种方法还有另一个问题,即如果使用WriteAsync()(或从不同线程使用Write()),我无法控制发送消息的顺序。

谁能想到解决这个问题的更好方法?

【问题讨论】:

在有人订阅您的MessageAvailable 之前收到的消息会怎样?他们只是迷路了吗? 目前没有GetNextMessage()的地方,是的。如果有GetNextMessage(),那么没有。但这也是客户端和服务器都有Start() 方法的原因。因此,您甚至可以在尝试连接之前订阅事件。尽管ConnectedDisconnected 等,所有其他事件也可以这样说......我们的想法是,如果你关心它,你在调用Start()之前订阅它 也许使用 GetMessageStream 而不是 GetNextMessage,它将返回 IEnumerable。迭代该可枚举将阻塞,直到有新消息可用。 这仍然让我处于最终可能在本地存储 10GB 消息的情况,因为我无法控制客户端何时使用它们。 是的,但这是不可避免的,这是客户的问题。或者你的意思是你担心你应该总是存储它们,即使客户端没有调用 GetMessageStream? 【参考方案1】:

我建议采用以下方法。创建接口:

public interface ISubscription : IDisposable 
    Message NextMessage(TimeSpan? timeout);


public class Message 


然后像这样实现:

public class NamedPipeServer         
    public void StartAndWait() 

    

    public ISubscription StartAndSubscribe() 
        // prevent race condition before Start and subscribing to MessageAvailable
        var subscription = new Subscription(this);
        StartAndWait();
        return subscription;
    

    public ISubscription Subscribe() 
        // if user wants to subscribe and some point after start - why not
        return new Subscription(this);
    

    public event Action<Message> MessageAvailable;

    private class Subscription : ISubscription 
        // buffer
        private readonly BlockingCollection<Message> _queue = new BlockingCollection<Message>(
            new ConcurrentQueue<Message>());

        private readonly NamedPipeServer _server;

        public Subscription(NamedPipeServer server) 
            // subscribe to event
            _server = server;
            _server.MessageAvailable += OnMessageAvailable;
        

        public Message NextMessage(TimeSpan? timeout) 
            // this is blocking call
            if (timeout == null)
                return _queue.Take();
            else 
                Message tmp;
                if (_queue.TryTake(out tmp, timeout.Value))
                    return tmp;
                return null;
            
        

        private void OnMessageAvailable(Message msg) 
            // add to buffer
            _queue.Add(msg);
        

        public void Dispose() 
            // clean up
            _server.MessageAvailable -= OnMessageAvailable;
            _queue.CompleteAdding();
            _queue.Dispose();
        
    

然后客户端调用SubscribeStartAndSubscribe

var sub = server.StartAndSubscribe();
var message = sub.NextMessage();
var messageOrNull = sub.NextMessage(TimeSpan.FromSeconds(1));
sub.Dispose();

这样,如果没有人订阅 - 你就不会缓冲任何消息。如果有人订阅然后不消费 - 这是他们的问题,而不是你的问题,因为缓冲发生在他们现在拥有的订阅中。您还可以限制_queue 阻塞集合的大小,然后添加到它会阻塞,如果达到限制,阻塞您的MessageAvailable 事件,但我不建议这样做。

【讨论】:

嗯...似乎唯一的区别是您添加了订阅机制,并且仅在有人订阅时才缓冲。我有点同意这一点,但是... A) 它似乎在一定程度上偏离了 NamedPipes 默认的工作方式。我认为 NamedPipes 的级别足够低,根本没有它,我也想将我的包装器保持在低级别。 B)对我来说,当您创建具有双向通信的 NamedPipe 时,隐含订阅。 但是您已经想要GetNextMessage 阻塞调用,那么为什么它会影响命名管道的工作方式呢?正如您所说,这是同一个调用,但仅在必要时才执行缓冲。 当您创建双向通信 NamedPipe 时,对我来说隐含订阅时,您似乎正在添加另一层(订阅)。可以争辩说,它并不暗示客户端,它不知道它加入的管道“会话”是两种方式还是一种方式,直到它加入之后,但我不知道我是否担心老实说:P 如果是关于名字的——你可以用不同的名字来命名。因此,当您开始会话时(或以后的任何时候) - 您可以选择返回一些可用于以“同步”方式使用传入消息的对象。并且需要另一层来解决您的问题中提到的问题。 与名称无关,但我无法确切看到它解决了哪些具体问题。也许我遗漏了一些东西,但对我来说,这一切并不是含蓄地说,只要你打电话给Connect()(这是我在问题中提出的),我就开始将收到的消息添加到内部队列中,我没有控制您何时(或是否)使用它们,我明确表示我仅在您订阅时(在调用 Connect() 之前或之后)开始将消息添加到内部队列,但我仍然无法控制何时(或如果)你曾经消费过它们。我错过了什么吗?

以上是关于NamedPipeServerStream/NamedPipeClientStream 包装器的主要内容,如果未能解决你的问题,请参考以下文章