SuperSocket与SuperSocket.ClientEngine实现Protobuf协议
Posted 非法关键字
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SuperSocket与SuperSocket.ClientEngine实现Protobuf协议相关的知识,希望对你有一定的参考价值。
- 参考资料说明
SuperSocket文档 http://docs.supersocket.net/
Protobuf语言参考 https://developers.google.com/protocol-buffers/docs/proto
单消息多类型解决方案(Union Types) https://developers.google.com/protocol-buffers/docs/techniques#union
主要资料(非常感谢) http://www.cnblogs.com/caipeiyu/p/5559112.html
使用的ProtocolBuffers http://code.google.com/p/protobuf-csharp-port
关于MsgPack的协议 https://my.oschina.net/caipeiyu/blog/512437
- Proto
// NOTE(review): the import statements below name BackMessage.proto,
// CallMessage.proto and PersonMessage.proto, so this listing is presumably
// four separate .proto files shown back to back - confirm before compiling.

// ---- CallMessage.proto ----
message CallMessage {
    optional string content = 1;
}

// ---- BackMessage.proto ----
message BackMessage {
    optional string content = 1;
}

// ---- PersonMessage.proto ----
message PersonMessage {
    enum Sex {
        Male = 1;
        Female = 2;
    }

    required int32 id = 1;
    required string name = 2;
    required Sex sex = 3 [default = Male];
    required uint32 age = 4;
    required string phone = 5;
}

// ---- DefeatMessage.proto ----
import "BackMessage.proto";
import "CallMessage.proto";
import "PersonMessage.proto";

// Envelope message: `type` says which one of the optional payload fields
// the sender filled in.
message DefeatMessage {
    enum Type {
        CallMessage = 1;
        BackMessage = 2;
        PersonMessage = 3;
    }

    required Type type = 1;
    optional CallMessage callMessage = 2;
    optional BackMessage backMessage = 3;
    optional PersonMessage personMessage = 4;
}
- 生成C#代码
# Step 1: compile DefeatMessage.proto, together with the files it imports,
# into a single binary descriptor set.
protoc \
    --descriptor_set_out=DefeatMessage.protobin \
    --proto_path=./ \
    --include_imports \
    DefeatMessage.proto

# Step 2: run protogen on the descriptor set to produce the C# classes.
protogen DefeatMessage.protobin
- Server
namespace SuperSocketProtoServer.Protocol
{
    /// <summary>
    /// One decoded <see cref="DefeatMessage"/> frame, as handed to the server's request pipeline.
    /// </summary>
    public class ProtobufRequestInfo : IRequestInfo
    {
        /// <summary>String form of <see cref="Type"/> (for example "CallMessage").</summary>
        public string Key { get; }

        /// <summary>Discriminator copied from the decoded message.</summary>
        public DefeatMessage.Types.Type Type { get; }

        /// <summary>The complete decoded envelope message.</summary>
        public DefeatMessage Body { get; }

        /// <summary>Creates a request for <paramref name="body"/> whose key is the name of <paramref name="type"/>.</summary>
        /// <param name="type">Discriminator of the payload carried by <paramref name="body"/>.</param>
        /// <param name="body">The decoded envelope message.</param>
        public ProtobufRequestInfo(DefeatMessage.Types.Type type, DefeatMessage body)
        {
            Type = type;
            Key = type.ToString();
            Body = body;
        }
    }
}

namespace SuperSocketProtoServer.Protocol
{
    /// <summary>
    /// Receive filter for frames laid out as "varint32 body length" followed by a serialized
    /// <see cref="DefeatMessage"/>. Bytes of an incomplete frame are kept at the start of the
    /// session's receive area until the remainder arrives.
    /// </summary>
    public class ProtobufReceiveFilter : IReceiveFilter<ProtobufRequestInfo>, IOffsetAdapter, IReceiveFilterInitializer
    {
        // A varint whose value fits in 32 bits occupies at most five bytes.
        private const int MaxVarint32Bytes = 5;

        // Start of this session's receive area inside the shared buffer.
        private int _origOffset;
        // Distance between the start of the receive area and the position of the next receive.
        private int _offsetDelta;
        // Number of bytes of the current, still incomplete frame that are already buffered.
        private int _leftBufferSize;

        /// <summary>Remembers where the session's receive area starts.</summary>
        public void Initialize(IAppServer appServer, IAppSession session)
        {
            _origOffset = session.SocketSession.OrigReceiveOffset;
        }

        /// <summary>
        /// Offset adjustment exposed through <see cref="IOffsetAdapter"/>.
        /// NOTE(review): presumably the framework adds this to the original receive offset so the
        /// next receive lands behind the buffered partial frame - confirm against SuperSocket.
        /// </summary>
        public int OffsetDelta
        {
            get { return _offsetDelta; }
        }

        /// <summary>
        /// Tries to decode one frame from the receive buffer.
        /// </summary>
        /// <param name="readBuffer">The receive buffer.</param>
        /// <param name="offset">Start of the newly received data inside <paramref name="readBuffer"/>.</param>
        /// <param name="length">Number of newly received bytes.</param>
        /// <param name="toBeCopied">Whether the data should be copied instead of referencing the buffer (unused: the body is always copied out by ReadRawBytes).</param>
        /// <param name="rest">Number of received bytes that follow the decoded frame; 0 when no frame was decoded.</param>
        /// <returns>The decoded request, or <c>null</c> when more data is needed or the length prefix is not positive.</returns>
        public ProtobufRequestInfo Filter(byte[] readBuffer, int offset, int length, bool toBeCopied, out int rest)
        {
            rest = 0;

            // The frame starts where the previously buffered bytes start, not where the new data starts.
            var readOffset = offset - _offsetDelta;
            // Everything that belongs to the current frame so far: buffered bytes plus new bytes.
            var available = _leftBufferSize + length;

            // The length prefix itself may be split across receives; wait instead of letting
            // ReadRawVarint32 fail on a truncated varint.
            if (!HasCompleteLengthPrefix(readBuffer, readOffset, available))
            {
                KeepForNextReceive(readBuffer, offset, length);
                return null;
            }

            // The window must span buffered + new bytes. Using only `length` here made ReadRawBytes
            // run past the end of the window whenever part of the frame had been buffered earlier.
            CodedInputStream cis = CodedInputStream.CreateInstance(readBuffer, readOffset, available);

            // Body length, not counting the length prefix itself.
            int bodyLength = (int) cis.ReadRawVarint32();
            if (bodyLength <= 0)
            {
                // Unusable prefix: drop what was buffered so the same bytes are not re-read on every receive.
                Reset();
                return null;
            }

            // Number of bytes occupied by the length prefix.
            int headLen = (int) (cis.Position - readOffset);
            // Bytes left over after this frame; negative means the frame is still incomplete.
            int surplus = available - headLen - bodyLength;

            if (surplus < 0)
            {
                // TCP delivered only part of the frame.
                KeepForNextReceive(readBuffer, offset, length);
                return null;
            }

            rest = surplus;

            byte[] body = cis.ReadRawBytes(bodyLength);
            DefeatMessage message = DefeatMessage.ParseFrom(body);
            ProtobufRequestInfo requestInfo = new ProtobufRequestInfo(message.Type, message);

            _offsetDelta = 0;
            _leftBufferSize = 0;

            return requestInfo;
        }

        /// <summary>Forgets any partially received frame.</summary>
        public void Reset()
        {
            _offsetDelta = 0;
            _leftBufferSize = 0;
        }

        /// <summary>Number of bytes of an incomplete frame currently buffered.</summary>
        public int LeftBufferSize
        {
            get { return _leftBufferSize; }
        }

        /// <summary>Never assigned by this filter, so it always holds its default value.</summary>
        public IReceiveFilter<ProtobufRequestInfo> NextReceiveFilter { get; }

        /// <summary>Never assigned by this filter, so it always holds its default value.</summary>
        public FilterState State { get; }

        // True when the first bytes at `start` contain a terminated varint, or when at least
        // MaxVarint32Bytes are present (CodedInputStream then decides whether the prefix is valid).
        private static bool HasCompleteLengthPrefix(byte[] buffer, int start, int count)
        {
            var limit = Math.Min(count, MaxVarint32Bytes);
            for (var i = 0; i < limit; i++)
            {
                // A clear high bit marks the last byte of a varint.
                if ((buffer[start + i] & 0x80) == 0)
                {
                    return true;
                }
            }

            return count >= MaxVarint32Bytes;
        }

        // Records `length` more bytes of an incomplete frame and, when the frame does not already
        // start at the beginning of the receive area, moves it there.
        private void KeepForNextReceive(byte[] readBuffer, int offset, int length)
        {
            _leftBufferSize += length;
            _offsetDelta = _leftBufferSize;

            var expectedOffset = offset + length;
            var newOffset = _origOffset + _offsetDelta;
            if (newOffset < expectedOffset)
            {
                Buffer.BlockCopy(readBuffer, offset - _leftBufferSize + length, readBuffer, _origOffset, _leftBufferSize);
            }
        }
    }
}

namespace SuperSocketProtoServer.Protocol
{
    /// <summary>Session type bound to <see cref="ProtobufRequestInfo"/> requests.</summary>
    public class ProtobufAppSession : AppSession<ProtobufAppSession, ProtobufRequestInfo>
    {
        public ProtobufAppSession() { }
    }
}

namespace SuperSocketProtoServer.Protocol
{
    /// <summary>Server that creates a <see cref="ProtobufReceiveFilter"/> through the default receive filter factory.</summary>
    public class ProtobufAppServer : AppServer<ProtobufAppSession, ProtobufRequestInfo>
    {
        public ProtobufAppServer()
            : base(new DefaultReceiveFilterFactory<ProtobufReceiveFilter, ProtobufRequestInfo>())
        {
        }
    }
}
- Server Command
namespace SuperSocketProtoServer.Protocol.Command
{
    /// <summary>Prints the content of a received BackMessage payload.</summary>
    public class BackMessage : CommandBase<ProtobufAppSession, ProtobufRequestInfo>
    {
        public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
        {
            var payload = requestInfo.Body.BackMessage;
            Console.WriteLine("BackMessage:{0}", payload.Content);
        }
    }

    /// <summary>Prints the content of a received CallMessage payload and answers with a BackMessage.</summary>
    public class CallMessage : CommandBase<ProtobufAppSession, ProtobufRequestInfo>
    {
        public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
        {
            Console.WriteLine("CallMessage:{0}", requestInfo.Body.CallMessage.Content);

            byte[] frame = Serialize(BuildReply());
            session.Send(new ArraySegment<byte>(frame));
        }

        // Builds the fixed greeting wrapped in a DefeatMessage envelope.
        // global:: is required because this command class shares its namespace with a class named BackMessage.
        private static DefeatMessage BuildReply()
        {
            var greeting = global::BackMessage.CreateBuilder();
            greeting.SetContent("Hello I am from C# server by SuperSocket");

            var envelope = DefeatMessage.CreateBuilder();
            envelope.SetType(DefeatMessage.Types.Type.BackMessage);
            envelope.SetBackMessage(greeting.Build());
            return envelope.Build();
        }

        // Writes the message with WriteMessageNoTag and returns the bytes produced.
        // NOTE(review): this presumably emits the varint length prefix the receive filters read first - confirm.
        private static byte[] Serialize(DefeatMessage reply)
        {
            using (var buffer = new MemoryStream())
            {
                CodedOutputStream writer = CodedOutputStream.CreateInstance(buffer);
                writer.WriteMessageNoTag(reply);
                writer.Flush();
                return buffer.ToArray();
            }
        }
    }

    /// <summary>Prints id, name, sex and phone of a received PersonMessage payload.</summary>
    public class PersonMessage : CommandBase<ProtobufAppSession, ProtobufRequestInfo>
    {
        public override void ExecuteCommand(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
        {
            var person = requestInfo.Body.PersonMessage;

            Console.WriteLine("Recv Person Message From Client.");
            Console.WriteLine(
                "person\'s id = {0}, person\'s name = {1}, person\'s sex = {2}, person\'s phone = {3}",
                person.Id,
                person.Name,
                person.Sex,
                person.Phone);
        }
    }
}
- Client
namespace SuperSocketProtoClient.Protocol
{
    /// <summary>
    /// One decoded <see cref="DefeatMessage"/> frame on the client side.
    /// </summary>
    public class ProtobufPackageInfo : IPackageInfo
    {
        public ProtobufPackageInfo(DefeatMessage.Types.Type type, DefeatMessage body)
        {
            Type = type;
            Key = type.ToString();
            Body = body;
        }

        /// <summary>String form of <see cref="Type"/>.</summary>
        public string Key { get; }

        /// <summary>Discriminator copied from the decoded message.</summary>
        public DefeatMessage.Types.Type Type { get; }

        /// <summary>The complete decoded envelope message.</summary>
        public DefeatMessage Body { get; }
    }

    /// <summary>
    /// Client-side receive filter for frames made of a varint32 body length followed by a
    /// serialized <see cref="DefeatMessage"/>.
    /// </summary>
    public class ProtobufReceiveFilter : IReceiveFilter<ProtobufPackageInfo>
    {
        /// <summary>
        /// Tries to decode one frame from the accumulated buffers.
        /// </summary>
        /// <param name="data">Buffers received so far (per the original author's note, BufferList already takes care of split packets).</param>
        /// <param name="rest">Number of buffered bytes that follow the decoded frame; 0 when nothing was decoded.</param>
        /// <returns>The decoded package, or <c>null</c> when the length prefix is not positive or the frame is incomplete.</returns>
        public ProtobufPackageInfo Filter(BufferList data, out int rest)
        {
            rest = 0;

            var source = new BufferStream();
            source.Initialize(data);
            var reader = CodedInputStream.CreateInstance(source);

            // Body length, excluding the prefix itself.
            var bodyLength = (int)reader.ReadRawVarint32();
            if (bodyLength <= 0)
            {
                return null;
            }

            var buffered = data.Total;
            // After the prefix has been read, Position equals the size of the prefix.
            var frameLength = bodyLength + (int)reader.Position;
            if (buffered < frameLength)
            {
                // Not enough data for a whole frame yet.
                return null;
            }

            rest = buffered - frameLength;

            var payload = reader.ReadRawBytes(bodyLength);
            var decoded = DefeatMessage.ParseFrom(payload);
            return new ProtobufPackageInfo(decoded.Type, decoded);
        }

        /// <summary>Clears the next filter and returns the state to <see cref="FilterState.Normal"/>.</summary>
        public void Reset()
        {
            State = FilterState.Normal;
            NextReceiveFilter = null;
        }

        public IReceiveFilter<ProtobufPackageInfo> NextReceiveFilter { get; protected set; }

        public FilterState State { get; protected set; }
    }
}
- Server Entrance
namespace SuperSocketProtoServer
{
    /// <summary>
    /// Console host for <see cref="ProtobufAppServer"/>: waits for a key press, sets up and
    /// starts the server, then runs until 'q' is pressed.
    /// </summary>
    class Program
    {
        // TCP port the server listens on.
        private const int ListenPort = 2017;

        static void Main(string[] args)
        {
            Console.WriteLine("Press any key to start the server!");

            Console.ReadKey();
            Console.WriteLine();

            var appServer = new ProtobufAppServer();
            // Alternative to the command classes: handle every request in one callback.
            //appServer.NewRequestReceived += AppServerOnNewRequestReceived;

            if (!TrySetup(appServer))
            {
                Console.WriteLine("Failed to setup!");
                Console.ReadKey();
                return;
            }

            Console.WriteLine();

            // Try to start the appServer
            if (!appServer.Start())
            {
                Console.WriteLine("Failed to start!");
                Console.ReadKey();
                return;
            }

            Console.WriteLine("The server started successfully, press key \'q\' to stop it!");

            while (Console.ReadKey().KeyChar != 'q')
            {
                Console.WriteLine();
            }

            // Stop the appServer
            appServer.Stop();

            Console.WriteLine("The server was stopped!");
            Console.ReadKey();
        }

        // Sets the server up on ListenPort. An exception is printed and reported as failure,
        // so Main never goes on to start a server that was not set up.
        private static bool TrySetup(ProtobufAppServer appServer)
        {
            try
            {
                return appServer.Setup(ListenPort);
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                return false;
            }
        }

        /// <summary>
        /// Callback-style request handling: prints BackMessage and CallMessage payloads and
        /// answers a CallMessage with a fixed BackMessage. Other message types are ignored.
        /// </summary>
        private static void AppServerOnNewRequestReceived(ProtobufAppSession session, ProtobufRequestInfo requestinfo)
        {
            switch (requestinfo.Type)
            {
                case DefeatMessage.Types.Type.BackMessage:
                    Console.WriteLine("BackMessage:{0}", requestinfo.Body.BackMessage.Content);
                    break;
                case DefeatMessage.Types.Type.CallMessage:
                    Console.WriteLine("CallMessage:{0}", requestinfo.Body.CallMessage.Content);
                    var backMessage = BackMessage.CreateBuilder().SetContent("Hello I am from C# server by SuperSocket")
                        .Build();
                    var message = DefeatMessage.CreateBuilder().SetType(DefeatMessage.Types.Type.BackMessage)
                        .SetBackMessage(backMessage).Build();
                    using (var stream = new MemoryStream())
                    {
                        CodedOutputStream cos = CodedOutputStream.CreateInstance(stream);
                        cos.WriteMessageNoTag(message);
                        cos.Flush();
                        byte[] data = stream.ToArray();
                        session.Send(new ArraySegment<byte>(data));
                    }
                    break;
            }
        }
    }
}
- Client Entrance
1 namespace SuperSocketProtoClient 2 { 3 class Program 4 { 5 static void Main(string[] args) 6 { 7 EasyClient client = new EasyClient(); 8 client.Initialize(new ProtobufReceiveFilter(), packageInfo => 9 { 10 switch (packageInfo.Type) 11 { 12 case DefeatMessage.Types.Type.BackMessage: 13 Console.WriteLine("BackMessage:{0}", packageInfo.Body.BackMessage.Content); 14 break; 15 case DefeatMessage.Types.Type.CallMessage: 16 Console.WriteLine("CallMessage:{0}", packageInfo.Body.CallMessage.Content); 17 break; 18 19 } 20 }); 21 var flag = client.ConnectAsync(new DnsEndPoint("127.0.0.1", 2017)); 22 if (flag.Result) 23 { 24 var callMessage = CallMessage.CreateBuilder() 25 .SetContent("Hello I am form C# client by SuperSocket ClientEngine").Build(); 26 var message =SuperSocket源码解析之会话生命周期SuperSocket与Netty之实现protobuf协议,包括服务端和客户端
SuperSocket 2.0 发布第一个预览版, 另寻找Yang Fan哥哥