BeetleX实现MessagePack和Protobuf消息控制器调用websocket服务详解

Posted dotNET跨平台

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了BeetleX实现MessagePack和Protobuf消息控制器调用websocket服务详解相关的知识,希望对你有一定的参考价值。

        最近有用户问如何使用BeetleX封装一个基于Protobuf格式的websocket服务并支持控制器调用;其实BeetleX.FastHttpApi是支持Websocket服务和自定义数据格式的,但需要对组件有一定了解的情况才能进行扩展;接下来通过封装一个支持Protobuf和MessagePack的websocket服务来介绍BeetleX.FastHttpApi的相关功能扩展和应用。

格式封装

        首先需要一个特性来标记消息类型在数据中用怎样的数据来标记

[AttributeUsage(AttributeTargets.Class)]
    public class BinaryTypeAttribute : Attribute
    
        public BinaryTypeAttribute(int id)
        
            this.ID = id;
        
        public int ID  get; set; 
    

可以通过一个Int类型来定义数值与消息类的关系。然后再定义一个特性用于描述消息对应的方法控制器类

[AttributeUsage(AttributeTargets.Class)]
    public class MessageControllerAttribute : Attribute
    


    

接下来就可以封装对应Protobuf的数据转换类,为了转换方便就直接使用Protobuf.Net这个组件了

public class BinaryDataFactory
    


        //数值与类型的关系
        private Dictionary<int, Type> mMessageTypes = new Dictionary<int, Type>();
        //类型与数值的关系
        private Dictionary<Type, int> mMessageIDs = new Dictionary<Type, int>();
        //定义消息与方法的关系
        private Dictionary<Type, ActionHandler> mActions = new Dictionary<Type, ActionHandler>();


        public void RegisterComponent<T>()
        
            RegisterComponent(typeof(T));
        


        public void RegisterComponent(Type type)
        
            foreach (var item in type.Assembly.GetTypes())
            
                BinaryTypeAttribute[] bta = (BinaryTypeAttribute[])item.GetCustomAttributes(typeof(BinaryTypeAttribute), false);
                if (bta != null && bta.Length > 0)
                
                    mMessageTypes[bta[0].ID] = item;
                    mMessageIDs[item] = bta[0].ID;
                
#if PROTOBUF_SERVER
                var mca = item.GetCustomAttribute<MessageControllerAttribute>(false);
                if (mca != null)
                
                    var controller = Activator.CreateInstance(item);
                    foreach (MethodInfo method in item.GetMethods(BindingFlags.Public | BindingFlags.Instance))
                    
                        var parameters = method.GetParameters();
                        if (parameters.Length == 2 && parameters[0].ParameterType == typeof(WebSocketReceiveArgs)) 
                            ActionHandler handler = new ActionHandler(controller, method);
                            mActions[parameters[1].ParameterType] = handler;
                        
                    
                
#endif
            
        
        //序列化对象
        public ArraySegment<byte> Serializable(object data)
        
            MemoryStream memory = new MemoryStream();
            var type = GetTypeID(data.GetType(), true);
            memory.Write(type);
            Serializer.Serialize(memory, data);
            return new ArraySegment<byte>(memory.GetBuffer(), 0, (int)memory.Length);
        
        //反序列化对象
        public object Deserialize(byte[] data)
        
            return Deserialize(new ArraySegment<byte>(data, 0, data.Length));
        
        //反序列化对象
        public object Deserialize(ArraySegment<byte> data)
        
            MemoryStream memory = new MemoryStream(data.Array, data.Offset, data.Count);
            byte[] id = new byte[4];
            memory.Read(id, 0, 4);
            Type type = GetMessageType(id, true);
            return Serializer.Deserialize(type, memory);
        
        //获消息对应数值的存储数据
        public byte[] GetTypeID(Type type, bool littleEndian)
        
            if (mMessageIDs.TryGetValue(type, out int value))
            


                if (!littleEndian)
                
                    value = BeetleX.Buffers.BitHelper.SwapInt32(value);
                
                return BitConverter.GetBytes(value);


            
            throw new Exception($"binary websocket type id not found!");
        
        //根据数值获取类型
        public Type GetMessageType(int id)
        
            mMessageTypes.TryGetValue(id, out Type result);
            return result;
        
        //根据存储数获取类型
        public Type GetMessageType(byte[] data, bool littleEndian)
        
            int value = BitConverter.ToInt32(data, 0);
            if (!littleEndian)
                value = BeetleX.Buffers.BitHelper.SwapInt32(value);
            return GetMessageType(value);
        
        //根据消息获处理方法
        public ActionHandler GetHandler(object message)
        
            mActions.TryGetValue(message.GetType(), out ActionHandler result);
            return result;
        
    


    public class ActionHandler
    
        public ActionHandler(object controller, MethodInfo method)
        
            Method = method;
            Controller = controller;
            IsVoid = method.ReturnType == typeof(void);
            IsTaskResult = method.ReturnType.BaseType == typeof(Task);
            if (IsTaskResult && method.ReturnType.IsGenericType)
            
                HasTaskResultData = true;
                mGetPropertyInfo = method.ReturnType.GetProperty("Result", BindingFlags.Public | BindingFlags.Instance);
            
        
        private PropertyInfo mGetPropertyInfo;
        public MethodInfo Method  get; private set; 
        public bool IsTaskResult  get; set; 
        public bool HasTaskResultData  get; set; 
        public bool IsVoid  get; private set; 
        public object Controller  get; private set; 
        public object GetTaskResult(Task task)
        
            return mGetPropertyInfo.GetValue(task);
        
        public object Execute(params object[] data)
        
            return Method.Invoke(Controller, data);
        
    

协议转换

        Protobuf的数据格式处理完成后就要针对BeetleX.FastHttpApi构建一个相应数据转换器,这个转换器实现也是非常简单的。

public class ProtobufFrameSerializer : IDataFrameSerializer
    
        public static BinaryDataFactory BinaryDataFactory  get; set;  = new BinaryDataFactory();
        public object FrameDeserialize(DataFrame data, PipeStream stream)
        
            var buffers = new byte[data.Length];
            stream.Read(buffers, 0, buffers.Length);
            return BinaryDataFactory.Deserialize(buffers);
        
        public void FrameRecovery(byte[] buffer)
        


        
        public ArraySegment<byte> FrameSerialize(DataFrame packet, object body)
        
            return BinaryDataFactory.Serializable(body);
        
    

实现IDataFrameSerializer接口的相关方法即可,接下来就针对HttpApiServer封装一些扩展方法用于注册对象和绑定Websocket数据处理事件。

public static class ProtobufExtensions
    
        public static HttpApiServer RegisterProtobuf<T>(this HttpApiServer server)
        
            ProtobufFrameSerializer.BinaryDataFactory.RegisterComponent<T>();
            return server;
        
        public static HttpApiServer UseProtobufController(this HttpApiServer server, Action<WebSocketReceiveArgs> handler = null)
        
            server.WebSocketReceive = async (o, e) =>
            
                try
                
                    var msg = e.Frame.Body;
                    var action = ProtobufFrameSerializer.BinaryDataFactory.GetHandler(msg);
                    if (action != null)
                    
                        if (!action.IsVoid)
                        
                            if (action.IsTaskResult)
                            


                                Task task = (Task)action.Execute(e, msg);
                                await task;
                                if (action.HasTaskResultData)
                                
                                    var result = action.GetTaskResult(task);
                                    e.ResponseBinary(result);
                                
                            
                            else
                            
                                var result = action.Execute(e, msg);
                                e.ResponseBinary(result);
                            
                        
                    
                    else
                    
                        handler?.Invoke(e);
                    
                
                catch (Exception e_)
                
                    e.Request.Server.GetLog(BeetleX.EventArgs.LogType.Warring)
                    ?.Log(BeetleX.EventArgs.LogType.Error, e.Request.Session, $"Websocket packet process error e_.Message");
                
            ;
            return server;
        
    

服务使用

        所有相关功能都封装后就可以启动HttpApiServer并把相关功能设置使用,在使用之前需要引用BeetleX.FastHttpApi.Hosting和Protobuf.Net组件

[BinaryType(1)]
    [ProtoContract]
    public class User
    
        [ProtoMember(1)]
        public string Name  get; set; 
        [ProtoMember(2)]
        public string Email  get; set; 
        [ProtoMember(3)]
        public DateTime ResultTime  get; set; 
    
    [MessageController]
    public class Controller
    
        public User Login(WebSocketReceiveArgs e, User user)
        
            user.ResultTime = DateTime.Now;
            return user;
        
    
    class Program
    
        static void Main(string[] args)
        
            BeetleX.FastHttpApi.Hosting.HttpServer server = new BeetleX.FastHttpApi.Hosting.HttpServer(80);
            server.Setting((service, options) =>
            
                options.LogLevel = BeetleX.EventArgs.LogType.Trace;
                options.LogToConsole = true;
                options.ManageApiEnabled = false;
                options.WebSocketFrameSerializer = new ProtobufFrameSerializer();
            );
            server.Completed(http =>
            
                http.RegisterProtobuf<User>();
                http.UseProtobufController();
            );
            server.Run();
        
    

客户端

        如果你希望在.Net中使用Websocket客户端,BeetleX同样也提供一个扩展组件BeetleX.Http.Clients,通过这组件可以轻易封装一个Protobuf的Websocket客户端。

public class ProtobufClient : BinaryClient
    
        public ProtobufClient(string host) : base(host)  


        public static BinaryDataFactory BinaryDataFactory  get; private set;  = new BinaryDataFactory();


        protected override object OnDeserializeObject(ArraySegment<byte> data)
        
            return BinaryDataFactory.Deserialize(data);
        


        protected override ArraySegment<byte> OnSerializeObject(object data)
        
            return BinaryDataFactory.Serializable(data);
        
    

封装完成后就可以使用了

class Program
    
        static async Task Main(string[] args)
        
            ProtobufClient.BinaryDataFactory.RegisterComponent<User>();
            var client = new ProtobufClient("ws://localhost");
            User user = new User  Email="henryfan@msn.com", Name="henryfan" ;
            var result = await client.ReceiveFrom<User>(user);
            Console.WriteLine(result.Name);
        
    

完整示例代码可以访问

https://github.com/beetlex-io/BeetleX-Samples/tree/master/Websocket.ProtobufPacket

https://github.com/beetlex-io/BeetleX-Samples/tree/master/Websocket.MessagePackPacket

BeetleX

开源跨平台通讯框架(支持TLS)
提供高性能服务和大数据处理解决方案

https://beetlex-io.com

以上是关于BeetleX实现MessagePack和Protobuf消息控制器调用websocket服务详解的主要内容,如果未能解决你的问题,请参考以下文章

BeetleX实现HTTP协议详解

BeetleX之XRPC远程委托调用

BeetleX.FastHttpApi之控制器调度设计

使用BeetleX的TcpBenchmark工具进行百万设备模拟测试

MessagePack for C#

快速序列化组件MessagePack介绍