如何在 C# 应用程序中取消异步发送套接字

Posted

技术标签:

【中文标题】如何在 C# 应用程序中取消异步发送套接字【英文标题】:How to cancel Async Send Socket in C# application 【发布时间】:2015-02-11 07:07:59 【问题描述】:

我有一个应用程序,它只是从队列中提取项目,然后尝试通过网络套接字异步发送它们。

当出现问题或客户端中止主机套接字时,我遇到了一些问题。

这是我的一些代码:我认为它可能比我的话解释得更好:

这是我的SocketState.cs 类,其中包含套接字和相关信息:

public class SocketState

    public const int BufferSize = 256;
    public Socket WorkSocket  get; set; 
    public byte[] Buffer  get; set;    

    /// <summary>
    /// Constructor receiving a socket
    /// </summary>
    /// <param name="socket"></param>
    public SocketState(Socket socket)
    
        WorkSocket = socket;
        Buffer = new byte[BufferSize];
    

这是我的SocketHandler.cs 类,它控制着大多数套接字操作:

public class SocketHandler : IObserver

    # region Class Variables
    private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
    private SocketState _state;
    private OutgoingConnectionManager _parentConnectionManager;
    private int _recieverId;
    private readonly ManualResetEvent _sendDone = new ManualResetEvent(false);

    public volatile bool NameIsSet = false;
    private ManualResetEvent _receiveDone = new ManualResetEvent(false);
    public String Name;
    public readonly Guid HandlerId;
    # endregion Class Variables

    /// <summary>
    /// Constructor
    /// </summary>
    public SocketHandler(SocketState state)
    
        HandlerId = Guid.NewGuid();
        _state = state;       
        _state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0, new AsyncCallback(ReceiveCallback), this._state);
    

    /// <summary>
    /// Set the receiver id for this socket.
    /// </summary>
    public void SetReceiverId(int receiverId)
    
        _recieverId = receiverId;
    

    /// <summary>
    /// Stops / closes a connection
    /// </summary>
    public void Stop()
    
        CloseConnection();
    

    /// <summary>
    /// Used to set this connections parent connection handler
    /// </summary>
    /// <param name="conMan"></param>
    public void SetConnectionManager(OutgoingConnectionManager conMan)
    
        _parentConnectionManager = conMan;
    

    /// <summary>
    /// Returns if the socket is connected or not
    /// </summary>
    /// <returns></returns>
    public bool IsConnected()
    
        return _state.WorkSocket.Connected;
    

    /// <summary>
    /// Public Class that implements the observer interface , this function provides a portal to receive new messages which it must send
    /// </summary>
    /// <param name="e"> Event to execute</param>        
    public void OnMessageRecieveEvent(ObserverEvent e)
    
        SendSignalAsync(e.Message.payload);           
    
    # region main working space

    # region CallBack Functions
    /// <summary>
    /// CallBack Function that is called when a connection Receives Some Data
    /// </summary>
    /// <param name="ar"></param>
    private void ReceiveCallback(IAsyncResult ar)
    
        try
        
            String content = String.Empty;
            if (ar != null)
            
                SocketState state = (SocketState)ar.AsyncState;
                if (state != null)
                
                    Socket handler = state.WorkSocket;
                    if (handler != null)
                    
                        int bytesRead = handler.EndReceive(ar);

                        if (bytesRead > 0)
                        
                            StringBuilder Sb = new StringBuilder();
                            Sb.Append(Encoding.Default.GetString(state.Buffer, 0, bytesRead));

                            if (Sb.Length > 1)
                            
                                content = Sb.ToString();

                                foreach (var s in content.Split('Ÿ'))
                                
                                    if (string.Compare(s, 0, "ID", 0, 2) == 0)
                                    
                                        Name = s.Substring(2);
                                        NameIsSet = true;
                                    

                                    if (string.Compare(s, 0, "TG", 0, 2) == 0)
                                    
                                        LinkReplyToTag(s.Substring(2), this.Name);
                                    
                                
                                _state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0,
                                    new AsyncCallback(ReceiveCallback), _state);
                            
                        
                    
                
            
        
        catch
        
            CloseConnection();
        
    

    /// <summary>
    /// Call Back Function called when data is send though this connection
    /// </summary>
    /// <param name="asyncResult"></param>
    private void SendCallback(IAsyncResult asyncResult)
    
        try
        
            if (asyncResult != null)
            
                Socket handler = (Socket)asyncResult.AsyncState;
                if (handler != null)
                
                    int bytesSent = handler.EndSend(asyncResult);
                    // Signal that all bytes have been sent.
                    _sendDone.Set();
                    if (bytesSent > 0)
                    
                        return;
                    
                
            
        
        catch (Exception e)
                        
            Log.Error("Transmit Failed On Send CallBack");
        
        //Close socket as something went wrong
        CloseConnection();
    
    # endregion CallBack Functions

    /// <summary>
    /// Sends a signal out of the current connection
    /// </summary>
    /// <param name="signal"></param>
    private void SendSignalAsync(Byte[] signal)
    
        try
        
            if (_state != null)
            
                if (_state.WorkSocket != null)
                
                    if (_state.WorkSocket.Connected)
                    
                        try
                        
                            _sendDone.Reset();
                            _state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, new AsyncCallback(SendCallback),
                                _state.WorkSocket);
                            _sendDone.WaitOne(200);
                            return;
                        
                        catch (Exception e)
                        
                            Log.Error("Transmission Failier for IP: " + ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address, e);
                        
                    
                
            
            //Close Connection as something went wrong
            CloseConnection();
        
        catch (Exception e)
        
            Log.Error("An Exception has occurred in the SendSignalAsync function", e);
        
         


    /// <summary>
    /// Call this to Close the connection
    /// </summary>
    private void CloseConnection()
    
        try
        
            var ip = "NA";
            try
            
                if (_state != null)
                
                    ip = ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address.ToString();
                
            
            catch
            
                //Cannot get the ip address
            
            OutgoingListeningServer.UpdateRecieversHistory(_recieverId, ip, "Disconnected");

            try
            
                if (_state != null)
                
                    if (_state.WorkSocket != null)
                    
                        _state.WorkSocket.Shutdown(SocketShutdown.Both);
                        _state.WorkSocket.Close();
                        //_state.WorkSocket.Dispose();
                        _state.WorkSocket = null;
                        _state = null;
                    
                
            
            catch (Exception e)
            
                _state = null;
                Log.Error("Error while trying to close socket for IP: " + ip, e);
            
            if (_parentConnectionManager != null)
            
                // Remove this connection from the connection list
                _parentConnectionManager.ConnectionRemove(this);
            
        
        catch (Exception e)
        
            Log.Error("A major error occurred in the close connection function, outer try catch was hit", e);
        
      
    # endregion main working space

这是我的线程,它将调用SocketHandler.cs 类中的OnMessageRecieveEvent() 函数。

  private void Main()
    
        Log.Info("Receiver" + ReceiverDb.Id + " Thread Starting");
        // Exponential back off 
        var eb = new ExponentialBackoff();
        try
        
            while (_run)
            
                try
                
                    if (ReceiverOutgoingConnectionManager.HasConnectedClient())
                    
                        //Fetch Latest Item
                        ILogItem logItem;
                        if (_receiverQueue.TryDequeue(out logItem))
                        
                            //Handle the logItem 
                           **calls the OnMessageRecieveEvent() for all my connections.                       
                                    ReceiverOutgoingConnectionManager.SendItemToAllConnections(logItem);  
                            //Reset the exponential back off counter
                            eb.reset();
                        
                        else
                        
                            //Exponential back off thread sleep
                            eb.sleep();
                        
                    
                    else
                    
                        //Exponential back off thread sleep
                        eb.sleep();
                    
                
                catch (Exception e)
                
                    Log.Error("Error occurred in " + ReceiverDb.Id + "receiver mains thread ", e);
                
            
        
        catch (Exception e)
        
            Log.Error(" ** An error has occurred in a receiver holder main thread ** =====================================================================>", e);
        
        Log.Info("Receiver" + ReceiverDb.Id + " Thread Exiting ** ===============================================================================>");
    

我为这么多代码道歉。但我担心它可能不是很明显,所以我发布了所有相关代码。

现在进一步解释我的问题。 如果套接字发生错误。我得到了Transmit Failed On Send CallBack 的分配。这对我来说意味着我没有正确关闭套接字,并且仍然有未完成的回调正在执行。

当我关闭套接字时,我没有办法取消所有未完成的回调吗?

我也确信我发布的代码会有一些建议/问题。我非常感谢您的反馈。

【问题讨论】:

伙计,这个回调太棒了! #just_had_to 取消未执行的操作意味着什么?您是否试图通知调用者由于套接字关闭而取消了他们的操作?在我看来,如果您有一个操作队列,然后您停止通过该队列,则排队的操作将未执行、被忽略、忽略和取消。 【参考方案1】:

我假设这是出于学习目的,因为为其他目的编写自己的网络代码有点……困难

在发送回调中获取异常是很好。这就是例外的原因。但是,您需要识别异常,而不是仅仅抓住毯子Exception 并几乎忽略其中的所有信息。

在适合处理的地方处理您可以处理的异常。

我不会很具体,因为您的代码存在很多问题。但关键的事情之一是忽略正确的套接字处理——当你收到0 字节时,这意味着你应该关闭套接字。那是来自另一边的信号,说“我们完成了”。通过忽略这一点,您正在使用(可能部分)关闭的连接。

请使用一些可以为您提供所需保证(和简单性)的网络库。 WCF、林格伦等等。例如,TCP 不保证您将收到消息的一部分,因此您的消息解析代码是不可靠的。您需要使用一些消息框架,您需要添加适当的错误处理...Socket 不是高级构造,它不能“自动”工作,您需要自己实现所有这些东西。

即使忽略网络代码本身,很明显您只是忽略了异步代码的大部分复杂性。 SendDone 怎么了?要么你想使用异步代码,然后摆脱同步性,要么你想要同步代码,那么为什么要使用异步套接字?

【讨论】:

感谢您的cmets和意见。我想你已经向我指出了一些我真正想要实现的东西。在一天结束时,我想要可靠。这可能不是我在这里所拥有的。我使用异步套接字遵循了许多在线教程。顺便提一下,它所使用的设置更多的是广播环境。我的客户端应用程序没有专门发回和确认。只有 tcp ack。 @zapnologica Internet 上的 C# 示例通常是一团糟。有一些非常好的 C++ 示例,但这当然意味着很多麻烦——而且您仍然必须过滤掉损坏的示例。编写低级网络代码真的很难,你真的希望有人以此为生,并使用他们的库:) 我已经开始制作一些关于网络的示例和教程,但它不会是很快就准备好了,更不用说它需要大量的同行评审和实际测试。网络是困难。毕竟这是一个分布式系统。 看,尽管我很想深入了解网络的来龙去脉。这最终适用于现场生产服务器。从你的建议和我这边的一点逻辑来看,它叫图书馆。这将需要代表我进行大量的代码更改,但这是值得的。您可以建议任何特定的库吗?在这种情况下,我无法控制客户端。你知道这个库吗:codeproject.com/Articles/563627/… @zapnologica 不要害怕使用经过良好测试的技术来完成您的工作。例如,看起来您可以轻松地使用 HTTP 进行通信 - HTTP 在 .NET 中非常易于使用。它实际上会给你一个响应——TCP Send 不会告诉你远程服务器收到了你的数据。 @zapnologica 您无法控制客户端?这很棘手。如果通信协议没有消息帧,你不能只在服务器上添加它。你对沟通了解多少?有没有办法找到任何给定消息的结尾?无论如何,听起来您可能无法为该场景使用任何现成的库。哎哟。

以上是关于如何在 C# 应用程序中取消异步发送套接字的主要内容,如果未能解决你的问题,请参考以下文章

win 10 UAP/UWP(通用应用程序)的异步套接字库?

在 C# 中获取套接字对象的流

提升 asio 取消读取而不取消写入

绑定时 C# 异步 SocketException

如何将数据从 Python 异步套接字服务器发送到子进程?

异步套接字的内存问题并开始发送