实时通讯第二篇

Posted lovemj

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实时通讯第二篇相关的知识,希望对你有一定的参考价值。

说完SignalR  我们来说说基于iocp IO管理 socket~~~~依旧觉得好的话  记得推荐哦
————————————————————————————————————————————————————————————————————————————

基于服务端的Iocp Socket
_________________________________________________________

引用socket这些都不用说

先建立BufferManager类  为了缓冲数据

 1 namespace FastDev.IocpSocket
 2 {
 3     public class BufferManager
 4     {
 5         int m_numBytes;                 // 缓冲池控制的字节总数。 
 6         byte[] m_buffer;                // 缓冲区管理器维护的底层字节数组 
 7         Stack<int> m_freeIndexPool;
 8         int m_currentIndex;
 9         int m_bufferSize;
10 
11         public BufferManager(int totalBytes, int bufferSize)
12         {
13             m_numBytes = totalBytes;
14             m_currentIndex = 0;
15             m_bufferSize = bufferSize;
16             m_freeIndexPool = new Stack<int>();
17         }
18 
19         // 分配缓冲池使用的缓冲区空间  
20         public void InitBuffer()
21         {
22             //创建一个大的缓冲区并划分   
23             //输出到每个SocketAsyncEventArg对象
24             m_buffer = new byte[m_numBytes];
25         }
26 
27         // 将缓冲区从缓冲池分配给指定的SocketAsyncEventArgs对象 
28         //  
29         // <returns>true if the buffer was successfully set, else false</returns>  
30         public bool SetBuffer(SocketAsyncEventArgs args)
31         {
32 
33             if (m_freeIndexPool.Count > 0)
34             {
35                 args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
36             }
37             else
38             {
39                 if ((m_numBytes - m_bufferSize) < m_currentIndex)
40                 {
41                     return false;
42                 }
43                 args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
44                 m_currentIndex += m_bufferSize;
45             }
46             return true;
47         }
48 
49         // 从SocketAsyncEventArg 对象中移除缓冲区.    
50         // 将缓冲区释放回缓冲池  
51         public void FreeBuffer(SocketAsyncEventArgs args)
52         {
53             m_freeIndexPool.Push(args.Offset);
54             args.SetBuffer(null, 0, 0);
55         }
56     }
57 }

SocketEventPool 看名字就知道是为了做什么

namespace FastDev.IocpSocket
{
    public class SocketEventPool
    {
        Stack<SocketAsyncEventArgs> m_pool;


        public SocketEventPool(int capacity)
        {
            m_pool = new Stack<SocketAsyncEventArgs>(capacity);
        }

        public void Push(SocketAsyncEventArgs item)
        {
            if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }
            lock (m_pool)
            {
                m_pool.Push(item);
            }
        }

        //从池中移除 SocketAsyncEventArgs 实例 
        // 并返回从池中移除的对象  
        public SocketAsyncEventArgs Pop()
        {
            lock (m_pool)
            {
                return m_pool.Pop();
            }
        }

        //池中的 SocketAsyncEventArgs 实例的数目 
        public int Count
        {
            get { return m_pool.Count; }
        }

        public void Clear()
        {
            m_pool.Clear();
        }
    }
}

  AsyncUserToken 客户端连接的对象

namespace FastDev.IocpSocket
{
    public class AsyncUserToken
    {
        /// <summary>  
        /// 客户端IP地址  
        /// </summary>  
        public IPAddress IPAddress { get; set; }

        /// <summary>  
        /// 远程地址  
        /// </summary>  
        public EndPoint Remote { get; set; }

        /// <summary>  
        /// 通信SOKET  
        /// </summary>  
        public Socket Socket { get; set; }

        /// <summary>  
        /// 连接时间  
        /// </summary>  
        public DateTime ConnectTime { get; set; }

        ///// <summary>  
        ///// 所属用户信息  
        ///// </summary>  
        //public UserInfoModel UserInfo { get; set; }
        /// <summary>  
        /// 咱这里就储存一个CompanyId
        /// </summary>  
        public ReceiveMsgModel Model { get; set; }

        /// <summary>  
        /// 数据缓存区  
        /// </summary>  
        public List<byte> Buffer { get; set; }


        public AsyncUserToken()
        {
            this.Buffer = new List<byte>();
        }
    }
}

  SocketManager 这个就是操作的公用方法啦 这里可以使用单例模式  也可以使用静态  不过总感觉静态不太好呢?

namespace FastDev.IocpSocket
{
    public static class SocketManager
    {

        private static int m_maxConnectNum;    //最大连接数  
        private static int m_revBufferSize;    //最大接收字节数  
        static BufferManager m_bufferManager;
        const int opsToAlloc = 2;
        static Socket listenSocket;            //监听Socket  
        static SocketEventPool m_pool;
        static int m_clientCount;              //连接的客户端数量  
        static Semaphore m_maxNumberAcceptedClients;

        static List<AsyncUserToken> m_clients; //客户端列表  

        #region 定义委托  

        /// <summary>  
        /// 客户端连接数量变化时触发  
        /// </summary>  
        /// <param name="num">当前增加客户的个数(用户退出时为负数,增加时为正数,一般为1)</param>  
        /// <param name="token">增加用户的信息</param>  
        //public delegate void OnClientNumberChange(int num, AsyncUserToken token);

        /// <summary>  
        /// 接收到客户端的数据  
        /// </summary>  
        /// <param name="token">客户端</param>  
        /// <param name="buff">客户端数据</param>  
        public delegate void OnReceiveData(AsyncUserToken token, byte[] buff);

        #endregion

        #region 定义事件  
        /// <summary>  
        /// 客户端连接数量变化事件  
        /// </summary>  
        //public OnClientNumberChange ClientNumberChange;

        /// <summary>  
        /// 接收到客户端的数据事件  
        /// </summary>  
        public static OnReceiveData ReceiveClientData;
        #endregion

        #region 定义属性  

        /// <summary>  
        /// 获取客户端列表  
        /// </summary>  
        public static List<AsyncUserToken> ClientList { get { return m_clients; } }

        #endregion

        /// <summary>  
        /// 构造函数  
        /// </summary>  
        /// <param name="numConnections">最大连接数</param>  
        /// <param name="receiveBufferSize">缓存区大小</param>  
        static SocketManager()
        {
            m_clientCount = 0;
            m_maxConnectNum = 200;
            m_revBufferSize = 1200;
            // 分配缓冲区,以便最大数量的套接字可以有一个未读数,同时向 Socket 写入    
            m_bufferManager = new BufferManager(1200 * 200 * opsToAlloc, 1200);

            m_pool = new SocketEventPool(200);
            m_maxNumberAcceptedClients = new Semaphore(200, 200);
        }

        /// <summary>  
        /// 初始化  
        /// </summary>  
        public static void Init()
        {
            // 分配一个大字节缓冲区,所有I/O操作都使用一个。这个防止内存碎片化
            m_bufferManager.InitBuffer();
            m_clients = new List<AsyncUserToken>();
            //SocketAsyncEventArgs 对象的预分配池
            SocketAsyncEventArgs readWriteEventArg;

            for (int i = 0; i < m_maxConnectNum; i++)
            {
                readWriteEventArg = new SocketAsyncEventArgs();
                readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
                readWriteEventArg.UserToken = new AsyncUserToken();

                // 将缓冲区中的字节缓冲区分配给 SocketAsyncEventArg 对象  
                m_bufferManager.SetBuffer(readWriteEventArg);
                // 向池中添加 SocketAsyncEventArg
                m_pool.Push(readWriteEventArg);
            }
        }


        /// <summary>  
        /// 启动服务  
        /// </summary>  
        /// <param name="localEndPoint"></param>  
        public static bool Start(IPEndPoint localEndPoint)
        {
            try
            {
                m_clients.Clear();
                listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                listenSocket.Bind(localEndPoint);
                // 使用100个连接的侦听后台日志启动服务器  
                listenSocket.Listen(m_maxConnectNum);
                // post accepts on the listening socket  
                StartAccept(null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        /// <summary>  
        /// 停止服务  
        /// </summary>  
        public static void Stop()
        {
            foreach (AsyncUserToken token in m_clients)
            {
                try
                {
                    token.Socket.Shutdown(SocketShutdown.Both);
                }
                catch (Exception) { }
            }
            try
            {
                listenSocket.Shutdown(SocketShutdown.Both);
            }
            catch (Exception) { }

            listenSocket.Close();
            int c_count = m_clients.Count;
            lock (m_clients) { m_clients.Clear(); }

            //ClientNumberChange?.Invoke(-c_count, null);
        }


        public static void CloseClient(AsyncUserToken token)
        {
            try
            {
                token.Socket.Shutdown(SocketShutdown.Both);
            }
            catch (Exception) { }
        }


        // 开始接受客户端的连接请求的操作   
        //  
        // <param name="acceptEventArg">在服务器侦听套接字上发出接受操作时要使用的上下文对象</param>  
        public static void StartAccept(SocketAsyncEventArgs acceptEventArg)
        {
            if (acceptEventArg == null)
            {
                acceptEventArg = new SocketAsyncEventArgs();
                acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
            }
            else
            {
                // 由于上下文对象正在重用,必须清除套接字。  
                acceptEventArg.AcceptSocket = null;
            }

            m_maxNumberAcceptedClients.WaitOne();
            if (!listenSocket.AcceptAsync(acceptEventArg))
            {
                ProcessAccept(acceptEventArg);
            }
        }

        // 此方法是与SoCK.Access同步的回调方法操作,并在接受操作完成时调用  

        static void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
        {
            ProcessAccept(e);
        }

        private static void ProcessAccept(SocketAsyncEventArgs e)
        {
            try
            {
                Interlocked.Increment(ref m_clientCount);
                // 获取接受的客户端连接的套接字,并将其放入对象用户令牌
                SocketAsyncEventArgs readEventArgs = m_pool.Pop();
                AsyncUserToken userToken = (AsyncUserToken)readEventArgs.UserToken;
                userToken.Socket = e.AcceptSocket;
                userToken.ConnectTime = DateTime.Now;
                userToken.Remote = e.AcceptSocket.RemoteEndPoint;

                userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address;

                lock (m_clients) { m_clients.Add(userToken); }

                //ClientNumberChange?.Invoke(1, userToken);
                if (!e.AcceptSocket.ReceiveAsync(readEventArgs))
                {
                    ProcessReceive(readEventArgs);
                }
            }
            catch (Exception me)
            {

            }

            // Accept the next connection request  
            if (e.SocketError == SocketError.OperationAborted) return;
            StartAccept(e);
        }


        static void IO_Completed(object sender, SocketAsyncEventArgs e)
        {
            // 确定刚刚完成的操作类型并调用关联的处理程序  
            switch (e.LastOperation)
            {
                case SocketAsyncOperation.Receive:
                    ProcessReceive(e);
                    break;
                case SocketAsyncOperation.Send:
                    ProcessSend(e);
                    break;
                default:
                    throw new ArgumentException("The last operation completed on the socket was not a receive or send");
            }

        }


        // 当异步接收操作完成时调用此方法.   
        // 如果远程主机关闭连接,则套接字关闭.    
        // 如果接收到数据,则将数据回传给客户端.  
        //  
        private static void ProcessReceive(SocketAsyncEventArgs e)
        {
            try
            {
                // 检查远程主机是否关闭了连接  
                AsyncUserToken token = (AsyncUserToken)e.UserToken;
                if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
                {
                    //读取数据  
                    byte[] data = new byte[e.BytesTransferred];
                    Array.Copy(e.Buffer, e.Offset, data, 0, e.BytesTransferred);
                    lock (token.Buffer)
                    {
                        token.Buffer.AddRange(data);
                    }
                    //注意:你一定会问,这里为什么要用do-while循环?   
                    //如果当客户发送大数据流的时候,e.BytesTransferred的大小就会比客户端发送过来的要小,  
                    //需要分多次接收.所以收到包的时候,先判断包头的大小.够一个完整的包再处理.  
                    //如果客户短时间内发送多个小数据包时, 服务器可能会一次性把他们全收了.  
                    //这样如果没有一个循环来控制,那么只会处理第一个包,  
                    //剩下的包全部留在token.Buffer中了,只有等下一个数据包过来后,才会放出一个来.  
                    do
                    {
                        //判断包的长度  
                        byte[] lenBytes = token.Buffer.GetRange(0, 4).ToArray();
                        int packageLen = BitConverter.ToInt32(lenBytes, 0);
                        if (packageLen > token.Buffer.Count - 4)
                        {   //长度不够时,退出循环,让程序继续接收  
                            break;
                        }

                        //包够长时,则提取出来,交给后面的程序去处理  
                        byte[] rev = token.Buffer.GetRange(4, packageLen).ToArray();
                        //从数据池中移除这组数据  
                        lock (token.Buffer)
                        {
                            token.Buffer.RemoveRange(0, packageLen + 4);
                        }
                        //这里回调--各自的方法 --我这里做数据库处理
                        ReceiveClientData = new OnReceiveData(ReceiveMsg);
                        //将数据包交给后台处理,这里你也可以新开个线程来处理.加快速度.  
                        ReceiveClientData.Invoke(token, rev);
                        //这里API处理完后,并没有返回结果,当然结果是要返回的,却不是在这里, 这里的代码只管接收.  
                        //若要返回结果,可在API处理中调用此类对象的SendMessage方法,统一打包发送.不要被微软的示例给迷惑了.  
                    } while (token.Buffer.Count > 4);

                    //继续接收. 为什么要这么写,请看Socket.ReceiveAsync方法的说明  
                    if (!token.Socket.ReceiveAsync(e))
                        ProcessReceive(e);
                }
                else
                {
                    CloseClientSocket(e);
                }
            }
            catch (Exception xe)
            {

            }
        }
        /// <summary>
        /// 处理客户端消息
        /// </summary>
        /// <param name="token"></param>
        /// <param name="b"></param>
        private static void ReceiveMsg(AsyncUserToken token, byte[] b)
        {
            string msg = System.Text.Encoding.Default.GetString(b);
            if (!string.IsNullOrWhiteSpace(msg))
            {
                ReceiveMsgModel model = TypeConvertHepler.JsonToData<ReceiveMsgModel>(msg);
                if (model != null && model.State != 0)
                {
                    switch (model.State)
                    {
                        case 1:
                            foreach (var temp in m_clients)
                            {
                                if (temp.Remote == token.Remote)//通讯地址相等
                                {

                                    temp.Model = new ReceiveMsgModel
                                    {
                                        CompanyId = model.CompanyId
                                    };
                                }
                            }
                            break;
                        case 2:
                            //操作数据库
                            Go.Travel.NetWork.WcfHelper.TravelHelper.UpdateByWhere_B2cmesremind(new Dictionary<string, object> {
                                { nameof(B2cmesremind.IsRead),1}
                            }, new Dictionary<string, object> {
                                { nameof(B2cmesremind.KID),model.KID }
                            });
                            break;
                    }

                }
            }
        }
        // 异步发送操作完成时调用的方法.    
        // 该方法在套接字上发出另一个接收来读取任何附加信息   
        // 客户端发送的数据  
        //  
        // <param name="e"></param>  
        private static void ProcessSend(SocketAsyncEventArgs e)
        {
            if (e.SocketError == SocketError.Success)
            {
                // 完成对客户端的数据回传  
                AsyncUserToken token = (AsyncUserToken)e.UserToken;
                // 读取客户端发送的下一个数据块  
                bool willRaiseEvent = token.Socket.ReceiveAsync(e);
                if (!willRaiseEvent)
                {
                    ProcessReceive(e);
                }
            }
            else
            {
                CloseClientSocket(e);
            }
        }

        //关闭客户端  
        private static void CloseClientSocket(SocketAsyncEventArgs e)
        {
            AsyncUserToken token = e.UserToken as AsyncUserToken;

            lock (m_clients) { m_clients.Remove(token); }
            //如果有事件,则调用事件,发送客户端数量变化通知  
            //ClientNumberChange?.Invoke(-1, token);
            // 关闭与客户端关联的套接字  
            try
            {
                token.Socket.Shutdown(SocketShutdown.Send);
            }
            catch (Exception) { }
            token.Socket.Close();
            // 减少计数器,以跟踪连接到服务器的客户端总数  
            Interlocked.Decrement(ref m_clientCount);
            m_maxNumberAcceptedClients.Release();
            //SocketAsyncEventArg,以便它们可以被另一个客户端重用。
            e.UserToken = new AsyncUserToken();
            m_pool.Push(e);
        }



        /// <summary>  
        /// 对数据进行打包,然后再发送  
        /// </summary>  
        /// <param name="token"></param>  
        /// <param name="message"></param>  
        /// <returns></returns>  
        public static void SendMessage(AsyncUserToken token, byte[] message)
        {
            if (token == null || token.Socket == null || !token.Socket.Connected)
                return;
            try
            {
                //对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定)  
                byte[] buff = new byte[message.Length + 4];
                byte[] len = BitConverter.GetBytes(message.Length);
                Array.Copy(len, buff, 4);
                Array.Copy(message, 0, buff, 4, message.Length);
                //token.Socket.Send(buff);  //这句也可以发送, 可根据自己的需要来选择  
                //新建异步发送对象, 发送消息  
                SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();
                sendArg.UserToken = token;
                sendArg.SetBuffer(buff, 0, buff.Length);  //将数据放置进去.  
                token.Socket.SendAsync(sendArg);
            }
            catch (Exception e)
            {

            }
        }

        /// <summary>
        /// 给所有在线的用户发送消息,且CompanyId == 传递的值
        /// </summary>
        /// <param name="message"></param>
        /// <param name="companyId"></param>
        public static void SendAllUserMsg(byte[] message, string companyId)
        {
            try
            {
                foreach (var token in m_clients)
                {
                    if (token == null || token.Socket == null || !token.Socket.Connected || token.Model.CompanyId != companyId)
                        continue;
                    else
                    {
                        //对要发送的消息,制定简单协议,头4字节指定包的大小,方便客户端接收(协议可以自己定)  
                        byte[] buff = new byte[message.Length + 4];
                        byte[] len = BitConverter.GetBytes(message.Length);
                        Array.Copy(len, buff, 4);
                        Array.Copy(message, 0, buff, 4, message.Length);
                        //token.Socket.Send(buff);  //这句也可以发送, 可根据自己的需要来选择  
                        //新建异步发送对象, 发送消息  
                        SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();
                        sendArg.UserToken = token;
                        sendArg.SetBuffer(buff, 0, buff.Length);  //将数据放置进去.  
                        token.Socket.SendAsync(sendArg);
                    }
                }
                //调用http推送消息  System.Text.Encoding.Default.GetString(message)
                SendMsgModel model = TypeConvertHepler.JsonToData<SendMsgModel>(Encoding.Default.GetString(message));
                if (model != null)
                {
                    HttpHepler.HttpGet(ConfigurationManager.AppSettings["lyUrlPath"].ToString() + "/SignalrPush/PushMsg/?message=" + model.Guid);
                }
            }
            catch (Exception e)
            {

            }
        }
    }
}

  

SendAllUserMsg 这个方法是我自己定义的 做了广播处理

那么这里你可以告诉服务端,我要给谁传递消息  AsyncUserToken 就是这个对象里面的咯  你可以根据IP + 端口 找到唯一的客户端

 

接下来   初始化  然后启动就可以啦

  /// <summary>
        /// 启动Socket
        /// </summary>
        private static void StartSocket()
        {
            SocketManager.Init();
            SocketManager.Start(new IPEndPoint(IPAddress.Parse(TypeConvertHepler.GetLocalIP()), 10001));
        }

 

 

是不是很nice  拿起来就能用哦  









以上是关于实时通讯第二篇的主要内容,如果未能解决你的问题,请参考以下文章

数据结构初阶第二篇——顺序表(实现+动图演示)[建议收藏]

Mybatis框架第二篇

SpringCloud教程 | 第二篇: 服务消费者(rest+ribbon)(Finchley版本)

ELK总结——第二篇Logstash的搭建

业余草 SpringCloud教程 | 第二篇: 服务消费者(rest+ribbon)(Finchley版本)

史上最简单的SpringCloud教程 | 第二篇: 服务消费者(rest+ribbon)