BeetleX实现MessagePack和Protobuf消息控制器调用websocket服务详解
Posted dotNET跨平台
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了BeetleX实现MessagePack和Protobuf消息控制器调用websocket服务详解相关的知识,希望对你有一定的参考价值。
最近有用户问如何使用BeetleX封装一个基于Protobuf格式的websocket服务并支持控制器调用;其实BeetleX.FastHttpApi是支持Websocket服务和自定义数据格式的,但需要对组件有一定了解的情况才能进行扩展;接下来通过封装一个支持Protobuf和MessagePack的websocket服务来介绍BeetleX.FastHttpApi的相关功能扩展和应用。
格式封装
首先需要定义一个特性,用来标记每种消息类型在数据中对应的标识值
/// <summary>
/// Tags a message class with the integer identifier that is written in front
/// of the serialized payload so the receiver can tell which type to rebuild.
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class BinaryTypeAttribute : Attribute
{
    /// <summary>Creates the attribute with the given message identifier.</summary>
    /// <param name="id">Numeric identifier associated with the decorated class.</param>
    public BinaryTypeAttribute(int id)
    {
        this.ID = id;
    }

    /// <summary>Numeric identifier associated with the decorated class.</summary>
    public int ID { get; set; }
}
可以通过一个Int类型来定义数值与消息类的关系。然后再定义一个特性用于描述消息对应的方法控制器类
/// <summary>
/// Marker attribute for classes whose public instance methods should be
/// registered as message handlers.
/// </summary>
[AttributeUsage(AttributeTargets.Class)]
public class MessageControllerAttribute : Attribute
{
}
接下来就可以封装对应Protobuf的数据转换类,为了转换方便就直接使用protobuf-net这个组件了
/// <summary>
/// Keeps the mapping between numeric message ids and message types, and
/// converts messages to and from a binary frame made of a 4-byte type id
/// followed by the Protobuf-encoded body.
/// </summary>
public class BinaryDataFactory
{
    // Number of bytes used by the type id that prefixes every payload.
    private const int TypeIdSize = sizeof(int);

    // Message id -> message type.
    private Dictionary<int, Type> mMessageTypes = new Dictionary<int, Type>();

    // Message type -> message id.
    private Dictionary<Type, int> mMessageIDs = new Dictionary<Type, int>();

    // Message type -> controller method that handles it.
    private Dictionary<Type, ActionHandler> mActions = new Dictionary<Type, ActionHandler>();

    /// <summary>Registers every message/controller found in the assembly that declares <typeparamref name="T"/>.</summary>
    public void RegisterComponent<T>()
    {
        RegisterComponent(typeof(T));
    }

    /// <summary>
    /// Scans the assembly that declares <paramref name="type"/> and records every
    /// class tagged with <see cref="BinaryTypeAttribute"/>. When PROTOBUF_SERVER is
    /// defined, classes tagged with <see cref="MessageControllerAttribute"/> are
    /// instantiated and their two-parameter methods are registered as handlers.
    /// </summary>
    /// <param name="type">Any type from the assembly to scan.</param>
    /// <exception cref="ArgumentNullException"><paramref name="type"/> is null.</exception>
    public void RegisterComponent(Type type)
    {
        if (type == null)
        {
            throw new ArgumentNullException(nameof(type));
        }
        foreach (var item in type.Assembly.GetTypes())
        {
            BinaryTypeAttribute[] bta = (BinaryTypeAttribute[])item.GetCustomAttributes(typeof(BinaryTypeAttribute), false);
            if (bta != null && bta.Length > 0)
            {
                mMessageTypes[bta[0].ID] = item;
                mMessageIDs[item] = bta[0].ID;
            }
#if PROTOBUF_SERVER
            var mca = item.GetCustomAttribute<MessageControllerAttribute>(false);
            if (mca != null)
            {
                // One controller instance is shared by all of its handlers.
                var controller = Activator.CreateInstance(item);
                foreach (MethodInfo method in item.GetMethods(BindingFlags.Public | BindingFlags.Instance))
                {
                    var parameters = method.GetParameters();
                    // A handler has the shape (WebSocketReceiveArgs, TMessage); the
                    // second parameter's type is the dispatch key.
                    if (parameters.Length == 2 && parameters[0].ParameterType == typeof(WebSocketReceiveArgs))
                    {
                        ActionHandler handler = new ActionHandler(controller, method);
                        mActions[parameters[1].ParameterType] = handler;
                    }
                }
            }
#endif
        }
    }

    /// <summary>Serializes a message as its type id followed by the Protobuf body.</summary>
    /// <param name="data">Message instance; its type must have been registered.</param>
    /// <returns>A segment over the stream's internal buffer covering the written bytes.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The message type has no registered id.</exception>
    public ArraySegment<byte> Serializable(object data)
    {
        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }
        MemoryStream memory = new MemoryStream();
        var type = GetTypeID(data.GetType(), true);
        memory.Write(type, 0, type.Length);
        Serializer.Serialize(memory, data);
        // GetBuffer avoids a copy; the segment bounds exclude unused capacity.
        return new ArraySegment<byte>(memory.GetBuffer(), 0, (int)memory.Length);
    }

    /// <summary>Deserializes a message from a whole byte array.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    public object Deserialize(byte[] data)
    {
        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }
        return Deserialize(new ArraySegment<byte>(data, 0, data.Length));
    }

    /// <summary>Deserializes a message from a frame made of a type id plus Protobuf body.</summary>
    /// <param name="data">Bytes starting with the 4-byte type id.</param>
    /// <returns>The deserialized message instance.</returns>
    /// <exception cref="ArgumentException">The payload is shorter than the type id header.</exception>
    /// <exception cref="InvalidOperationException">The type id is not registered.</exception>
    public object Deserialize(ArraySegment<byte> data)
    {
        if (data.Array == null || data.Count < TypeIdSize)
        {
            throw new ArgumentException("binary websocket payload is too short to contain a type id!", nameof(data));
        }
        MemoryStream memory = new MemoryStream(data.Array, data.Offset, data.Count);
        byte[] id = new byte[TypeIdSize];
        memory.Read(id, 0, TypeIdSize);
        Type type = GetMessageType(id, true);
        if (type == null)
        {
            // Fail with a clear message instead of handing a null type to the serializer.
            throw new InvalidOperationException($"binary websocket type id {BitConverter.ToInt32(id, 0)} not registered!");
        }
        return Serializer.Deserialize(type, memory);
    }

    /// <summary>Returns the bytes that encode the id registered for <paramref name="type"/>.</summary>
    /// <param name="type">Registered message type.</param>
    /// <param name="littleEndian">When false the id is byte-swapped before encoding.</param>
    /// <exception cref="InvalidOperationException">The type has no registered id.</exception>
    public byte[] GetTypeID(Type type, bool littleEndian)
    {
        if (mMessageIDs.TryGetValue(type, out int value))
        {
            // NOTE(review): BitConverter uses the host byte order, so the flag is only
            // accurate on little-endian hosts — confirm if big-endian targets matter.
            if (!littleEndian)
            {
                value = BeetleX.Buffers.BitHelper.SwapInt32(value);
            }
            return BitConverter.GetBytes(value);
        }
        throw new InvalidOperationException($"binary websocket type id not found for {type}!");
    }

    /// <summary>Looks up the message type registered for an id; returns null when unknown.</summary>
    public Type GetMessageType(int id)
    {
        mMessageTypes.TryGetValue(id, out Type result);
        return result;
    }

    /// <summary>Decodes an id from its stored bytes and looks up the message type; returns null when unknown.</summary>
    /// <param name="data">At least four bytes holding the id.</param>
    /// <param name="littleEndian">When false the decoded value is byte-swapped first.</param>
    public Type GetMessageType(byte[] data, bool littleEndian)
    {
        int value = BitConverter.ToInt32(data, 0);
        if (!littleEndian)
        {
            value = BeetleX.Buffers.BitHelper.SwapInt32(value);
        }
        return GetMessageType(value);
    }

    /// <summary>Finds the handler registered for the message's runtime type.</summary>
    /// <returns>The handler, or null when the message is null or has no handler.</returns>
    public ActionHandler GetHandler(object message)
    {
        if (message == null)
        {
            return null;
        }
        mActions.TryGetValue(message.GetType(), out ActionHandler result);
        return result;
    }
}
/// <summary>
/// Binds a controller instance to one of its methods and caches what the
/// dispatcher needs to know about the method's return type.
/// </summary>
public class ActionHandler
{
    /// <summary>Creates a handler for <paramref name="method"/> on <paramref name="controller"/>.</summary>
    /// <param name="controller">Instance the method is invoked on.</param>
    /// <param name="method">Method to invoke.</param>
    /// <exception cref="ArgumentNullException"><paramref name="method"/> is null.</exception>
    public ActionHandler(object controller, MethodInfo method)
    {
        if (method == null)
        {
            throw new ArgumentNullException(nameof(method));
        }
        Method = method;
        Controller = controller;
        IsVoid = method.ReturnType == typeof(void);
        // IsAssignableFrom also matches a method declared as plain Task; comparing
        // BaseType to Task only matched Task<T>, so plain Task was treated as data.
        IsTaskResult = typeof(Task).IsAssignableFrom(method.ReturnType);
        if (IsTaskResult && method.ReturnType.IsGenericType)
        {
            HasTaskResultData = true;
            mGetPropertyInfo = method.ReturnType.GetProperty("Result", BindingFlags.Public | BindingFlags.Instance);
        }
    }

    // Reflected Task<T>.Result getter; null when the method does not return Task<T>.
    private PropertyInfo mGetPropertyInfo;

    /// <summary>The method this handler invokes.</summary>
    public MethodInfo Method { get; private set; }

    /// <summary>True when the method returns Task or a type derived from it.</summary>
    public bool IsTaskResult { get; set; }

    /// <summary>True when the returned task is generic and therefore carries a result.</summary>
    public bool HasTaskResultData { get; set; }

    /// <summary>True when the method returns void.</summary>
    public bool IsVoid { get; private set; }

    /// <summary>The instance the method is invoked on.</summary>
    public object Controller { get; private set; }

    /// <summary>Reads the Result property of a task returned by this handler.</summary>
    /// <returns>The task's result, or null when the method does not return a generic task.</returns>
    public object GetTaskResult(Task task)
    {
        return mGetPropertyInfo?.GetValue(task);
    }

    /// <summary>Invokes the method on the controller with the supplied arguments.</summary>
    /// <returns>Whatever the method returns (null for void methods).</returns>
    public object Execute(params object[] data)
    {
        return Method.Invoke(Controller, data);
    }
}
协议转换
Protobuf的数据格式处理完成后就要针对BeetleX.FastHttpApi构建一个相应数据转换器,这个转换器实现也是非常简单的。
/// <summary>
/// WebSocket frame serializer that delegates body encoding and decoding to a
/// shared <see cref="BinaryDataFactory"/>.
/// </summary>
public class ProtobufFrameSerializer : IDataFrameSerializer
{
    /// <summary>Factory shared by every serializer instance.</summary>
    public static BinaryDataFactory BinaryDataFactory { get; set; } = new BinaryDataFactory();

    /// <summary>Reads the frame body from the stream and deserializes it into a message.</summary>
    /// <param name="data">Frame whose Length gives the body size.</param>
    /// <param name="stream">Stream positioned at the start of the body.</param>
    public object FrameDeserialize(DataFrame data, PipeStream stream)
    {
        var buffers = new byte[data.Length];
        // NOTE(review): assumes a single Read fills the buffer — confirm against PipeStream.
        stream.Read(buffers, 0, buffers.Length);
        return BinaryDataFactory.Deserialize(buffers);
    }

    /// <summary>Nothing to recycle: serialized buffers are not pooled here.</summary>
    public void FrameRecovery(byte[] buffer)
    {
    }

    /// <summary>Serializes <paramref name="body"/> into the bytes sent as the frame body.</summary>
    public ArraySegment<byte> FrameSerialize(DataFrame packet, object body)
    {
        return BinaryDataFactory.Serializable(body);
    }
}
实现IDataFrameSerializer接口的相关方法即可,接下来就针对HttpApiServer封装一些扩展方法用于注册对象和绑定Websocket数据处理事件。
/// <summary>
/// HttpApiServer extensions that register Protobuf messages/controllers and
/// route received WebSocket messages to the matching controller method.
/// </summary>
public static class ProtobufExtensions
{
    /// <summary>Registers the messages and controllers found in the assembly that declares <typeparamref name="T"/>.</summary>
    /// <returns>The same server, to allow chaining.</returns>
    public static HttpApiServer RegisterProtobuf<T>(this HttpApiServer server)
    {
        ProtobufFrameSerializer.BinaryDataFactory.RegisterComponent<T>();
        return server;
    }

    /// <summary>
    /// Installs a WebSocket receive callback that dispatches each message to its
    /// registered controller method and sends back the method's return value.
    /// </summary>
    /// <param name="server">Server to configure.</param>
    /// <param name="handler">Optional fallback invoked when no controller method matches.</param>
    /// <returns>The same server, to allow chaining.</returns>
    public static HttpApiServer UseProtobufController(this HttpApiServer server, Action<WebSocketReceiveArgs> handler = null)
    {
        server.WebSocketReceive = async (o, e) =>
        {
            try
            {
                var msg = e.Frame.Body;
                // A frame without a decoded body cannot match any controller method.
                var action = msg == null ? null : ProtobufFrameSerializer.BinaryDataFactory.GetHandler(msg);
                if (action == null)
                {
                    handler?.Invoke(e);
                    return;
                }
                if (action.IsVoid)
                {
                    // Void methods still have to run; there is just nothing to send back.
                    action.Execute(e, msg);
                    return;
                }
                if (action.IsTaskResult)
                {
                    Task task = (Task)action.Execute(e, msg);
                    await task;
                    if (action.HasTaskResultData)
                    {
                        var result = action.GetTaskResult(task);
                        e.ResponseBinary(result);
                    }
                }
                else
                {
                    var result = action.Execute(e, msg);
                    e.ResponseBinary(result);
                }
            }
            catch (Exception e_)
            {
                // Gate on the same level that is written so errors are not dropped
                // when the server's log level is set to Error.
                e.Request.Server.GetLog(BeetleX.EventArgs.LogType.Error)
                    ?.Log(BeetleX.EventArgs.LogType.Error, e.Request.Session, $"Websocket packet process error {e_.Message}");
            }
        };
        return server;
    }
}
服务使用
所有相关功能都封装后就可以启动HttpApiServer并把相关功能设置使用,在使用之前需要引用BeetleX.FastHttpApi.Hosting和protobuf-net组件
/// <summary>Sample message, registered with binary type id 1.</summary>
[BinaryType(1)]
[ProtoContract]
public class User
{
    /// <summary>User name (Protobuf field 1).</summary>
    [ProtoMember(1)]
    public string Name { get; set; }

    /// <summary>E-mail address (Protobuf field 2).</summary>
    [ProtoMember(2)]
    public string Email { get; set; }

    /// <summary>Timestamp filled in by the server's Login handler (Protobuf field 3).</summary>
    [ProtoMember(3)]
    public DateTime ResultTime { get; set; }
}
/// <summary>Sample controller holding the handler for <see cref="User"/> messages.</summary>
[MessageController]
public class Controller
{
    /// <summary>Stamps the received user with the current local time and echoes it back.</summary>
    /// <param name="e">Receive context of the WebSocket message.</param>
    /// <param name="user">The decoded message.</param>
    /// <returns>The same <paramref name="user"/> instance with ResultTime set.</returns>
    public User Login(WebSocketReceiveArgs e, User user)
    {
        user.ResultTime = DateTime.Now;
        return user;
    }
}
/// <summary>Server entry point: hosts an HttpApiServer on port 80 with the Protobuf WebSocket controller enabled.</summary>
class Program
{
    static void Main(string[] args)
    {
        BeetleX.FastHttpApi.Hosting.HttpServer server = new BeetleX.FastHttpApi.Hosting.HttpServer(80);
        server.Setting((service, options) =>
        {
            options.LogLevel = BeetleX.EventArgs.LogType.Trace;
            options.LogToConsole = true;
            options.ManageApiEnabled = false;
            // Replace the default WebSocket frame serializer with the Protobuf one.
            options.WebSocketFrameSerializer = new ProtobufFrameSerializer();
        });
        server.Completed(http =>
        {
            // Register messages/controllers from User's assembly, then enable dispatch.
            http.RegisterProtobuf<User>();
            http.UseProtobufController();
        });
        server.Run();
    }
}
客户端
如果你希望在.Net中使用Websocket客户端,BeetleX同样也提供一个扩展组件BeetleX.Http.Clients,通过这个组件可以轻易封装一个Protobuf的Websocket客户端。
/// <summary>
/// WebSocket client that encodes and decodes messages through a shared
/// <see cref="BinaryDataFactory"/>.
/// </summary>
public class ProtobufClient : BinaryClient
{
    /// <summary>Creates a client for the given WebSocket host address.</summary>
    public ProtobufClient(string host) : base(host)
    {
    }

    /// <summary>Factory shared by every client instance; register message types on it before use.</summary>
    public static BinaryDataFactory BinaryDataFactory { get; private set; } = new BinaryDataFactory();

    /// <summary>Turns received bytes into a message object.</summary>
    protected override object OnDeserializeObject(ArraySegment<byte> data)
    {
        return BinaryDataFactory.Deserialize(data);
    }

    /// <summary>Turns an outgoing message object into bytes.</summary>
    protected override ArraySegment<byte> OnSerializeObject(object data)
    {
        return BinaryDataFactory.Serializable(data);
    }
}
封装完成后就可以使用了
/// <summary>Client entry point: sends a User message to the local server and prints the reply's name.</summary>
class Program
{
    static async Task Main(string[] args)
    {
        // Message types must be registered before anything is sent or received.
        ProtobufClient.BinaryDataFactory.RegisterComponent<User>();
        var client = new ProtobufClient("ws://localhost");
        User user = new User { Email = "henryfan@msn.com", Name = "henryfan" };
        var result = await client.ReceiveFrom<User>(user);
        Console.WriteLine(result.Name);
    }
}
完整示例代码可以访问
https://github.com/beetlex-io/BeetleX-Samples/tree/master/Websocket.ProtobufPacket
https://github.com/beetlex-io/BeetleX-Samples/tree/master/Websocket.MessagePackPacket
BeetleX
开源跨平台通讯框架(支持TLS)
提供高性能服务和大数据处理解决方案
https://beetlex-io.com
以上是关于BeetleX实现MessagePack和Protobuf消息控制器调用websocket服务详解的主要内容,如果未能解决你的问题,请参考以下文章