异步使用 NamedPipeServerStream 和 NamedPipeClientStream

Posted

技术标签:

【中文标题】异步使用 NamedPipeServerStream 和 NamedPipeClientStream【英文标题】:Using NamedPipeServerStream and NamedPipeClientStream asynchronously 【发布时间】:2018-01-02 10:36:34 【问题描述】:

我对服务器/客户端架构有以下要求:

    编写一个异步工作的服务器/客户端。

    通信需要是双工的,即两端读写。

    多个客户端可以在任何给定时间连接到服务器。

    服务器/客户端应该等到它们可用并最终建立连接。

    一旦客户端连接,它应该写入流。

    然后服务器应该从流中读取并将响应写回客户端。

    最后,客户端应该读取响应并结束通信。

因此,考虑到以下要求,我编写了以下代码,但我不太确定,因为管道文档有些缺乏,不幸的是,代码似乎无法正常工作,它挂在某一点。

namespace PipesAsyncAwait471

    using System;
    using System.Collections.Generic;
    using System.IO.Pipes;
    using System.Linq;
    using System.Threading.Tasks;

    internal class Program
    
        private static async Task Main()
        
            List<Task> tasks = new List<Task> 
                HandleRequestAsync(),
            ;

            tasks.AddRange(Enumerable.Range(0, 10).Select(i => SendRequestAsync(i, 0, 5)));

            await Task.WhenAll(tasks);
        

        private static async Task HandleRequestAsync()
        
            using (NamedPipeServerStream server = new NamedPipeServerStream("MyPipe",
                                                                            PipeDirection.InOut,
                                                                            NamedPipeServerStream.MaxAllowedServerInstances,
                                                                            PipeTransmissionMode.Message,
                                                                            PipeOptions.Asynchronous))
            
                Console.WriteLine("Waiting...");

                await server.WaitForConnectionAsync().ConfigureAwait(false);

                if (server.IsConnected)
                
                    Console.WriteLine("Connected");

                    if (server.CanRead) 
                        // Read something...
                    

                    if (server.CanWrite) 
                        // Write something... 

                        await server.FlushAsync().ConfigureAwait(false);

                        server.WaitForPipeDrain();
                    

                    server.Disconnect();

                    await HandleRequestAsync().ConfigureAwait(false);
                
            
        

        private static async Task SendRequestAsync(int index, int counter, int max)
        
            using (NamedPipeClientStream client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.Asynchronous))
            
                await client.ConnectAsync().ConfigureAwait(false);

                if (client.IsConnected)
                
                    Console.WriteLine($"Index: index Counter: counter");

                    if (client.CanWrite) 
                        // Write something...

                        await client.FlushAsync().ConfigureAwait(false);

                        client.WaitForPipeDrain();
                    

                    if (client.CanRead) 
                        // Read something...
                    
                

                if (counter <= max) 
                    await SendRequestAsync(index, ++counter, max).ConfigureAwait(false);
                
                else 
                    Console.WriteLine($"index Done!");
                
            
        
    

假设:

我希望它工作的方式是我在调用SendRequestAsync 时发出的所有请求同时执行,每个请求然后发出额外的请求,直到它到达6,最后,它应该打印“完成!”。

备注:

    我在 .NET Framework 4.7.1 和 .NET Core 2.0 上对其进行了测试,得到了相同的结果。

    客户端和服务器之间的通信始终是本地机器,其中客户端是 Web 应用程序,可以排队一些作业,例如启动第 3 方进程和服务器将作为 Windows 服务部署在与部署这些客户端的 Web 服务器相同的计算机上。

【问题讨论】:

你可能想要使用 TCP 而不是使用管道。查看 msdn 示例:docs.microsoft.com/en-us/dotnet/framework/network-programming/… @jdweng 客户端和服务器是在同一台机器上找到的进程,因此 TCP 将是一个矫枉过正。 绝对错误。数以百万计的应用程序在本地 PC 上使用 TCP。管道用于标准输入和标准输出,但很少用于隧道进入应用程序。使用 TCP,您可以使用像 wireshark 或 fiddler 这样的嗅探器来调试应用程序。 @jdweng TCP 用于通常需要进行远程连接的本地 PC,管道在 IPC 中大量使用,我正是为此使用它,除了客户端和服务器之外,还有第三个服务器启动的方进程,这些进程的标准输入/输出重定向,但这与手头的问题无关,所以我没有费心去详细说明它。 对 SendRequestAsync() 的递归调用非常难看。这不断创建新的客户端管道,没有一个被关闭。当它建立了太多的联系时,这个节目可能已经结束了。把它扔掉,并使用MSDN sample code 作为指南来解决这个问题。 【参考方案1】:

这是经过一些迭代后的完整代码:

namespace PipesAsyncAwait471

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Pipes;
    using System.Linq;
    using System.Threading.Tasks;

    internal class Program
    
        private const int MAX_REQUESTS = 1000;

        private static void Main()
        
            var tasks = new List<Task> 
                //Task.Run(() => HandleRequest(0))
                HandleRequestAsync(0)
            ;

            tasks.AddRange(Enumerable.Range(0, MAX_REQUESTS).Select(i => Task.Factory.StartNew(() => SendRequest(i), TaskCreationOptions.LongRunning)));

            Task.WhenAll(tasks);

            Console.ReadKey();
        

        private static void HandleRequest(int counter)
        
            try 
                var server = new NamedPipeServerStream("MyPipe",
                                                    PipeDirection.InOut,
                                                    NamedPipeServerStream.MaxAllowedServerInstances,
                                                    PipeTransmissionMode.Message,
                                                    PipeOptions.Asynchronous);

                Console.WriteLine($"Waiting a client... counter");

                server.BeginWaitForConnection(WaitForConnectionCallback, server);
            
            catch (Exception ex) 
                Console.WriteLine(ex);
            

            void WaitForConnectionCallback(IAsyncResult result)
            
                var server = (NamedPipeServerStream)result.AsyncState;

                int index = -1;

                try 
                    server.EndWaitForConnection(result);

                    HandleRequest(++counter);

                    if (server.IsConnected) 
                        var request = new byte[4];
                        server.BeginRead(request, 0, request.Length, ReadCallback, server);
                        index = BitConverter.ToInt32(request, 0);
                        Console.WriteLine($"index Request.");

                        var response = BitConverter.GetBytes(index);
                        server.BeginWrite(response, 0, response.Length, WriteCallback, server);
                        server.Flush();
                        server.WaitForPipeDrain();
                        Console.WriteLine($"index Pong.");

                        server.Disconnect();
                        Console.WriteLine($"index Disconnected.");
                    
                
                catch (IOException ex) 
                    Console.WriteLine($"index\n\tex");
                
                finally 
                    server.Dispose();
                
            

            void ReadCallback(IAsyncResult result) 
            
                var server = (NamedPipeServerStream)result.AsyncState;

                try 
                    server.EndRead(result);
                
                catch (IOException ex) 
                    Console.WriteLine(ex);
                
            

            void WriteCallback(IAsyncResult result) 
            
                var server = (NamedPipeServerStream)result.AsyncState;

                try 
                    server.EndWrite(result);
                
                catch (IOException ex) 
                    Console.WriteLine(ex);
                
            
        

        private static async Task HandleRequestAsync(int counter)
        
            NamedPipeServerStream server = null;

            int index = -1;

            try 
                server = new NamedPipeServerStream("MyPipe",
                                                PipeDirection.InOut,
                                                NamedPipeServerStream.MaxAllowedServerInstances,
                                                PipeTransmissionMode.Message,
                                                PipeOptions.Asynchronous);

                Console.WriteLine($"Waiting a client... counter");

                await server.WaitForConnectionAsync()
                            .ContinueWith(async t => await HandleRequestAsync(++counter).ConfigureAwait(false))
                            .ConfigureAwait(false);

                if (server.IsConnected) 
                    var request = new byte[4];
                    await server.ReadAsync(request, 0, request.Length).ConfigureAwait(false);
                    index = BitConverter.ToInt32(request, 0);
                    Console.WriteLine($"index Request.");

                    var response = BitConverter.GetBytes(index);
                    await server.WriteAsync(response, 0, response.Length).ConfigureAwait(false);
                    await server.FlushAsync().ConfigureAwait(false);
                    server.WaitForPipeDrain();
                    Console.WriteLine($"index Pong.");

                    server.Disconnect();
                    Console.WriteLine($"index Disconnected.");
                
            
            catch (IOException ex) 
                Console.WriteLine($"index\n\tex");
            
            finally 
                server?.Dispose();
            
        

        private static void SendRequest(int index)
        
            NamedPipeClientStream client = null;

            try 
                client = new NamedPipeClientStream(".", "MyPipe", PipeDirection.InOut, PipeOptions.None);

                client.Connect();

                var request = BitConverter.GetBytes(index);
                client.Write(request, 0, request.Length);
                client.Flush();
                client.WaitForPipeDrain();
                Console.WriteLine($"index Ping.");

                var response = new byte[4];
                client.Read(response, 0, response.Length);
                index = BitConverter.ToInt32(response, 0);
                Console.WriteLine($"index Response.");
            
            catch (Exception ex) 
                Console.WriteLine($"index\n\tex");
            
            finally 
                client?.Dispose();
            
        
    

您可以对消息进行排序并观察以下内容:

    正确打开和关闭连接。

    数据发送和接收正确。

    最后,服务器仍在等待进一步的连接。

更新:

    PipeOptions.Asynchronous 更改为 PipeOptions.None 否则它似乎在请求期间挂起,然后才立即处理它们。

    PipeOptions.Asynchronous 只会导致与 PipeOptions.None 不同的执行顺序,这会暴露代码中的竞争条件/死锁。例如,如果您使用任务管理器来监控进程的线程数,您可以看到它的效果......您应该看到它以每秒大约 1 个线程的速度爬行,直到达到大约 100 个线程(可能是 110 左右),此时您的代码运行完成。或者,如果您在开头添加 ThreadPool.SetMinThreads(200, 200)。您的代码有一个问题,如果发生错误的排序(并且使用异步更有可能发生这种情况),您会创建一个循环,直到有足够的线程来运行您的 main 方法已排队的所有并发 ConnectAsyncs ,这并不是真正的异步,而只是创建一个工作项来调用同步的 Connect 方法(这很不幸,正是这样的问题是我敦促人们不要公开仅将工作项排队到的异步 API 的原因之一调用同步方法)。 Source.

    修改并简化了示例:

      管道没有真正的异步Connect 方法,ConnectAsync 在后台使用Task.Factory.StartNew,所以您不妨使用Connect 然后传递方法(在我们的示例中为SendRequest)将同步的Connect 版本调用为Task.Factory.StartNew

      服务器现在完全异步,据我所知,它可以正常工作。

      为服务器添加了两个实现,一个使用回调,另一个利用异步/等待功能,因为我找不到这两个的好例子。

希望对你有帮助。

【讨论】:

【参考方案2】:

断开连接时,WaitForPipeDrain() 可能会由于管道损坏而抛出 IOException

如果这种情况发生在您的服务器 Task,那么它将永远不会监听下一个连接,并且所有剩余的客户端连接都挂在 ConnectAsync()

如果这发生在其中一个客户端任务中,那么它将不会继续递归并增加该索引的计数器。

如果您将对WaitForPipeDrain() 的调用封装在try/catch 中,程序将永远继续运行,因为您的函数HandleRequestAsync() 是无限递归的。

简而言之,要让它发挥作用:

    WaitForPipeDrain()处理IOException HandleRequestAsync() 必须在某个时候完成。

【讨论】:

我试图用 try/catch 块包装WaitForPipeDrain,但我仍然得到相同的结果,HandleRequestAsync 应该永远运行,所以它不应该结束以侦听进一步的请求,如果它进入了一个无限循环,由于WaitForConnectionAsync 或得到***Exception 的分配,我应该看到GC 做了很多工作,但它们都没有发生,所以我不知道。 :) 好的,我确实遇到了一个异常,我已经用 try/catch 封装了调用,现在我可以看到它了。

以上是关于异步使用 NamedPipeServerStream 和 NamedPipeClientStream的主要内容,如果未能解决你的问题,请参考以下文章

visA异步锁怎么使用

tornado中使用异步(tornado底层是使用协程写异步代码!)

如何使用异步函数异步监听 Firestore 中的值?

C# 使用多个异步方法

不使用异步等待的异步等待

在异步组件中使用异步响应数据