c# SocketAsyncEventArgs 在 ReceiveAsync 处理程序中阻塞代码
Posted
技术标签:
【中文标题】c# SocketAsyncEventArgs 在 ReceiveAsync 处理程序中阻塞代码【英文标题】:c# SocketAsyncEventArgs blocking code inside ReceiveAsync handler 【发布时间】:2014-09-08 16:02:14 【问题描述】:我正在测试以下两种情况,一种有效,另一种无效。 我在两台不同的机器上运行套接字服务器和套接字客户端应用程序
这两个场景都在使用 socketasynceventargs
场景 1(作品) 循环创建 40k 个套接字客户端,等待所有连接建立,然后所有客户端同时向服务器发送消息并接收服务器的响应 10 次(即发送/接收发生 10 次)。
场景 2(不起作用。我收到很多连接拒绝错误) 循环创建 40k 个套接字客户端,并在每个客户端连接后立即向服务器发送/接收相同的 10 条消息,而不是等待 40k 连接建立。
我无法弄清楚为什么我的第二种情况会失败。我知道在场景 1 中,在建立所有 40k 连接之前,服务器并没有做太多事情。但它能够同时与所有客户端进行通信。有什么想法吗??
感谢您的耐心等待。
这是套接字服务器代码
public class SocketServer
private static System.Timers.Timer MonitorTimer = new System.Timers.Timer();
public static SocketServerMonitor socket_monitor = new SocketServerMonitor();
private int m_numConnections;
private int m_receiveBufferSize;
public static BufferManager m_bufferManager;
Socket listenSocket;
public static SocketAsyncEventArgsPool m_readWritePool;
public static int m_numConnectedSockets;
private int cnt = 0;
public static int Closecalled=0;
public SocketServer(int numConnections, int receiveBufferSize)
m_numConnectedSockets = 0;
m_numConnections = numConnections;
m_receiveBufferSize = receiveBufferSize;
m_bufferManager = new BufferManager(receiveBufferSize * numConnections ,
receiveBufferSize);
m_readWritePool = new SocketAsyncEventArgsPool(numConnections);
public void Init()
MonitorTimer.Interval = 30000;
MonitorTimer.Start();
MonitorTimer.Elapsed += new System.Timers.ElapsedEventHandler(socket_monitor.Log);
m_bufferManager.InitBuffer();
SocketAsyncEventArgs readWriteEventArg;
for (int i = 0; i < m_numConnections; i++)
readWriteEventArg = new SocketAsyncEventArgs();
m_readWritePool.Push(readWriteEventArg);
public void Start(IPEndPoint localEndPoint)
listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint);
listenSocket.Listen(1000);
StartAccept(null);
public void Stop()
if (listenSocket == null)
return;
listenSocket.Close();
listenSocket = null;
Thread.Sleep(15000);
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
if (acceptEventArg == null)
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
else
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
try
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
ProcessAccept(acceptEventArg);
catch (Exception e)
void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
ProcessAccept(e);
private void ProcessAccept(SocketAsyncEventArgs e)
Interlocked.Increment(ref m_numConnectedSockets);
socket_monitor.IncSocketsConnected();
SocketAsyncEventArgs readEventArgs = m_readWritePool.Pop();
m_bufferManager.SetBuffer(readEventArgs);
readEventArgs.UserToken = new AsyncUserToken id = cnt++, StarTime = DateTime.Now ;
readEventArgs.AcceptSocket = e.AcceptSocket;
SocketHandler handler=new SocketHandler(readEventArgs);
StartAccept(e);
class SocketHandler
private SocketAsyncEventArgs _socketEventArgs;
public SocketHandler(SocketAsyncEventArgs socketAsyncEventArgs)
_socketEventArgs = socketAsyncEventArgs;
_socketEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
StartReceive(_socketEventArgs);
private void StartReceive(SocketAsyncEventArgs receiveSendEventArgs)
bool willRaiseEvent = receiveSendEventArgs.AcceptSocket.ReceiveAsync(receiveSendEventArgs);
if (!willRaiseEvent)
ProcessReceive(receiveSendEventArgs);
private void ProcessReceive(SocketAsyncEventArgs e)
// check if the remote host closed the connection
AsyncUserToken token = (AsyncUserToken)e.UserToken;
//token.StarTime = DateTime.Now;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
// process the data here
//reply to client
byte[] AckData1 = BitConverter.GetBytes(1);
SendData(AckData1, 0, AckData1.Length, e);
StartReceive(e);
else
CloseClientSocket(e);
private void IO_Completed(object sender, SocketAsyncEventArgs e)
// determine which type of operation just completed and call the associated handler
switch (e.LastOperation)
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
private void CloseClientSocket(SocketAsyncEventArgs e)
AsyncUserToken token = e.UserToken as AsyncUserToken;
// close the socket associated with the client
try
e.AcceptSocket.Shutdown(SocketShutdown.Send);
catch (Exception ex)
e.AcceptSocket.Close();
Interlocked.Decrement(ref SocketServer.m_numConnectedSockets);
SocketServer.socket_monitor.DecSocketsConnected();
SocketServer.m_bufferManager.FreeBuffer(e);
e.Completed -= new EventHandler<SocketAsyncEventArgs>(IO_Completed);
SocketServer.m_readWritePool.Push(e);
public void SendData(Byte[] data, Int32 offset, Int32 count, SocketAsyncEventArgs args)
try
Socket socket = args.AcceptSocket;
if (socket.Connected)
var i = socket.Send(data, offset, count, SocketFlags.None);
catch (Exception Ex)
这是在connectcallback方法中抛出错误的客户端代码
// State object for receiving data from remote device.
public class StateObject
// Client socket.
public Socket workSocket = null;
// Size of receive buffer.
public const int BufferSize = 256;
// Receive buffer.
public byte[] buffer = new byte[BufferSize];
// Received data string.
public StringBuilder sb = new StringBuilder();
public int count = 0;
public class AsynchronousClient
// The port number for the remote device.
private const int port = 11000;
private static int closecalled = 0;
private static bool wait = true;
// ManualResetEvent instances signal completion.
private static ManualResetEvent connectDone =
new ManualResetEvent(false);
private static ManualResetEvent sendDone =
new ManualResetEvent(false);
private static ManualResetEvent receiveDone =
new ManualResetEvent(false);
// The response from the remote device.
private static String response = String.Empty;
private static void StartClient(Socket client, IPEndPoint remoteEP)
// Connect to a remote device.
try
// Connect to the remote endpoint.
client.BeginConnect(remoteEP,
new AsyncCallback(ConnectCallback), new StateObject workSocket = client );
catch (Exception e)
Console.WriteLine(e.ToString());
private static void ConnectCallback(IAsyncResult ar)
try
// Retrieve the socket from the state object.
StateObject state = (StateObject)ar.AsyncState;
var client = state.workSocket;
// Complete the connection.
client.EndConnect(ar);
var data = "AA5500C08308353816050322462F01020102191552E7D3FA52E7D3FB1FF85BF1FE9F201000004AB80000000500060800001EFFB72F0D00002973620000800000FFFFFFFF00009D6D00003278002EE16D0000018500000000000000000000003A0000000100000000828C80661FF8B436FE9EA9FC000000120000000700000000000000000000000400000000000000000000000000000000000000000000281E0000327800000000000000000000000000AF967D00000AEA000000000000000000000000";
Send(state, data);
catch (Exception e)
Console.WriteLine(e.ToString());
private static void Receive(StateObject state)
try
Socket client = state.workSocket;
// Begin receiving the data from the remote device.
client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReceiveCallback), state);
catch (Exception e)
Console.WriteLine(e.ToString());
private static void ReceiveCallback(IAsyncResult ar)
try
StateObject state = (StateObject)ar.AsyncState;
Socket client = state.workSocket;
// Read data from the remote device.
int bytesRead = client.EndReceive(ar);
//if (wait)
//
// connectDone.WaitOne();
//
if (bytesRead > 0)
state.count = state.count + 1;
byte[] b = new byte[bytesRead];
Array.Copy(state.buffer, b, 1);
if (b[0] == 1)
if (state.count < 10)
var data = "AA5500C08308353816050322462F01020102191552E7D3FA52E7D3FB1FF85BF1FE9F201000004AB80000000500060800001EFFB72F0D00002973620000800000FFFFFFFF00009D6D00003278002EE16D0000018500000000000000000000003A0000000100000000828C80661FF8B436FE9EA9FC000000120000000700000000000000000000000400000000000000000000000000000000000000000000281E0000327800000000000000000000000000AF967D00000AEA000000000000000000000000";
Send(state, data);
else
Interlocked.Increment(ref closecalled);
Console.WriteLine("closecalled:-" + closecalled + " at " + DateTime.Now);
client.Close();
else
// Get the rest of the data.
client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReceiveCallback), state);
else
client.Close();
catch (Exception e)
Console.WriteLine(e.ToString());
private static void Send(StateObject state, String data)
try
Socket client = state.workSocket;
var hexlen = data.Length;
byte[] byteData = new byte[hexlen / 2];
int[] hexarray = new int[hexlen / 2];
int i = 0;
int k = 0;
//create the byte array
while (i < data.Length / 2)
string first = data[i].ToString();
i++;
string second = data[i].ToString();
string x = first + second;
byteData[k] = (byte)Convert.ToInt32(x, 16);
i++;
k++;
// Begin sending the data to the remote device.
client.BeginSend(byteData, 0, byteData.Length, 0,
new AsyncCallback(SendCallback), state);
catch (Exception e)
Console.WriteLine(e.ToString());
private static void SendCallback(IAsyncResult ar)
try
// Retrieve the socket from the state object.
StateObject state = (StateObject)ar.AsyncState;
Socket client = state.workSocket;
// Complete sending the data to the remote device.
int bytesSent = client.EndSend(ar);
Receive(state);
catch (Exception e)
Console.WriteLine(e.ToString());
public static int Main(String[] args)
Start();
Console.ReadLine();
return 0;
private static void Start()
IPAddress ipaddress = IPAddress.Parse("10.20.2.152");
IPEndPoint remoteEP = new IPEndPoint(ipaddress, port);
for (int i = 0; i < 40000; i++)
Thread.Sleep(1);
// Create a TCP/IP socket.
try
Socket client = new Socket(AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
StartClient(client, remoteEP);
catch (Exception e)
Console.WriteLine(e.ToString());
if (i == 39999)
Console.WriteLine("made all conns at " + DateTime.Now);
【问题讨论】:
两种场景下的服务器代码是否相同?如果是这样,则可能存在错误。在这种情况下,你能给我们展示一些它的代码吗? 听起来您已经耗尽了操作系统积压的传入连接。发布一些代码和完整的异常。 我在上面添加了部分代码。我得到的错误是“无法建立连接,因为目标机器主动拒绝它” 启动连接的代码在哪里?指定更高的积压(例如 40k)。 @usr 我已经添加了客户端代码。我尝试了 40k 作为积压但没有运气。你能检查一下客户端代码吗? 【参考方案1】:我会使用线性队列来接受传入连接。像这样的:
public async Task Accept40KClients()
for (int i = 0; i < 40000; i++)
// Await this here -------v
bool willRaiseEvent = await listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
ProcessAccept(acceptEventArg);
如果这还不够快,也许你可以一次等待 10 次,但我认为这已经足够了......不过我可能错了。
【讨论】:
以上是关于c# SocketAsyncEventArgs 在 ReceiveAsync 处理程序中阻塞代码的主要内容,如果未能解决你的问题,请参考以下文章
C#高性能大容量SOCKET并发:SocketAsyncEventArgs封装
SocketAsyncEventArgs 和缓冲,而消息是在部分