命名管道异步

Posted

技术标签:

【中文标题】命名管道异步【英文标题】:Named pipes async 【发布时间】:2013-08-23 11:52:56 【问题描述】:

我试图建立一个命名管道服务器和客户端来在两个程序之间发送数据。 我的问题是,当我收到数据时,例如。 BeginRead 命令 om 服务器在我从客户端序列化一个对象后触发,它会为同一条消息触发 20 次回调。目标是客户端程序将命令发送到服务器程序。当服务器处理任务时,它会在有连接时将状态更新发送回客户端。

这是我目前的测试程序。

class Program

    static void Main(string[] args)
    
        var server = new PipeServer();
        server.Init();

        var client = new PipeClient();
        if (client.Connect())
        
            Console.WriteLine("Connected to server.");
        
        else
        
            Console.WriteLine("Connection failed.");
            return;
        

        while (true)
        
            Console.Write(" \\> ");
            string input = Console.ReadLine();
            if (string.IsNullOrEmpty(input)) break;

            var arr = input.Split(new char[]  ' ' , StringSplitOptions.RemoveEmptyEntries);
            int value = 0;

            if (arr.Length != 2) break;
            if (!int.TryParse(arr[1], out value)) break;

            var obj = new PipeObject  Name = arr[0], Value = value ;
            client.Send(obj);

            //string result = f.Deserialize(client) as string;
            //Console.WriteLine(result);
        
    


internal class PipeServer

    IFormatter Formatter = new BinaryFormatter();
    public NamedPipeServerStream Instance  get; internal set; 
    public bool IsConnected  get; internal set; 
    byte[] buffer = new byte[65535];
    public object Message  get; set; 

    StreamReader sr;
    StreamWriter sw;

    internal PipeServer()
    
        IsConnected = false;
    

    public void Init()
    
        var ps = new PipeSecurity();
        ps.AddAccessRule(new PipeAccessRule(WindowsIdentity.GetCurrent().User, PipeAccessRights.FullControl, AccessControlType.Allow));
        ps.AddAccessRule(new PipeAccessRule(new SecurityIdentifier(WellKnownSidType.AuthenticatedUserSid, null), PipeAccessRights.ReadWrite, AccessControlType.Allow));

        Instance = new NamedPipeServerStream("Levscan4Pipe", PipeDirection.InOut, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous | PipeOptions.WriteThrough, 65535, 65535, ps);

        sr = new StreamReader(Instance);
        sw = new StreamWriter(Instance);

        Instance.BeginWaitForConnection(OnClientConnected, Instance);

        Thread t = new Thread(Run);
        t.Start();
    

    void Run()
    
        int index = 0;
        if (IsConnected)
        
            try
            
                Instance.BeginRead(buffer, 0, buffer.Length, OnRead_Completed, Instance);
                //index += Instance.Read(buffer, 0, buffer.Length);
                //try
                //
                //    using (var ms = new MemoryStream(buffer))
                //    
                //        Message = Formatter.Deserialize(ms);
                //        index = 0;
                //    
                //
                //catch (Exception e)
                //
                //    Debug.WriteLine(e.Message);
                //    Debug.WriteLine(e.StackTrace);
                //
            
            catch (IOException)
            
                IsConnected = false;
                Instance.Disconnect();
            
        

        Thread.Sleep(Timeout.Infinite);
        //Instance.WaitForConnection();
        //Thread t = new Thread(Run);
        //t.Start();
    

    void OnClientConnected(IAsyncResult ar)
    
        Instance.EndWaitForConnection(ar);
        IsConnected = true;
    

    void OnRead_Completed(IAsyncResult ar)
    
        var bytes = Instance.EndRead(ar);
        Debug.WriteLine("1 > Read completed - bytes read: 0".FormatWith(bytes, DateTime.Now.ToString()));

        //try
        //
        //    using (var ms = new MemoryStream(buffer))
        //    
        //        Message = Formatter.Deserialize(ms);
        //    
        //
        //catch (Exception e)
        //
        //    Debug.WriteLine(e.Message);
        //    Debug.WriteLine(e.StackTrace);
        //
    


internal class PipeClient

    IFormatter f = new BinaryFormatter();
    public NamedPipeClientStream Instance  get; internal set; 
    StreamWriter sw;
    StreamReader sr;

    public PipeClient()
    
        Instance = new NamedPipeClientStream(".", "Levscan4Pipe", PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough);
        sr = new StreamReader(Instance);
        sw = new StreamWriter(Instance);
    

    public bool Connect()
    
        try
        
            Instance.Connect(5000);
            Instance.ReadMode = PipeTransmissionMode.Message;
            Instance.WaitForPipeDrain();
            return true;
        
        catch
        
            return false;
        
    

    public void Send(object obj)
    
        f.Serialize(Instance, obj);
        Instance.Flush();
        Instance.WaitForPipeDrain();
    

编辑

将 while 循环更改为 if,以启动 BeginRead。这解决了多个回调,但我仍然没有收到完整的消息。

【问题讨论】:

我建议您阅读文档。例如,对流调用 Read 会返回已读取的字节数。看起来你假设你会一口气读完你的整个消息,但不一定是这样。 我在回调中获得了读取的字节数,但我无法获得消息的总字节数。我的 NamedPipeServerStream 对象不支持长度和位置。还发现我不能使用 while(IsConnected) 并且每次调用 BeginRead 函数导致我的回调同时运行多次。 如果我将我的对象序列化为内存流,然后从中获取字节并将它们全部发送,就像使用 pipestreams Write 方法一样,我会直接在回调中获取消息并且它可以工作。为什么不能用我的管道流直接序列化得到相同的结果? 如果您使用 netNamedPipebindings 切换到 WCF,这可能会容易得多。 (除非你真的想做低级开发)。 【参考方案1】:

如果服务器正在写入流,例如:

write field 1
write field 2
write field 3
etc.

写入之间有一段时间,接收方(您的程序)可以读取前三个字段,而服务器仍在写入其他字段。管道流不知道服务器何时完成写入,因此它无法缓冲所有内容并将其一大块发送给您。

当服务器首先将所有内容写入内存流,然后将内存流复制到管道流中时,您的程序可以一次获取所有内容。也许。如果服务器正在发送一个非常大的数据包,您可能只读取其中的一部分。

管道流只是一个字节流。它不会对数据强加任何格式。它没有任何记录或类似的概念。因此,您必须将其视为字节流并自己编写记录等。

如果您需要知道从服务器发送的记录的大小,服务器必须为您将该信息放入流中。通常,服务器会先写入长度,然后再写入数据。然后接收器可以读取长度,将其转换为整数,然后从流中读取那么多字节。而且,是的,可能需要多次读取才能获取所有字节。这就是字节流的本质。

处理此问题的另一种方法是使用记录结束标记。所以服务器发送它的数据并且你的程序读取直到它找到表示记录结束的字节序列。但是,您必须小心,因为服务器可能会发送多条记录,而您的读取可能会抓取一条记录的结尾以及下一条记录的开头。

使用字节流可能需要大量工作,因为您必须在读取字节后重建记录。如果可以的话,使用现有框架(如其中一个 cmets 中提到的 WCF)会容易得多。

【讨论】:

以上是关于命名管道异步的主要内容,如果未能解决你的问题,请参考以下文章

C# 异步命名管道永远等待

通过重叠 IO 的异步命名 Windows 管道通信

未收到 C++ Windows 异步 IO 命名管道第一条消息

Bash 将 stdio 重定向到命名管道

linux进程间通信异步信号处理机制

linux进程间通信异步信号处理机制