实时通讯第二篇
Posted lovemj
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时通讯第二篇相关的知识,希望对你有一定的参考价值。
说完SignalR 我们来说说基于iocp IO管理 socket~~~~依旧觉得好的话 记得推荐哦
————————————————————————————————————————————————————————————————————————————
基于服务端的Iocp Socket
_________________________________________________________
引用socket这些都不用说
先建立BufferManager类 为了缓冲数据
1 namespace FastDev.IocpSocket 2 { 3 public class BufferManager 4 { 5 int m_numBytes; // 缓冲池控制的字节总数。 6 byte[] m_buffer; // 缓冲区管理器维护的底层字节数组 7 Stack<int> m_freeIndexPool; 8 int m_currentIndex; 9 int m_bufferSize; 10 11 public BufferManager(int totalBytes, int bufferSize) 12 { 13 m_numBytes = totalBytes; 14 m_currentIndex = 0; 15 m_bufferSize = bufferSize; 16 m_freeIndexPool = new Stack<int>(); 17 } 18 19 // 分配缓冲池使用的缓冲区空间 20 public void InitBuffer() 21 { 22 //创建一个大的缓冲区并划分 23 //输出到每个SocketAsyncEventArg对象 24 m_buffer = new byte[m_numBytes]; 25 } 26 27 // 将缓冲区从缓冲池分配给指定的SocketAsyncEventArgs对象 28 // 29 // <returns>true if the buffer was successfully set, else false</returns> 30 public bool SetBuffer(SocketAsyncEventArgs args) 31 { 32 33 if (m_freeIndexPool.Count > 0) 34 { 35 args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize); 36 } 37 else 38 { 39 if ((m_numBytes - m_bufferSize) < m_currentIndex) 40 { 41 return false; 42 } 43 args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize); 44 m_currentIndex += m_bufferSize; 45 } 46 return true; 47 } 48 49 // 从SocketAsyncEventArg 对象中移除缓冲区. 50 // 将缓冲区释放回缓冲池 51 public void FreeBuffer(SocketAsyncEventArgs args) 52 { 53 m_freeIndexPool.Push(args.Offset); 54 args.SetBuffer(null, 0, 0); 55 } 56 } 57 }
SocketEventPool 看名字就知道是为了做什么
namespace FastDev.IocpSocket { public class SocketEventPool { Stack<SocketAsyncEventArgs> m_pool; public SocketEventPool(int capacity) { m_pool = new Stack<SocketAsyncEventArgs>(capacity); } public void Push(SocketAsyncEventArgs item) { if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); } lock (m_pool) { m_pool.Push(item); } } //从池中移除 SocketAsyncEventArgs 实例 // 并返回从池中移除的对象 public SocketAsyncEventArgs Pop() { lock (m_pool) { return m_pool.Pop(); } } //池中的 SocketAsyncEventArgs 实例的数目 public int Count { get { return m_pool.Count; } } public void Clear() { m_pool.Clear(); } } }
AsyncUserToken 客户端连接的对象
namespace FastDev.IocpSocket { public class AsyncUserToken { /// <summary> /// 客户端IP地址 /// </summary> public IPAddress IPAddress { get; set; } /// <summary> /// 远程地址 /// </summary> public EndPoint Remote { get; set; } /// <summary> /// 通信SOKET /// </summary> public Socket Socket { get; set; } /// <summary> /// 连接时间 /// </summary> public DateTime ConnectTime { get; set; } ///// <summary> ///// 所属用户信息 ///// </summary> //public UserInfoModel UserInfo { get; set; } /// <summary> /// 咱这里就储存一个CompanyId /// </summary> public ReceiveMsgModel Model { get; set; } /// <summary> /// 数据缓存区 /// </summary> public List<byte> Buffer { get; set; } public AsyncUserToken() { this.Buffer = new List<byte>(); } } }
SocketManager 这个就是操作的公用方法啦 这里可以使用单例模式 也可以使用静态 不过总感觉静态不太好呢?
namespace FastDev.IocpSocket { public static class SocketManager { private static int m_maxConnectNum; //最大连接数 private static int m_revBufferSize; //最大接收字节数 static BufferManager m_bufferManager; const int opsToAlloc = 2; static Socket listenSocket; //监听Socket static SocketEventPool m_pool; static int m_clientCount; //连接的客户端数量 static Semaphore m_maxNumberAcceptedClients; static List<AsyncUserToken> m_clients; //客户端列表 #region 定义委托 /// <summary> /// 客户端连接数量变化时触发 /// </summary> /// <param name="num">当前增加客户的个数(用户退出时为负数,增加时为正数,一般为1)</param> /// <param name="token">增加用户的信息</param> //public delegate void OnClientNumberChange(int num, AsyncUserToken token); /// <summary> /// 接收到客户端的数据 /// </summary> /// <param name="token">客户端</param> /// <param name="buff">客户端数据</param> public delegate void OnReceiveData(AsyncUserToken token, byte[] buff); #endregion #region 定义事件 /// <summary> /// 客户端连接数量变化事件 /// </summary> //public OnClientNumberChange ClientNumberChange; /// <summary> /// 接收到客户端的数据事件 /// </summary> public static OnReceiveData ReceiveClientData; #endregion #region 定义属性 /// <summary> /// 获取客户端列表 /// </summary> public static List<AsyncUserToken> ClientList { get { return m_clients; } } #endregion /// <summary> /// 构造函数 /// </summary> /// <param name="numConnections">最大连接数</param> /// <param name="receiveBufferSize">缓存区大小</param> static SocketManager() { m_clientCount = 0; m_maxConnectNum = 200; m_revBufferSize = 1200; // 分配缓冲区,以便最大数量的套接字可以有一个未读数,同时向 Socket 写入 m_bufferManager = new BufferManager(1200 * 200 * opsToAlloc, 1200); m_pool = new SocketEventPool(200); m_maxNumberAcceptedClients = new Semaphore(200, 200); } /// <summary> /// 初始化 /// </summary> public static void Init() { // 分配一个大字节缓冲区,所有I/O操作都使用一个。这个防止内存碎片化 m_bufferManager.InitBuffer(); m_clients = new List<AsyncUserToken>(); //SocketAsyncEventArgs 对象的预分配池 SocketAsyncEventArgs readWriteEventArg; for (int i = 0; i < m_maxConnectNum; i++) { readWriteEventArg = new SocketAsyncEventArgs(); readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); readWriteEventArg.UserToken = new AsyncUserToken(); // 将缓冲区中的字节缓冲区分配给 SocketAsyncEventArg 对象 m_bufferManager.SetBuffer(readWriteEventArg); // 向池中添加 SocketAsyncEventArg m_pool.Push(readWriteEventArg); } } /// <summary> /// 启动服务 /// </summary> /// <param name="localEndPoint"></param> public static bool Start(IPEndPoint localEndPoint) { try { m_clients.Clear(); listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); listenSocket.Bind(localEndPoint); // 使用100个连接的侦听后台日志启动服务器 listenSocket.Listen(m_maxConnectNum); // post accepts on the listening socket StartAccept(null); return true; } catch (Exception) { return false; } } /// <summary> /// 停止服务 /// </summary> public static void Stop() { foreach (AsyncUserToken token in m_clients) { try { token.Socket.Shutdown(SocketShutdown.Both); } catch (Exception) { } } try { listenSocket.Shutdown(SocketShutdown.Both); } catch (Exception) { } listenSocket.Close(); int c_count = m_clients.Count; lock (m_clients) { m_clients.Clear(); } //ClientNumberChange?.Invoke(-c_count, null); } public static void CloseClient(AsyncUserToken token) { try { token.Socket.Shutdown(SocketShutdown.Both); } catch (Exception) { } } // 开始接受客户端的连接请求的操作 // // <param name="acceptEventArg">在服务器侦听套接字上发出接受操作时要使用的上下文对象</param> public static void StartAccept(SocketAsyncEventArgs acceptEventArg) { if (acceptEventArg == null) { acceptEventArg = new SocketAsyncEventArgs(); acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed); } else { // 由于上下文对象正在重用,必须清除套接字。 acceptEventArg.AcceptSocket = null; } m_maxNumberAcceptedClients.WaitOne(); if (!listenSocket.AcceptAsync(acceptEventArg)) { ProcessAccept(acceptEventArg); } } // 此方法是与SoCK.Access同步的回调方法操作,并在接受操作完成时调用 static void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e) { ProcessAccept(e); } private static void ProcessAccept(SocketAsyncEventArgs e) { try { Interlocked.Increment(ref m_clientCount); // 获取接受的客户端连接的套接字,并将其放入对象用户令牌 SocketAsyncEventArgs readEventArgs = m_pool.Pop(); AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken; userToken.Socket = e.AcceptSocket; userToken.ConnectTime = DateTime.Now; userToken.Remote = e.AcceptSocket.RemoteEndPoint; userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address; lock (m_clients) { m_clients.Add(userToken); } //ClientNumberChange?.Invoke(1, userToken); if (!e.AcceptSocket.ReceiveAsync(readEventArgs)) { ProcessReceive(readEventArgs); } } catch (Exception me) { } // Accept the next connection request if (e.SocketError == SocketError.OperationAborted) return; StartAccept(e); } static void IO_Completed(object sender, SocketAsyncEventArgs e) { // 确定刚刚完成的操作类型并调用关联的处理程序 switch (e.LastOperation) { case SocketAsyncOperation.Receive: ProcessReceive(e); break; case SocketAsyncOperation.Send: ProcessSend(e); break; default: throw new ArgumentException("The last operation completed on the socket was not a receive or send"); } } // 当异步接收操作完成时调用此方法. // 如果远程主机关闭连接,则套接字关闭. // 如果接收到数据,则将数据回传给客户端. // private static void ProcessReceive(SocketAsyncEventArgs e) { try { // 检查远程主机是否关闭了连接 AsyncUserToken token = (AsyncUserToken)e.UserToken; if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { //读取数据 byte[] data = new byte[e.BytesTransferred]; Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred); lock (token.Buffer) { token.Buffer.AddRange(data); } //注意:你一定会问,这里为什么要用do-while循环? //如果当客户发送大数据流的时候,e.BytesTransferred的大小就会比客户端发送过来的要小, //需要分多次接收.所以收到包的时候,先判断包头的大小.够一个完整的包再处理. //如果客户短时间内发送多个小数据包时, 服务器可能会一次性把他们全收了. //这样如果没有一个循环来控制,那么只会处理第一个包, //剩下的包全部留在token.Buffer中了,只有等下一个数据包过来后,才会放出一个来. do { //判断包的长度 byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray(); int packageLen = BitConverter.ToInt32(lenBytes, 0); if (packageLen > token.Buffer.Count - 4) { //长度不够时,退出循环,让程序继续接收 break; } //包够长时,则提取出来,交给后面的程序去处理 byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray(); //从数据池中移除这组数据 lock (token.Buffer) { token.Buffer.RemoveRange(0, packageLen + 4); } //这里回调--各自的方法 --我这里做数据库处理 ReceiveClientData = new OnReceiveData(ReceiveMsg); //将数据包交给后台处理,这里你也可以新开个线程来处理.加快速度. ReceiveClientData.Invoke(token, rev); //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收. //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了. } while (token.Buffer.Count > 4); //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明 if (!token.Socket.ReceiveAsync(e)) ProcessReceive(e); } else { CloseClientSocket(e); } } catch (Exception xe) { } } /// <summary> /// 处理客户端消息 /// </summary> /// <param name="token"></param> /// <param name="b"></param> private static void ReceiveMsg(AsyncUserToken token, byte[] b) { string msg = System.Text.Encoding.Default.GetString(b); if (!string.IsNullOrWhiteSpace(msg)) { ReceiveMsgModel model = TypeConvertHepler.JsonToData<ReceiveMsgModel>(msg); if (model != null && model.State != 0) { switch (model.State) { case 1: foreach (var temp in m_clients) { if (temp.Remote == token.Remote)//通讯地址相等 { temp.Model = new ReceiveMsgModel { CompanyId = model.CompanyId }; } } break; case 2: //操作数据库 Go.Travel.NetWork.WcfHelper.TravelHelper.UpdateByWhere_B2cmesremind(new Dictionary<string, object> { { nameof(B2cmesremind.IsRead),1} }, new Dictionary<string, object> { { nameof(B2cmesremind.KID),model.KID } }); break; } } } } // 异步发送操作完成时调用的方法. // 该方法在套接字上发出另一个接收来读取任何附加信息 // 客户端发送的数据 // // <param name="e"></param> private static void ProcessSend(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { // 完成对客户端的数据回传 AsyncUserToken token = (AsyncUserToken)e.UserToken; // 读取客户端发送的下一个数据块 bool willRaiseEvent = token.Socket.ReceiveAsync(e); if (!willRaiseEvent) { ProcessReceive(e); } } else { CloseClientSocket(e); } } //关闭客户端 private static void CloseClientSocket(SocketAsyncEventArgs e) { AsyncUserToken token = e.UserToken as AsyncUserToken; lock (m_clients) { m_clients.Remove(token); } //如果有事件,则调用事件,发送客户端数量变化通知 //ClientNumberChange?.Invoke(-1, token); // 关闭与客户端关联的套接字 try { token.Socket.Shutdown(SocketShutdown.Send); } catch (Exception) { } token.Socket.Close(); // 减少计数器,以跟踪连接到服务器的客户端总数 Interlocked.Decrement(ref m_clientCount); m_maxNumberAcceptedClients.Release(); //SocketAsyncEventArg,以便它们可以被另一个客户端重用。 e.UserToken = new AsyncUserToken(); m_pool.Push(e); } /// <summary> /// 对数据进行打包,然后再发送 /// </summary> /// <param name="token"></param> /// <param name="message"></param> /// <returns></returns> public static void SendMessage(AsyncUserToken token, byte[] message) { if (token == null || token.Socket == null || !token.Socket.Connected) return; try { //对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定) byte[] buff = new byte[message.Length + 4]; byte[] len = BitConverter.GetBytes(message.Length); Array.Copy(len, buff, 4); Array.Copy(message, 0, buff, 4, message.Length); //token.Socket.Send(buff); //这句也可以发送, 可根据自己的需要来选择 //新建异步发送对象, 发送消息 SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs(); sendArg.UserToken = token; sendArg.SetBuffer(buff, 0, buff.Length); //将数据放置进去. token.Socket.SendAsync(sendArg); } catch (Exception e) { } } /// <summary> /// 给所有在线的用户发送消息,且CompanyId == 传递的值 /// </summary> /// <param name="message"></param> /// <param name="companyId"></param> public static void SendAllUserMsg(byte[] message, string companyId) { try { foreach (var token in m_clients) { if (token == null || token.Socket == null || !token.Socket.Connected || token.Model.CompanyId != companyId) continue; else { //对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定) byte[] buff = new byte[message.Length + 4]; byte[] len = BitConverter.GetBytes(message.Length); Array.Copy(len, buff, 4); Array.Copy(message, 0, buff, 4, message.Length); //token.Socket.Send(buff); //这句也可以发送, 可根据自己的需要来选择 //新建异步发送对象, 发送消息 SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs(); sendArg.UserToken = token; sendArg.SetBuffer(buff, 0, buff.Length); //将数据放置进去. token.Socket.SendAsync(sendArg); } } //调用http推送消息 System.Text.Encoding.Default.GetString(message) SendMsgModel model = TypeConvertHepler.JsonToData<SendMsgModel>(Encoding.Default.GetString(message)); if (model != null) { HttpHepler.HttpGet(ConfigurationManager.AppSettings["lyUrlPath"].ToString() + "/SignalrPush/PushMsg/?message=" + model.Guid); } } catch (Exception e) { } } } }
SendAllUserMsg 这个方法是我自己定义的 做了广播处理
那么这里你可以告诉服务端,我要给谁传递消息 AsyncUserToken 就是这个对象里面的咯 你可以根据IP + 端口 找到唯一的客户端
接下来 初始化 然后启动就可以啦
/// <summary> /// 启动Socket /// </summary> private static void StartSocket() { SocketManager.Init(); SocketManager.Start(new IPEndPoint(IPAddress.Parse(TypeConvertHepler.GetLocalIP()), 10001)); }
是不是很nice 拿起来就能用哦
以上是关于实时通讯第二篇的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud教程 | 第二篇: 服务消费者(rest+ribbon)(Finchley版本)