企业级工作流解决方案--微服务Tcp消息传输模型之消息编解码
Posted spritekuang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了企业级工作流解决方案--微服务Tcp消息传输模型之消息编解码相关的知识,希望对你有一定的参考价值。
Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging
Json-Rpc没有定义消息如何传输,因此,Json-Rpc的RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主要是TransportMessage,如下代码所示。其中Content在请求时为RpcRequest对象,答复时为RpcResponse对象;答复时Headers一般情况下为空。
/// <summary>
/// Transport envelope: a payload, an optional headers object, a message id and a
/// content-type tag used to tell request payloads from response payloads.
/// </summary>
public class TransportMessage
{
    /// <summary>
    /// Creates an empty message; every property keeps its default value.
    /// </summary>
    public TransportMessage()
    {
    }

    /// <summary>
    /// Creates a message whose <see cref="ContentType"/> is the full type name of
    /// <paramref name="content"/>.
    /// </summary>
    /// <param name="content">Payload; must not be null.</param>
    /// <param name="headers">Headers object, stored as given.</param>
    /// <exception cref="ArgumentNullException"><paramref name="content"/> is null.</exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public TransportMessage(object content, object headers)
        : this(content, headers, content?.GetType().FullName)
    {
        // The null-conditional lets a null payload reach the chained constructor,
        // whose guard raises the ArgumentNullException.
    }

    /// <summary>
    /// Creates a message with an explicitly supplied content-type name.
    /// </summary>
    /// <param name="content">Payload; must not be null.</param>
    /// <param name="headers">Headers object, stored as given.</param>
    /// <param name="fullName">Value assigned to <see cref="ContentType"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="content"/> is null.</exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public TransportMessage(object content, object headers, string fullName)
    {
        if (content == null)
        {
            throw new ArgumentNullException(nameof(content));
        }

        ContentType = fullName;
        Content = content;
        Headers = headers;
    }

    /// <summary>
    /// Message id.
    /// </summary>
    public string Id { get; set; }

    /// <summary>
    /// Message payload.
    /// </summary>
    public object Content { get; set; }

    /// <summary>
    /// Headers transported alongside the payload.
    /// </summary>
    public object Headers { get; set; }

    /// <summary>
    /// Content-type tag of the payload.
    /// </summary>
    public string ContentType { get; set; }

    /// <summary>
    /// Tells whether this message carries an invocation request.
    /// </summary>
    /// <returns>
    /// true when <see cref="ContentType"/> equals
    /// MessagePackTransportMessageType.jsonRequestTypeName; otherwise false.
    /// </returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsInvokeMessage() =>
        ContentType == MessagePackTransportMessageType.jsonRequestTypeName;

    /// <summary>
    /// Tells whether this message carries an invocation result.
    /// </summary>
    /// <returns>
    /// true when <see cref="ContentType"/> equals
    /// MessagePackTransportMessageType.jsonResponseTypeName; otherwise false.
    /// </returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsInvokeResultMessage() =>
        ContentType == MessagePackTransportMessageType.jsonResponseTypeName;

    /// <summary>
    /// Returns <see cref="Content"/> cast to <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Expected payload type.</typeparam>
    /// <returns>The payload instance.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public T GetContent<T>() => (T)Content;

    /// <summary>
    /// Returns <see cref="Headers"/> cast to <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Expected headers type.</typeparam>
    /// <returns>The headers instance.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public T GetHeaders<T>() => (T)Headers;

    /// <summary>
    /// Builds a request message tagged with the JSON request type name and a freshly
    /// generated id (a GUID formatted with the "N" specifier).
    /// </summary>
    /// <param name="invokeMessage">Request payload.</param>
    /// <param name="nameValueCollection">Headers to attach to the message.</param>
    /// <returns>The request transport message.</returns>
    public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage, NameValueCollection nameValueCollection)
    {
        var request = new TransportMessage(
            invokeMessage,
            nameValueCollection,
            MessagePackTransportMessageType.jsonRequestTypeName);
        request.Id = Guid.NewGuid().ToString("N");
        return request;
    }

    /// <summary>
    /// Builds a result message tagged with the JSON response type name.
    /// </summary>
    /// <param name="id">Message id assigned to the result message.</param>
    /// <param name="jsonResponse">Response payload.</param>
    /// <param name="nameValueCollection">Headers to attach to the message.</param>
    /// <returns>The result transport message.</returns>
    public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse, NameValueCollection nameValueCollection)
    {
        var result = new TransportMessage(
            jsonResponse,
            nameValueCollection,
            MessagePackTransportMessageType.jsonResponseTypeName);
        result.Id = id;
        return result;
    }
}
TransportMessage需要在DotNetty中传输,因此需要对TransportMessage进行编码和解码。
消息编解码器
/// <summary>
/// Converts a <see cref="TransportMessage"/> into a byte array.
/// </summary>
public interface ITransportMessageEncoder
{
    /// <summary>
    /// Encodes the given message.
    /// </summary>
    /// <param name="message">Message to encode.</param>
    /// <returns>The encoded bytes.</returns>
    byte[] Encode(TransportMessage message);
}

/// <summary>
/// Converts a byte array back into a <see cref="TransportMessage"/>.
/// </summary>
public interface ITransportMessageDecoder
{
    /// <summary>
    /// Decodes the given bytes.
    /// </summary>
    /// <param name="data">Encoded message bytes.</param>
    /// <returns>The decoded message.</returns>
    TransportMessage Decode(byte[] data);
}
Json编解码
平时编码中经常用的方式
/// <summary>
/// Encodes a <see cref="TransportMessage"/> as UTF-8 encoded JSON text.
/// </summary>
public sealed class JsonTransportMessageEncoder : ITransportMessageEncoder
{
    #region Implementation of ITransportMessageEncoder

    /// <summary>
    /// Serializes the message to JSON and returns its UTF-8 bytes.
    /// </summary>
    /// <param name="message">Message to encode; must not be null.</param>
    /// <returns>UTF-8 bytes of the JSON text.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="message"/> is null.</exception>
    public byte[] Encode(TransportMessage message)
    {
        // A null message would serialize to the literal "null", which the decoder
        // cannot turn back into a message; fail here instead.
        if (message == null)
        {
            throw new System.ArgumentNullException(nameof(message));
        }

        // NOTE(review): Headers is serialized as whatever Json.NET makes of its runtime
        // type. If it is a NameValueCollection, verify the values survive the round
        // trip - the MessagePack codec converts it to a dictionary first.
        var content = JsonConvert.SerializeObject(message);
        return Encoding.UTF8.GetBytes(content);
    }

    #endregion Implementation of ITransportMessageEncoder
}

/// <summary>
/// Decodes UTF-8 encoded JSON text into a <see cref="TransportMessage"/>.
/// </summary>
public sealed class JsonTransportMessageDecoder : ITransportMessageDecoder
{
    #region Implementation of ITransportMessageDecoder

    /// <summary>
    /// Parses the bytes as a JSON transport message and, for request and result
    /// messages, re-parses the content into JsonRequest or JsonResponse.
    /// </summary>
    /// <param name="data">UTF-8 bytes of the JSON text; must not be null.</param>
    /// <returns>The decoded message. Content is left untouched for other content types.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="data"/> is null.</exception>
    /// <exception cref="System.ArgumentException">
    /// The payload does not deserialize to a message (for example it is empty or the
    /// JSON literal null).
    /// </exception>
    public TransportMessage Decode(byte[] data)
    {
        if (data == null)
        {
            throw new System.ArgumentNullException(nameof(data));
        }

        var content = Encoding.UTF8.GetString(data);
        var message = JsonConvert.DeserializeObject<TransportMessage>(content);
        if (message == null)
        {
            throw new System.ArgumentException("The payload does not contain a transport message.", nameof(data));
        }

        // Nothing to re-materialize when the message carries no content.
        if (message.Content == null)
        {
            return message;
        }

        // Content is declared as object, so the first pass cannot produce the concrete
        // payload type; re-parse its JSON text according to the content-type tag.
        if (message.IsInvokeMessage())
        {
            message.Content = JsonConvert.DeserializeObject<JsonRequest>(message.Content.ToString());
        }
        else if (message.IsInvokeResultMessage())
        {
            message.Content = JsonConvert.DeserializeObject<JsonResponse>(message.Content.ToString());
        }

        return message;
    }

    #endregion Implementation of ITransportMessageDecoder
}
MessagePack
官网地址:https://msgpack.org/
贴出代码,不过多的解释
/// <summary>
/// MessagePack-serializable counterpart of <see cref="TransportMessage"/>; content
/// and headers are carried as byte arrays.
/// </summary>
[MessagePackObject]
public class MessagePackTransportMessage
{
    /// <summary>
    /// Builds the serializable form of <paramref name="transportMessage"/>.
    /// </summary>
    /// <param name="transportMessage">Message to convert.</param>
    /// <exception cref="NotSupportedException">
    /// The message is neither an invoke message nor an invoke-result message.
    /// </exception>
    public MessagePackTransportMessage(TransportMessage transportMessage)
    {
        Id = transportMessage.Id;
        ContentType = transportMessage.ContentType;
        Content = SerializerUtilitys.Serialize(WrapContent(transportMessage));

        // Headers travel as the serialized JSON text of a dictionary built from the
        // NameValueCollection.
        var headerCollection = transportMessage.GetHeaders<NameValueCollection>();
        var headerDictionary = MessagePackTransportMessageType.NvcToDictionary(headerCollection);
        var headerJson = JsonConvert.SerializeObject(headerDictionary);
        Headers = SerializerUtilitys.Serialize(headerJson);
    }

    /// <summary>
    /// Parameterless constructor; properties are populated through their setters.
    /// </summary>
    public MessagePackTransportMessage()
    {
    }

    /// <summary>
    /// Message id.
    /// </summary>
    [Key(0)]
    public string Id { get; set; }

    /// <summary>
    /// Serialized payload.
    /// </summary>
    [Key(1)]
    public byte[] Content { get; set; }

    /// <summary>
    /// Serialized headers.
    /// </summary>
    [Key(2)]
    public byte[] Headers { get; set; }

    /// <summary>
    /// Content-type tag of the payload.
    /// </summary>
    [Key(3)]
    public string ContentType { get; set; }

    /// <summary>
    /// Tells whether the content type equals the JSON request type name.
    /// </summary>
    /// <returns>true for invoke messages; otherwise false.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsInvokeMessage() =>
        ContentType == MessagePackTransportMessageType.jsonRequestTypeName;

    /// <summary>
    /// Tells whether the content type equals the JSON response type name.
    /// </summary>
    /// <returns>true for invoke-result messages; otherwise false.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsInvokeResultMessage() =>
        ContentType == MessagePackTransportMessageType.jsonResponseTypeName;

    /// <summary>
    /// Rebuilds the <see cref="TransportMessage"/> this instance was created from.
    /// </summary>
    /// <returns>
    /// A message with the deserialized payload; its Headers property holds the untyped
    /// result of JsonConvert.DeserializeObject applied to the header JSON text.
    /// </returns>
    /// <exception cref="NotSupportedException">
    /// The content type is neither the request nor the response type name.
    /// </exception>
    public TransportMessage GetTransportMessage()
    {
        var content = UnwrapContent();
        var headerJson = SerializerUtilitys.Deserialize<string>(Headers);

        // NOTE(review): the decoded headers are not converted back to a
        // NameValueCollection here - confirm consumers expect the untyped JSON result.
        return new TransportMessage
        {
            Id = Id,
            ContentType = ContentType,
            Content = content,
            Headers = JsonConvert.DeserializeObject(headerJson),
        };
    }

    // Wraps the JSON-RPC payload in its MessagePack-serializable counterpart.
    private static object WrapContent(TransportMessage transportMessage)
    {
        if (transportMessage.IsInvokeMessage())
        {
            return new MessagePackJsonRequest(transportMessage.GetContent<JsonRequest>());
        }

        if (transportMessage.IsInvokeResultMessage())
        {
            return new MessagePackJsonResponse(transportMessage.GetContent<JsonResponse>());
        }

        throw new NotSupportedException($"无法支持的消息类型:{transportMessage.ContentType}!");
    }

    // Deserializes the content bytes back into the JSON-RPC payload.
    private object UnwrapContent()
    {
        if (IsInvokeMessage())
        {
            return SerializerUtilitys.Deserialize<MessagePackJsonRequest>(Content).GetJsonRequest();
        }

        if (IsInvokeResultMessage())
        {
            return SerializerUtilitys.Deserialize<MessagePackJsonResponse>(Content).GetJsonResponse();
        }

        throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
    }
}

/// <summary>
/// Encodes a <see cref="TransportMessage"/> via <see cref="MessagePackTransportMessage"/>.
/// </summary>
public sealed class MessagePackTransportMessageEncoder : ITransportMessageEncoder
{
    /// <summary>
    /// Converts the message to its serializable form and serializes it.
    /// </summary>
    /// <param name="message">Message to encode.</param>
    /// <returns>The serialized bytes.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public byte[] Encode(TransportMessage message)
    {
        var packed = new MessagePackTransportMessage(message);

        // The constructor already copies both values; they are assigned again here.
        packed.Id = message.Id;
        packed.ContentType = message.ContentType;

        return SerializerUtilitys.Serialize(packed);
    }
}

/// <summary>
/// Decodes bytes into a <see cref="TransportMessage"/> via <see cref="MessagePackTransportMessage"/>.
/// </summary>
public sealed class MessagePackTransportMessageDecoder : ITransportMessageDecoder
{
    /// <summary>
    /// Deserializes the bytes and converts the result back to a transport message.
    /// </summary>
    /// <param name="data">Serialized message bytes.</param>
    /// <returns>The decoded message.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public TransportMessage Decode(byte[] data)
    {
        return SerializerUtilitys
            .Deserialize<MessagePackTransportMessage>(data)
            .GetTransportMessage();
    }
}
ProtoBuffer
这个应该听得比较多
/// <summary>
/// protobuf-serializable counterpart of <see cref="TransportMessage"/>; content and
/// headers are carried as byte arrays.
/// </summary>
[ProtoContract]
public class ProtoBufferTransportMessage
{
    /// <summary>
    /// Builds the serializable form of <paramref name="transportMessage"/>.
    /// </summary>
    /// <param name="transportMessage">Message to convert.</param>
    /// <exception cref="NotSupportedException">
    /// The message is neither an invoke message nor an invoke-result message.
    /// </exception>
    public ProtoBufferTransportMessage(TransportMessage transportMessage)
    {
        Id = transportMessage.Id;
        ContentType = transportMessage.ContentType;
        Content = SerializerUtilitys.Serialize(WrapContent(transportMessage));

        var headerCollection = transportMessage.GetHeaders<NameValueCollection>();
        Headers = SerializerUtilitys.Serialize(headerCollection);
    }

    /// <summary>
    /// Parameterless constructor; properties are populated through their setters.
    /// </summary>
    public ProtoBufferTransportMessage()
    {
    }

    /// <summary>
    /// Message id.
    /// </summary>
    [ProtoMember(1)]
    public string Id { get; set; }

    /// <summary>
    /// Serialized payload.
    /// </summary>
    [ProtoMember(2)]
    public byte[] Content { get; set; }

    /// <summary>
    /// Serialized headers.
    /// </summary>
    [ProtoMember(3)]
    public byte[] Headers { get; set; }

    /// <summary>
    /// Content-type tag of the payload.
    /// </summary>
    [ProtoMember(4)]
    public string ContentType { get; set; }

    /// <summary>
    /// Tells whether the content type equals the JSON request type name.
    /// </summary>
    /// <returns>true for invoke messages; otherwise false.</returns>
    public bool IsInvokeMessage() =>
        ContentType == MessagePackTransportMessageType.jsonRequestTypeName;

    /// <summary>
    /// Tells whether the content type equals the JSON response type name.
    /// </summary>
    /// <returns>true for invoke-result messages; otherwise false.</returns>
    public bool IsInvokeResultMessage() =>
        ContentType == MessagePackTransportMessageType.jsonResponseTypeName;

    /// <summary>
    /// Rebuilds the <see cref="TransportMessage"/> this instance was created from.
    /// </summary>
    /// <returns>A message with the deserialized payload and headers.</returns>
    /// <exception cref="NotSupportedException">
    /// The content type is neither the request nor the response type name.
    /// </exception>
    public TransportMessage GetTransportMessage()
    {
        var content = UnwrapContent();

        return new TransportMessage
        {
            Id = Id,
            ContentType = ContentType,
            Content = content,
            Headers = SerializerUtilitys.Deserialize<NameValueCollection>(Headers),
        };
    }

    // Wraps the JSON-RPC payload in its protobuf-serializable counterpart.
    private static object WrapContent(TransportMessage transportMessage)
    {
        if (transportMessage.IsInvokeMessage())
        {
            return new ProtoBufferJsonRequest(transportMessage.GetContent<JsonRequest>());
        }

        if (transportMessage.IsInvokeResultMessage())
        {
            return new ProtoBufferJsonResponse(transportMessage.GetContent<JsonResponse>());
        }

        throw new NotSupportedException($"无法支持的消息类型:{transportMessage.ContentType}!");
    }

    // Deserializes the content bytes back into the JSON-RPC payload.
    private object UnwrapContent()
    {
        if (IsInvokeMessage())
        {
            return SerializerUtilitys.Deserialize<ProtoBufferJsonRequest>(Content).GetJsonRequest();
        }

        if (IsInvokeResultMessage())
        {
            return SerializerUtilitys.Deserialize<ProtoBufferJsonResponse>(Content).GetJsonResponse();
        }

        throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
    }
}

/// <summary>
/// Encodes a <see cref="TransportMessage"/> via <see cref="ProtoBufferTransportMessage"/>.
/// </summary>
public sealed class ProtoBufferTransportMessageEncoder : ITransportMessageEncoder
{
    /// <summary>
    /// Converts the message to its serializable form and serializes it.
    /// </summary>
    /// <param name="message">Message to encode.</param>
    /// <returns>The serialized bytes.</returns>
    public byte[] Encode(TransportMessage message)
    {
        var packed = new ProtoBufferTransportMessage(message);

        // The constructor already copies both values; they are assigned again here.
        packed.Id = message.Id;
        packed.ContentType = message.ContentType;

        return SerializerUtilitys.Serialize(packed);
    }
}

/// <summary>
/// Decodes bytes into a <see cref="TransportMessage"/> via <see cref="ProtoBufferTransportMessage"/>.
/// </summary>
public sealed class ProtoBufferTransportMessageDecoder : ITransportMessageDecoder
{
    /// <summary>
    /// Deserializes the bytes and converts the result back to a transport message.
    /// </summary>
    /// <param name="data">Serialized message bytes.</param>
    /// <returns>The decoded message.</returns>
    public TransportMessage Decode(byte[] data)
    {
        return SerializerUtilitys
            .Deserialize<ProtoBufferTransportMessage>(data)
            .GetTransportMessage();
    }
}
以上是关于企业级工作流解决方案--微服务Tcp消息传输模型之消息编解码的主要内容,如果未能解决你的问题,请参考以下文章