c# SocketAsyncEventArgs 在 ReceiveAsync 处理程序中阻塞代码

Posted

技术标签:

【中文标题】c# SocketAsyncEventArgs 在 ReceiveAsync 处理程序中阻塞代码【英文标题】:c# SocketAsyncEventArgs blocking code inside ReceiveAsync handler 【发布时间】:2014-09-08 16:02:14 【问题描述】:

我正在测试以下两种情况,一种有效,另一种无效。 我在两台不同的机器上运行套接字服务器和套接字客户端应用程序

这两个场景都在使用 socketasynceventargs

场景 1(作品) 循环创建 40k 个套接字客户端,等待所有连接建立,然后所有客户端同时向服务器发送消息并接收服务器的响应 10 次(即发送/接收发生 10 次)。

场景 2(不起作用。我收到很多连接拒绝错误) 循环创建 40k 个套接字客户端,并在每个客户端连接后立即向服务器发送/接收相同的 10 条消息,而不是等待 40k 连接建立。

我无法弄清楚为什么我的第二种情况会失败。我知道在场景 1 中,在建立所有 40k 连接之前,服务器并没有做太多事情。但它能够同时与所有客户端进行通信。有什么想法吗??

感谢您的耐心等待。

这是套接字服务器代码

public class SocketServer
    

   private static System.Timers.Timer MonitorTimer = new System.Timers.Timer();
        public static SocketServerMonitor socket_monitor = new SocketServerMonitor();
        private int m_numConnections; 
        private int m_receiveBufferSize;
        public static BufferManager m_bufferManager;  
        Socket listenSocket;           

        public static SocketAsyncEventArgsPool m_readWritePool;
        public static int m_numConnectedSockets;    
        private int cnt = 0;

        public static int Closecalled=0;


        public SocketServer(int numConnections, int receiveBufferSize)
        
            m_numConnectedSockets = 0;
            m_numConnections = numConnections;
            m_receiveBufferSize = receiveBufferSize;


            m_bufferManager = new BufferManager(receiveBufferSize * numConnections ,
               receiveBufferSize);

            m_readWritePool = new SocketAsyncEventArgsPool(numConnections);

        


        public void Init()
        
            MonitorTimer.Interval = 30000;
            MonitorTimer.Start();
            MonitorTimer.Elapsed += new System.Timers.ElapsedEventHandler(socket_monitor.Log);


            m_bufferManager.InitBuffer();


            SocketAsyncEventArgs readWriteEventArg;

            for (int i = 0; i < m_numConnections; i++)
            

                readWriteEventArg = new SocketAsyncEventArgs();

                m_readWritePool.Push(readWriteEventArg);
            

        


        public void Start(IPEndPoint localEndPoint)
        

            listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
            listenSocket.Bind(localEndPoint);

            listenSocket.Listen(1000);


            StartAccept(null);
        

        public void Stop()
        
            if (listenSocket == null)
                return;
            listenSocket.Close();
            listenSocket = null;


            Thread.Sleep(15000);
        

        private void StartAccept(SocketAsyncEventArgs acceptEventArg)
        
            if (acceptEventArg == null)
            
                acceptEventArg = new SocketAsyncEventArgs();
                acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
            
            else
            
                // socket must be cleared since the context object is being reused
                acceptEventArg.AcceptSocket = null;
            

            try
            
                bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
                if (!willRaiseEvent)
                
                    ProcessAccept(acceptEventArg);
                
            
            catch (Exception e)
            

            
        


        void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
        
            ProcessAccept(e);
        

        private void ProcessAccept(SocketAsyncEventArgs e)
        
            Interlocked.Increment(ref m_numConnectedSockets);
            socket_monitor.IncSocketsConnected();


            SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
            m_bufferManager.SetBuffer(readEventArgs);

            readEventArgs.UserToken = new AsyncUserToken  id = cnt++, StarTime = DateTime.Now ;
            readEventArgs.AcceptSocket = e.AcceptSocket;
            SocketHandler handler=new SocketHandler(readEventArgs);

            StartAccept(e);
        

       






class SocketHandler
    
        private SocketAsyncEventArgs _socketEventArgs;

        public SocketHandler(SocketAsyncEventArgs socketAsyncEventArgs)
        
            _socketEventArgs = socketAsyncEventArgs;
            _socketEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
            StartReceive(_socketEventArgs);
        


        private void StartReceive(SocketAsyncEventArgs receiveSendEventArgs)
        

            bool willRaiseEvent = receiveSendEventArgs.AcceptSocket.ReceiveAsync(receiveSendEventArgs);
            if (!willRaiseEvent)
            
                ProcessReceive(receiveSendEventArgs);
            
        


        private void ProcessReceive(SocketAsyncEventArgs e)
        
            // check if the remote host closed the connection
            AsyncUserToken token = (AsyncUserToken)e.UserToken;
            //token.StarTime = DateTime.Now;
            if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
            
                // process the data here

                //reply to client
                byte[] AckData1 = BitConverter.GetBytes(1);
                SendData(AckData1, 0, AckData1.Length, e);


                StartReceive(e);
            
            else
            
                CloseClientSocket(e);
            
        


        private void IO_Completed(object sender, SocketAsyncEventArgs e)
        
            // determine which type of operation just completed and call the associated handler 
            switch (e.LastOperation)
            
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            

        





        private void CloseClientSocket(SocketAsyncEventArgs e)
        

            AsyncUserToken token = e.UserToken as AsyncUserToken;

            // close the socket associated with the client 
            try
            

                e.AcceptSocket.Shutdown(SocketShutdown.Send);
            

            catch (Exception ex)
            

            

            e.AcceptSocket.Close();

            Interlocked.Decrement(ref SocketServer.m_numConnectedSockets);
            SocketServer.socket_monitor.DecSocketsConnected();

            SocketServer.m_bufferManager.FreeBuffer(e);

            e.Completed -= new EventHandler<SocketAsyncEventArgs>(IO_Completed);

            SocketServer.m_readWritePool.Push(e);

        


        public void SendData(Byte[] data, Int32 offset, Int32 count, SocketAsyncEventArgs args)
        

            try
            

                Socket socket = args.AcceptSocket;
                if (socket.Connected)
                
                    var i = socket.Send(data, offset, count, SocketFlags.None);
                
            
            catch (Exception Ex)
            

            
        
    

这是在connectcallback方法中抛出错误的客户端代码

// State object for receiving data from remote device.
public class StateObject

    // Client socket.
    public Socket workSocket = null;
    // Size of receive buffer.
    public const int BufferSize = 256;
    // Receive buffer.
    public byte[] buffer = new byte[BufferSize];
    // Received data string.
    public StringBuilder sb = new StringBuilder();

    public int count = 0;


public class AsynchronousClient

    // The port number for the remote device.
    private const int port = 11000;

    private static int closecalled = 0;
    private static bool wait = true;
    // ManualResetEvent instances signal completion.
    private static ManualResetEvent connectDone =
        new ManualResetEvent(false);
    private static ManualResetEvent sendDone =
        new ManualResetEvent(false);
    private static ManualResetEvent receiveDone =
        new ManualResetEvent(false);

    // The response from the remote device.
    private static String response = String.Empty;

    private static void StartClient(Socket client, IPEndPoint remoteEP)
    
        // Connect to a remote device.
        try
        
            // Connect to the remote endpoint.
            client.BeginConnect(remoteEP,
                new AsyncCallback(ConnectCallback), new StateObject  workSocket = client );

        
        catch (Exception e)
        
            Console.WriteLine(e.ToString());
        
    

    private static void ConnectCallback(IAsyncResult ar)
    
        try
        
            // Retrieve the socket from the state object.
            StateObject state = (StateObject)ar.AsyncState;

            var client = state.workSocket;
            // Complete the connection.
            client.EndConnect(ar);
            var data = "AA5500C08308353816050322462F01020102191552E7D3FA52E7D3FB1FF85BF1FE9F201000004AB80000000500060800001EFFB72F0D00002973620000800000FFFFFFFF00009D6D00003278002EE16D0000018500000000000000000000003A0000000100000000828C80661FF8B436FE9EA9FC000000120000000700000000000000000000000400000000000000000000000000000000000000000000281E0000327800000000000000000000000000AF967D00000AEA000000000000000000000000";

                     Send(state, data);
        
        catch (Exception e)
        
            Console.WriteLine(e.ToString());
        
    

    private static void Receive(StateObject state)
    
        try
        
          Socket client = state.workSocket;
            // Begin receiving the data from the remote device.
            client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
                new AsyncCallback(ReceiveCallback), state);
        
        catch (Exception e)
        
            Console.WriteLine(e.ToString());
        
    

    private static void ReceiveCallback(IAsyncResult ar)
    
        try
        
            StateObject state = (StateObject)ar.AsyncState;
            Socket client = state.workSocket;

            // Read data from the remote device.
            int bytesRead = client.EndReceive(ar);

            //if (wait)
            //
            //    connectDone.WaitOne();
            //

            if (bytesRead > 0)
            
                state.count = state.count + 1;
                byte[] b = new byte[bytesRead];
                Array.Copy(state.buffer, b, 1);
                if (b[0] == 1)
                

                    if (state.count < 10)
                    
                        var data = "AA5500C08308353816050322462F01020102191552E7D3FA52E7D3FB1FF85BF1FE9F201000004AB80000000500060800001EFFB72F0D00002973620000800000FFFFFFFF00009D6D00003278002EE16D0000018500000000000000000000003A0000000100000000828C80661FF8B436FE9EA9FC000000120000000700000000000000000000000400000000000000000000000000000000000000000000281E0000327800000000000000000000000000AF967D00000AEA000000000000000000000000";
                        Send(state, data);
                    
                    else
                    
                        Interlocked.Increment(ref closecalled);
                        Console.WriteLine("closecalled:-" + closecalled + " at " + DateTime.Now);
                        client.Close();
                    

                
                else
                
                    // Get the rest of the data.
                    client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
                        new AsyncCallback(ReceiveCallback), state);
                
            
            else
            
                client.Close();
            
        
        catch (Exception e)
        
            Console.WriteLine(e.ToString());
        
    

    private static void Send(StateObject state, String data)
    
        try
        
            Socket client = state.workSocket;
            var hexlen = data.Length;
            byte[] byteData = new byte[hexlen / 2];
            int[] hexarray = new int[hexlen / 2];
            int i = 0;
            int k = 0;
            //create the byte array
            while (i < data.Length / 2)
            
                string first = data[i].ToString();
                i++;
                string second = data[i].ToString();
                string x = first + second;
                byteData[k] = (byte)Convert.ToInt32(x, 16);
                i++;
                k++;
            
            // Begin sending the data to the remote device.
            client.BeginSend(byteData, 0, byteData.Length, 0,
                new AsyncCallback(SendCallback), state);
        
        catch (Exception e)
        
            Console.WriteLine(e.ToString());
        
    

    private static void SendCallback(IAsyncResult ar)
    
        try
        
            // Retrieve the socket from the state object.
            StateObject state = (StateObject)ar.AsyncState;
            Socket client = state.workSocket;
            // Complete sending the data to the remote device.
            int bytesSent = client.EndSend(ar);
            Receive(state);
        
        catch (Exception e)
        
            Console.WriteLine(e.ToString());
        
    

    public static int Main(String[] args)
    
        Start();
        Console.ReadLine();
        return 0;
    

    private static void Start()
    
        IPAddress ipaddress = IPAddress.Parse("10.20.2.152");
        IPEndPoint remoteEP = new IPEndPoint(ipaddress, port);

        for (int i = 0; i < 40000; i++)
        
            Thread.Sleep(1);
            // Create a TCP/IP socket.
            try
            
                Socket client = new Socket(AddressFamily.InterNetwork,
                    SocketType.Stream, ProtocolType.Tcp);
                StartClient(client, remoteEP);
            
            catch (Exception e)
            
                Console.WriteLine(e.ToString());
            
            if (i == 39999)
            
                Console.WriteLine("made all conns at " + DateTime.Now);
            
        
    

【问题讨论】:

两种场景下的服务器代码是否相同?如果是这样,则可能存在错误。在这种情况下,你能给我们展示一些它的代码吗? 听起来您已经耗尽了操作系统积压的传入连接。发布一些代码和完整的异常。 我在上面添加了部分代码。我得到的错误是“无法建立连接,因为目标机器主动拒绝它” 启动连接的代码在哪里?指定更高的积压(例如 40k)。 @usr 我已经添加了客户端代码。我尝试了 40k 作为积压但没有运气。你能检查一下客户端代码吗? 【参考方案1】:

我会使用线性队列来接受传入连接。像这样的:

public async Task Accept40KClients()

    for (int i = 0; i < 40000; i++)
    
        // Await this here   -------v
        bool willRaiseEvent = await listenSocket.AcceptAsync(acceptEventArg);
        if (!willRaiseEvent)
        
            ProcessAccept(acceptEventArg);
        
    

如果这还不够快,也许你可以一次等待 10 次,但我认为这已经足够了......不过我可能错了。

【讨论】:

以上是关于c# SocketAsyncEventArgs 在 ReceiveAsync 处理程序中阻塞代码的主要内容,如果未能解决你的问题,请参考以下文章

C#高性能大容量SOCKET并发:SocketAsyncEventArgs封装

C# SocketAsyncEventArgs 和网络故障

SocketAsyncEventArgs 和缓冲,而消息是在部分

C# IOCP完成端口模型(简单实用高效)

是否有必要清理 SocketAsyncEventArgs.Completed?

SocketAsyncEventArgs.Completed 在 Windows 8 中不会触发