如何在 C# 应用程序中取消异步发送套接字
Posted
技术标签:
【中文标题】如何在 C# 应用程序中取消异步发送套接字【英文标题】:How to cancel Async Send Socket in C# application 【发布时间】:2015-02-11 07:07:59 【问题描述】:我有一个应用程序,它只是从队列中提取项目,然后尝试通过网络套接字异步发送它们。
当出现问题或客户端中止主机套接字时,我遇到了一些问题。
这是我的一些代码:我认为它可能比我的话解释得更好:
这是我的SocketState.cs
类,其中包含套接字和相关信息:
public class SocketState
public const int BufferSize = 256;
public Socket WorkSocket get; set;
public byte[] Buffer get; set;
/// <summary>
/// Constructor receiving a socket
/// </summary>
/// <param name="socket"></param>
public SocketState(Socket socket)
WorkSocket = socket;
Buffer = new byte[BufferSize];
这是我的SocketHandler.cs
类,它控制着大多数套接字操作:
public class SocketHandler : IObserver
# region Class Variables
private static readonly log4net.ILog Log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
private SocketState _state;
private OutgoingConnectionManager _parentConnectionManager;
private int _recieverId;
private readonly ManualResetEvent _sendDone = new ManualResetEvent(false);
public volatile bool NameIsSet = false;
private ManualResetEvent _receiveDone = new ManualResetEvent(false);
public String Name;
public readonly Guid HandlerId;
# endregion Class Variables
/// <summary>
/// Constructor
/// </summary>
public SocketHandler(SocketState state)
HandlerId = Guid.NewGuid();
_state = state;
_state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0, new AsyncCallback(ReceiveCallback), this._state);
/// <summary>
/// Set the receiver id for this socket.
/// </summary>
public void SetReceiverId(int receiverId)
_recieverId = receiverId;
/// <summary>
/// Stops / closes a connection
/// </summary>
public void Stop()
CloseConnection();
/// <summary>
/// Used to set this connections parent connection handler
/// </summary>
/// <param name="conMan"></param>
public void SetConnectionManager(OutgoingConnectionManager conMan)
_parentConnectionManager = conMan;
/// <summary>
/// Returns if the socket is connected or not
/// </summary>
/// <returns></returns>
public bool IsConnected()
return _state.WorkSocket.Connected;
/// <summary>
/// Public Class that implements the observer interface , this function provides a portal to receive new messages which it must send
/// </summary>
/// <param name="e"> Event to execute</param>
public void OnMessageRecieveEvent(ObserverEvent e)
SendSignalAsync(e.Message.payload);
# region main working space
# region CallBack Functions
/// <summary>
/// CallBack Function that is called when a connection Receives Some Data
/// </summary>
/// <param name="ar"></param>
private void ReceiveCallback(IAsyncResult ar)
try
String content = String.Empty;
if (ar != null)
SocketState state = (SocketState)ar.AsyncState;
if (state != null)
Socket handler = state.WorkSocket;
if (handler != null)
int bytesRead = handler.EndReceive(ar);
if (bytesRead > 0)
StringBuilder Sb = new StringBuilder();
Sb.Append(Encoding.Default.GetString(state.Buffer, 0, bytesRead));
if (Sb.Length > 1)
content = Sb.ToString();
foreach (var s in content.Split('Ÿ'))
if (string.Compare(s, 0, "ID", 0, 2) == 0)
Name = s.Substring(2);
NameIsSet = true;
if (string.Compare(s, 0, "TG", 0, 2) == 0)
LinkReplyToTag(s.Substring(2), this.Name);
_state.WorkSocket.BeginReceive(_state.Buffer, 0, SocketState.BufferSize, 0,
new AsyncCallback(ReceiveCallback), _state);
catch
CloseConnection();
/// <summary>
/// Call Back Function called when data is send though this connection
/// </summary>
/// <param name="asyncResult"></param>
private void SendCallback(IAsyncResult asyncResult)
try
if (asyncResult != null)
Socket handler = (Socket)asyncResult.AsyncState;
if (handler != null)
int bytesSent = handler.EndSend(asyncResult);
// Signal that all bytes have been sent.
_sendDone.Set();
if (bytesSent > 0)
return;
catch (Exception e)
Log.Error("Transmit Failed On Send CallBack");
//Close socket as something went wrong
CloseConnection();
# endregion CallBack Functions
/// <summary>
/// Sends a signal out of the current connection
/// </summary>
/// <param name="signal"></param>
private void SendSignalAsync(Byte[] signal)
try
if (_state != null)
if (_state.WorkSocket != null)
if (_state.WorkSocket.Connected)
try
_sendDone.Reset();
_state.WorkSocket.BeginSend(signal, 0, signal.Length, 0, new AsyncCallback(SendCallback),
_state.WorkSocket);
_sendDone.WaitOne(200);
return;
catch (Exception e)
Log.Error("Transmission Failier for IP: " + ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address, e);
//Close Connection as something went wrong
CloseConnection();
catch (Exception e)
Log.Error("An Exception has occurred in the SendSignalAsync function", e);
/// <summary>
/// Call this to Close the connection
/// </summary>
private void CloseConnection()
try
var ip = "NA";
try
if (_state != null)
ip = ((IPEndPoint)_state.WorkSocket.RemoteEndPoint).Address.ToString();
catch
//Cannot get the ip address
OutgoingListeningServer.UpdateRecieversHistory(_recieverId, ip, "Disconnected");
try
if (_state != null)
if (_state.WorkSocket != null)
_state.WorkSocket.Shutdown(SocketShutdown.Both);
_state.WorkSocket.Close();
//_state.WorkSocket.Dispose();
_state.WorkSocket = null;
_state = null;
catch (Exception e)
_state = null;
Log.Error("Error while trying to close socket for IP: " + ip, e);
if (_parentConnectionManager != null)
// Remove this connection from the connection list
_parentConnectionManager.ConnectionRemove(this);
catch (Exception e)
Log.Error("A major error occurred in the close connection function, outer try catch was hit", e);
# endregion main working space
这是我的线程,它将调用SocketHandler.cs
类中的OnMessageRecieveEvent()
函数。
private void Main()
Log.Info("Receiver" + ReceiverDb.Id + " Thread Starting");
// Exponential back off
var eb = new ExponentialBackoff();
try
while (_run)
try
if (ReceiverOutgoingConnectionManager.HasConnectedClient())
//Fetch Latest Item
ILogItem logItem;
if (_receiverQueue.TryDequeue(out logItem))
//Handle the logItem
**calls the OnMessageRecieveEvent() for all my connections.
ReceiverOutgoingConnectionManager.SendItemToAllConnections(logItem);
//Reset the exponential back off counter
eb.reset();
else
//Exponential back off thread sleep
eb.sleep();
else
//Exponential back off thread sleep
eb.sleep();
catch (Exception e)
Log.Error("Error occurred in " + ReceiverDb.Id + "receiver mains thread ", e);
catch (Exception e)
Log.Error(" ** An error has occurred in a receiver holder main thread ** =====================================================================>", e);
Log.Info("Receiver" + ReceiverDb.Id + " Thread Exiting ** ===============================================================================>");
我为这么多代码道歉。但我担心它可能不是很明显,所以我发布了所有相关代码。
现在进一步解释我的问题。
如果套接字发生错误。我得到了Transmit Failed On Send CallBack
的分配。这对我来说意味着我没有正确关闭套接字,并且仍然有未完成的回调正在执行。
当我关闭套接字时,我没有办法取消所有未完成的回调吗?
我也确信我发布的代码会有一些建议/问题。我非常感谢您的反馈。
【问题讨论】:
伙计,这个回调太棒了! #just_had_to 取消未执行的操作意味着什么?您是否试图通知调用者由于套接字关闭而取消了他们的操作?在我看来,如果您有一个操作队列,然后您停止通过该队列,则排队的操作将未执行、被忽略、忽略和取消。 【参考方案1】:我假设这是出于学习目的,因为为其他目的编写自己的网络代码有点……困难。
在发送回调中获取异常是很好。这就是例外的原因。但是,您需要识别异常,而不是仅仅抓住毯子Exception
并几乎忽略其中的所有信息。
在适合处理的地方处理您可以处理的异常。
我不会很具体,因为您的代码存在很多问题。但关键的事情之一是忽略正确的套接字处理——当你收到0
字节时,这意味着你应该关闭套接字。那是来自另一边的信号,说“我们完成了”。通过忽略这一点,您正在使用(可能部分)关闭的连接。
请使用一些可以为您提供所需保证(和简单性)的网络库。 WCF、林格伦等等。例如,TCP 不保证您将收到消息的一部分,因此您的消息解析代码是不可靠的。您需要使用一些消息框架,您需要添加适当的错误处理...Socket
不是高级构造,它不能“自动”工作,您需要自己实现所有这些东西。
即使忽略网络代码本身,很明显您只是忽略了异步代码的大部分复杂性。 SendDone
怎么了?要么你想使用异步代码,然后摆脱同步性,要么你想要同步代码,那么为什么要使用异步套接字?
【讨论】:
感谢您的cmets和意见。我想你已经向我指出了一些我真正想要实现的东西。在一天结束时,我想要可靠。这可能不是我在这里所拥有的。我使用异步套接字遵循了许多在线教程。顺便提一下,它所使用的设置更多的是广播环境。我的客户端应用程序没有专门发回和确认。只有 tcp ack。 @zapnologica Internet 上的 C# 示例通常是一团糟。有一些非常好的 C++ 示例,但这当然意味着很多麻烦——而且您仍然必须过滤掉损坏的示例。编写低级网络代码真的很难,你真的希望有人以此为生,并使用他们的库:) 我已经开始制作一些关于网络的示例和教程,但它不会是很快就准备好了,更不用说它需要大量的同行评审和实际测试。网络是困难。毕竟这是一个分布式系统。 看,尽管我很想深入了解网络的来龙去脉。这最终适用于现场生产服务器。从你的建议和我这边的一点逻辑来看,它叫图书馆。这将需要代表我进行大量的代码更改,但这是值得的。您可以建议任何特定的库吗?在这种情况下,我无法控制客户端。你知道这个库吗:codeproject.com/Articles/563627/… @zapnologica 不要害怕使用经过良好测试的技术来完成您的工作。例如,看起来您可以轻松地使用 HTTP 进行通信 - HTTP 在 .NET 中非常易于使用。它实际上会给你一个响应——TCPSend
不会告诉你远程服务器收到了你的数据。
@zapnologica 您无法控制客户端?这很棘手。如果通信协议没有消息帧,你不能只在服务器上添加它。你对沟通了解多少?有没有办法找到任何给定消息的结尾?无论如何,听起来您可能无法为该场景使用任何现成的库。哎哟。以上是关于如何在 C# 应用程序中取消异步发送套接字的主要内容,如果未能解决你的问题,请参考以下文章