消息推送服务
服务器推送目前流行就是私信、发布/订阅等模式,基本上都是基于会话映射,消息对列等技术实现的;高性能、分布式可以如下解决:会话映射可采用redis cluster等技术实现,消息对列可使用kafka等分布式消息队列方案实现。
APM.Server基于简单
1 static ConcurrentDictionary<string, Session> _sessionDic = new ConcurrentDictionary<string, Session>();
和
1 private static ConcurrentQueue<Message> _messageQueue = new ConcurrentQueue<Message>();
实现。
部分代码如下:
1 /// <summary> 2 /// 消息转发 3 /// </summary> 4 private void ForwardMsg() 5 { 6 try 7 { 8 var msg = MessageQueue.Dequeue(); 9 if (msg != null) 10 { 11 switch (msg.Type) 12 { 13 case (byte)MessageType.Sub: 14 if (!msg.IsMuti) 15 { 16 if (!SessionDic.Exists(msg.SessionID, msg.SessionID)) 17 SessionDic.Set(this._server, msg.SessionID, msg.SessionID); 18 } 19 if (!SessionDic.Exists(msg.SessionID, msg.Sender)) 20 SessionDic.Set(this._server, msg.Sender, msg.SessionID); 21 break; 22 case (byte)MessageType.Unsub: 23 if (!msg.IsMuti) 24 { 25 if (SessionDic.Exists(msg.SessionID, msg.SessionID)) 26 SessionDic.Del(msg.SessionID, msg.SessionID); 27 } 28 if (SessionDic.Exists(msg.SessionID, msg.Sender)) 29 SessionDic.Del(msg.Sender, msg.SessionID); 30 break; 31 default: 32 var session = SessionDic.Get(msg.SessionID); 33 if (session != null) 34 { 35 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList(); 36 if (remotes != null && remotes.Count > 0) 37 { 38 Parallel.For(0, remotes.Count, i => 39 { 40 this._server.SendMsg(remotes[i], Message.Serialize(msg)); 41 }); 42 } 43 } 44 this.OnMessage?.Invoke(msg); 45 break; 46 } 47 48 } 49 } 50 catch { } 51 }
1 /// <summary> 2 /// 消息转发 3 /// </summary> 4 private void ForwardMsg() 5 { 6 try 7 { 8 var msg = MessageQueue.Dequeue(); 9 if (msg != null) 10 { 11 switch (msg.Type) 12 { 13 case (byte)MessageType.Sub: 14 if (!msg.IsMuti) 15 { 16 if (!SessionDic.Exists(msg.SessionID, msg.SessionID)) 17 SessionDic.Set(this._server, msg.SessionID, msg.SessionID); 18 } 19 if (!SessionDic.Exists(msg.SessionID, msg.Sender)) 20 SessionDic.Set(this._server, msg.Sender, msg.SessionID); 21 break; 22 case (byte)MessageType.Unsub: 23 if (!msg.IsMuti) 24 { 25 if (SessionDic.Exists(msg.SessionID, msg.SessionID)) 26 SessionDic.Del(msg.SessionID, msg.SessionID); 27 } 28 if (SessionDic.Exists(msg.SessionID, msg.Sender)) 29 SessionDic.Del(msg.Sender, msg.SessionID); 30 break; 31 default: 32 var session = SessionDic.Get(msg.SessionID); 33 if (session != null) 34 { 35 var remotes = session.UserTokenDic.List.Where(b => b.ID != msg.Sender).ToList(); 36 if (remotes != null && remotes.Count > 0) 37 { 38 Parallel.For(0, remotes.Count, i => 39 { 40 this._server.SendMsg(remotes[i], Message.Serialize(msg)); 41 }); 42 } 43 } 44 this.OnMessage?.Invoke(msg); 45 break; 46 } 47 48 } 49 } 50 catch { } 51 }
异步tcp通信——APM.Core 服务端概述
异步tcp通信——APM.Core 解包
异步tcp通信——APM.Server 消息推送服务的实现
异步tcp通信——APM.ConsoleDemo
转载请标明本文来源:http://www.cnblogs.com/yswenli/
更多内容欢迎star作者的github:https://github.com/yswenli/APM