企业级工作流解决方案--微服务Tcp消息传输模型之消息编解码

Posted spritekuang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了企业级工作流解决方案--微服务Tcp消息传输模型之消息编解码相关的知识,希望对你有一定的参考价值。

  Tcp消息传输主要参照surging来做的,做了部分裁剪和改动,详细参见:https://github.com/dotnetcore/surging

  Json-rpc没有定义消息如何传输,因此,Json-Rpc的RpcRequest对象和RpcResponse对象需要一个传输载体,这里的传输对象主要是TransportMessage,如下代码,这里的Content请求时为RpcRequest对象,答复时为RpcResponse对象,答复时Header一般情况下为空。

/// <summary>
/// Envelope that carries a payload object, a header object and a content-type
/// discriminator across the transport.
/// </summary>
/// <remarks>
/// The property declaration order (Id, Content, Headers, ContentType) is kept as-is
/// because reflection-based serializers may emit members in declaration order.
/// </remarks>
public class TransportMessage
{
    /// <summary>
    /// Creates an empty message; every property keeps its default value.
    /// </summary>
    public TransportMessage()
    {
    }

    /// <summary>
    /// Creates a message whose <see cref="ContentType"/> is the full runtime type name
    /// of <paramref name="content"/>.
    /// </summary>
    /// <param name="content">Payload object; must not be null.</param>
    /// <param name="headers">Header object, stored as given.</param>
    /// <exception cref="ArgumentNullException"><paramref name="content"/> is null.</exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public TransportMessage(object content, object headers)
        : this(content, headers, content?.GetType().FullName)
    {
        // The null check on content is performed by the chained constructor.
    }

    /// <summary>
    /// Creates a message with an explicitly supplied content-type name.
    /// </summary>
    /// <param name="content">Payload object; must not be null.</param>
    /// <param name="headers">Header object, stored as given.</param>
    /// <param name="fullName">Value stored in <see cref="ContentType"/>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="content"/> is null.</exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public TransportMessage(object content, object headers, string fullName)
    {
        if (content == null)
        {
            throw new ArgumentNullException(nameof(content));
        }

        Content = content;
        Headers = headers;
        ContentType = fullName;
    }

    /// <summary>
    /// Message id.
    /// </summary>
    public string Id { get; set; }

    /// <summary>
    /// Message payload.
    /// </summary>
    public object Content { get; set; }

    /// <summary>
    /// Transport headers that accompany the payload.
    /// </summary>
    public object Headers { get; set; }

    /// <summary>
    /// Name identifying the kind of payload held in <see cref="Content"/>.
    /// </summary>
    public string ContentType { get; set; }

    /// <summary>
    /// Tells whether this message is an invocation (request) message.
    /// </summary>
    /// <returns>true when <see cref="ContentType"/> equals the request type name; otherwise false.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsInvokeMessage() =>
        ContentType == MessagePackTransportMessageType.jsonRequestTypeName;

    /// <summary>
    /// Tells whether this message is an invocation result (response) message.
    /// </summary>
    /// <returns>true when <see cref="ContentType"/> equals the response type name; otherwise false.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsInvokeResultMessage() =>
        ContentType == MessagePackTransportMessageType.jsonResponseTypeName;

    /// <summary>
    /// Returns <see cref="Content"/> cast to <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Expected payload type.</typeparam>
    /// <returns>The payload instance.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public T GetContent<T>() => (T)Content;

    /// <summary>
    /// Returns <see cref="Headers"/> cast to <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Expected header type.</typeparam>
    /// <returns>The header instance.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public T GetHeaders<T>() => (T)Headers;

    /// <summary>
    /// Builds a request message with a freshly generated id.
    /// </summary>
    /// <param name="invokeMessage">Request payload.</param>
    /// <param name="nameValueCollection">Headers sent along with the request.</param>
    /// <returns>A message tagged with the request type name.</returns>
    public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage, NameValueCollection nameValueCollection)
    {
        var request = new TransportMessage(
            invokeMessage,
            nameValueCollection,
            MessagePackTransportMessageType.jsonRequestTypeName);

        // "N" format: 32 hexadecimal digits without hyphens.
        request.Id = Guid.NewGuid().ToString("N");
        return request;
    }

    /// <summary>
    /// Builds a response message carrying the supplied id.
    /// </summary>
    /// <param name="id">Message id to store on the response.</param>
    /// <param name="jsonResponse">Response payload.</param>
    /// <param name="nameValueCollection">Headers sent along with the response.</param>
    /// <returns>A message tagged with the response type name.</returns>
    public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse, NameValueCollection nameValueCollection)
    {
        var response = new TransportMessage(
            jsonResponse,
            nameValueCollection,
            MessagePackTransportMessageType.jsonResponseTypeName);

        response.Id = id;
        return response;
    }
}

  TransportMessage要在DotNetty中传输,就需要先对TransportMessage进行编码和解码

消息编解码器

/// <summary>
/// Converts a <see cref="TransportMessage"/> into a byte array.
/// </summary>
public interface ITransportMessageEncoder
    {
        /// <summary>
        /// Encodes the given message.
        /// </summary>
        /// <param name="message">Message to encode.</param>
        /// <returns>The encoded bytes.</returns>
        byte[] Encode(TransportMessage message);
}
/// <summary>
/// Converts a byte array back into a <see cref="TransportMessage"/>.
/// </summary>
public interface ITransportMessageDecoder
    {
        /// <summary>
        /// Decodes the given bytes.
        /// </summary>
        /// <param name="data">Encoded message bytes.</param>
        /// <returns>The decoded message.</returns>
        TransportMessage Decode(byte[] data);
    }

Json编解码

  平时编码中经常用的方式

/// <summary>
/// Encodes a <see cref="TransportMessage"/> as UTF-8 encoded JSON text.
/// </summary>
public sealed class JsonTransportMessageEncoder : ITransportMessageEncoder
{
    /// <summary>
    /// Serializes <paramref name="message"/> to JSON and returns the UTF-8 bytes of that text.
    /// </summary>
    /// <param name="message">Message to encode.</param>
    /// <returns>UTF-8 bytes of the JSON representation.</returns>
    public byte[] Encode(TransportMessage message) =>
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
}

/// <summary>
/// Decodes UTF-8 encoded JSON text into a <see cref="TransportMessage"/>.
/// </summary>
public sealed class JsonTransportMessageDecoder : ITransportMessageDecoder
{
    /// <summary>
    /// Parses <paramref name="data"/> as UTF-8 JSON, then re-parses the message content into
    /// a <c>JsonRequest</c> or <c>JsonResponse</c> according to the message's content type.
    /// </summary>
    /// <param name="data">UTF-8 bytes of a JSON-serialized transport message.</param>
    /// <returns>The decoded message; its content is left untouched for any other content type.</returns>
    public TransportMessage Decode(byte[] data)
    {
        var json = Encoding.UTF8.GetString(data);
        var decoded = JsonConvert.DeserializeObject<TransportMessage>(json);

        // NOTE(review): a payload that deserializes to null, or a request/response message
        // without content, ends in a NullReferenceException below - confirm that is acceptable.
        if (decoded.IsInvokeMessage())
        {
            decoded.Content = ReadContentAs<JsonRequest>(decoded.Content);
        }

        if (decoded.IsInvokeResultMessage())
        {
            decoded.Content = ReadContentAs<JsonResponse>(decoded.Content);
        }

        return decoded;
    }

    /// <summary>
    /// Re-parses the text form of <paramref name="rawContent"/> as <typeparamref name="T"/>.
    /// </summary>
    private static T ReadContentAs<T>(object rawContent) =>
        JsonConvert.DeserializeObject<T>(rawContent.ToString());
}

MessagePack

  官网地址:https://msgpack.org/

  贴出代码,不过多的解释

/// <summary>
/// MessagePack wire form of a <see cref="TransportMessage"/>: id and content type travel
/// as strings, content and headers as pre-serialized byte arrays.
/// </summary>
[MessagePackObject]
public class MessagePackTransportMessage
{
    /// <summary>
    /// Builds the wire form from a transport message.
    /// </summary>
    /// <param name="transportMessage">Request or response message to wrap.</param>
    /// <exception cref="NotSupportedException">
    /// The message is neither an invocation nor an invocation result message.
    /// </exception>
    public MessagePackTransportMessage(TransportMessage transportMessage)
    {
        Id = transportMessage.Id;
        ContentType = transportMessage.ContentType;

        var isRequest = transportMessage.IsInvokeMessage();
        if (!isRequest && !transportMessage.IsInvokeResultMessage())
        {
            throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
        }

        // Wrap the payload in its MessagePack counterpart before serializing it.
        object wrappedContent = isRequest
            ? (object)new MessagePackJsonRequest(transportMessage.GetContent<JsonRequest>())
            : new MessagePackJsonResponse(transportMessage.GetContent<JsonResponse>());
        Content = SerializerUtilitys.Serialize(wrappedContent);

        // Headers are converted to a dictionary, rendered as JSON text, and that text is serialized.
        var headerCollection = transportMessage.GetHeaders<NameValueCollection>();
        var headerJson = JsonConvert.SerializeObject(MessagePackTransportMessageType.NvcToDictionary(headerCollection));
        Headers = SerializerUtilitys.Serialize(headerJson);
    }

    /// <summary>
    /// Creates an empty instance.
    /// </summary>
    public MessagePackTransportMessage()
    {
    }

    /// <summary>
    /// Message id.
    /// </summary>
    [Key(0)]
    public string Id { get; set; }

    /// <summary>
    /// Serialized payload bytes.
    /// </summary>
    [Key(1)]
    public byte[] Content { get; set; }

    /// <summary>
    /// Serialized header bytes.
    /// </summary>
    [Key(2)]
    public byte[] Headers { get; set; }

    /// <summary>
    /// Name identifying the kind of payload held in <see cref="Content"/>.
    /// </summary>
    [Key(3)]
    public string ContentType { get; set; }

    /// <summary>
    /// Tells whether the content type equals the request type name.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsInvokeMessage() =>
        ContentType == MessagePackTransportMessageType.jsonRequestTypeName;

    /// <summary>
    /// Tells whether the content type equals the response type name.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public bool IsInvokeResultMessage() =>
        ContentType == MessagePackTransportMessageType.jsonResponseTypeName;

    /// <summary>
    /// Rebuilds the transport message from this wire form.
    /// </summary>
    /// <returns>A transport message with deserialized content and headers.</returns>
    /// <exception cref="NotSupportedException">
    /// The content type is neither the request nor the response type name.
    /// </exception>
    public TransportMessage GetTransportMessage()
    {
        var isRequest = IsInvokeMessage();
        if (!isRequest && !IsInvokeResultMessage())
        {
            throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
        }

        object payload = isRequest
            ? (object)SerializerUtilitys.Deserialize<MessagePackJsonRequest>(Content).GetJsonRequest()
            : SerializerUtilitys.Deserialize<MessagePackJsonResponse>(Content).GetJsonResponse();

        var headerJson = SerializerUtilitys.Deserialize<string>(Headers);

        // NOTE(review): the untyped DeserializeObject overload is used, so Headers is not rebuilt
        // as a NameValueCollection here - confirm callers of GetHeaders<T>() expect that.
        return new TransportMessage
        {
            ContentType = ContentType,
            Id = Id,
            Content = payload,
            Headers = JsonConvert.DeserializeObject(headerJson),
        };
    }
}

/// <summary>
/// Encodes a <see cref="TransportMessage"/> through its MessagePack wire form.
/// </summary>
public sealed class MessagePackTransportMessageEncoder : ITransportMessageEncoder
{
    /// <summary>
    /// Wraps <paramref name="message"/> in a <see cref="MessagePackTransportMessage"/> and serializes it.
    /// </summary>
    /// <param name="message">Message to encode.</param>
    /// <returns>The serialized bytes.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public byte[] Encode(TransportMessage message)
    {
        // The wrapper's constructor already copies Id and ContentType from the message.
        var wireMessage = new MessagePackTransportMessage(message);
        return SerializerUtilitys.Serialize(wireMessage);
    }
}

/// <summary>
/// Decodes bytes holding a <see cref="MessagePackTransportMessage"/> back into a
/// <see cref="TransportMessage"/>.
/// </summary>
public sealed class MessagePackTransportMessageDecoder : ITransportMessageDecoder
{
    /// <summary>
    /// Deserializes the wire form from <paramref name="data"/> and converts it to a transport message.
    /// </summary>
    /// <param name="data">Serialized wire-form bytes.</param>
    /// <returns>The decoded transport message.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public TransportMessage Decode(byte[] data) =>
        SerializerUtilitys.Deserialize<MessagePackTransportMessage>(data).GetTransportMessage();
}

ProtoBuffer

  这个应该听得比较多

/// <summary>
/// Protocol-buffer wire form of a <see cref="TransportMessage"/>: id and content type travel
/// as strings, content and headers as pre-serialized byte arrays.
/// </summary>
[ProtoContract]
public class ProtoBufferTransportMessage
{
    /// <summary>
    /// Builds the wire form from a transport message.
    /// </summary>
    /// <param name="transportMessage">Request or response message to wrap.</param>
    /// <exception cref="NotSupportedException">
    /// The message is neither an invocation nor an invocation result message.
    /// </exception>
    public ProtoBufferTransportMessage(TransportMessage transportMessage)
    {
        Id = transportMessage.Id;
        ContentType = transportMessage.ContentType;

        var isRequest = transportMessage.IsInvokeMessage();
        if (!isRequest && !transportMessage.IsInvokeResultMessage())
        {
            throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
        }

        // Wrap the payload in its protocol-buffer counterpart before serializing it.
        object wrappedContent = isRequest
            ? (object)new ProtoBufferJsonRequest(transportMessage.GetContent<JsonRequest>())
            : new ProtoBufferJsonResponse(transportMessage.GetContent<JsonResponse>());
        Content = SerializerUtilitys.Serialize(wrappedContent);

        // Headers are serialized directly from the NameValueCollection.
        Headers = SerializerUtilitys.Serialize(transportMessage.GetHeaders<NameValueCollection>());
    }

    /// <summary>
    /// Creates an empty instance.
    /// </summary>
    public ProtoBufferTransportMessage()
    {
    }

    /// <summary>
    /// Message id.
    /// </summary>
    [ProtoMember(1)]
    public string Id { get; set; }

    /// <summary>
    /// Serialized payload bytes.
    /// </summary>
    [ProtoMember(2)]
    public byte[] Content { get; set; }

    /// <summary>
    /// Serialized header bytes.
    /// </summary>
    [ProtoMember(3)]
    public byte[] Headers { get; set; }

    /// <summary>
    /// Name identifying the kind of payload held in <see cref="Content"/>.
    /// </summary>
    [ProtoMember(4)]
    public string ContentType { get; set; }

    /// <summary>
    /// Tells whether the content type equals the request type name.
    /// </summary>
    public bool IsInvokeMessage() =>
        ContentType == MessagePackTransportMessageType.jsonRequestTypeName;

    /// <summary>
    /// Tells whether the content type equals the response type name.
    /// </summary>
    public bool IsInvokeResultMessage() =>
        ContentType == MessagePackTransportMessageType.jsonResponseTypeName;

    /// <summary>
    /// Rebuilds the transport message from this wire form.
    /// </summary>
    /// <returns>A transport message with deserialized content and headers.</returns>
    /// <exception cref="NotSupportedException">
    /// The content type is neither the request nor the response type name.
    /// </exception>
    public TransportMessage GetTransportMessage()
    {
        var isRequest = IsInvokeMessage();
        if (!isRequest && !IsInvokeResultMessage())
        {
            throw new NotSupportedException($"无法支持的消息类型:{ContentType}!");
        }

        object payload = isRequest
            ? (object)SerializerUtilitys.Deserialize<ProtoBufferJsonRequest>(Content).GetJsonRequest()
            : SerializerUtilitys.Deserialize<ProtoBufferJsonResponse>(Content).GetJsonResponse();

        return new TransportMessage
        {
            ContentType = ContentType,
            Id = Id,
            Content = payload,
            Headers = SerializerUtilitys.Deserialize<NameValueCollection>(Headers),
        };
    }
}

/// <summary>
/// Encodes a <see cref="TransportMessage"/> through its protocol-buffer wire form.
/// </summary>
public sealed class ProtoBufferTransportMessageEncoder : ITransportMessageEncoder
{
    /// <summary>
    /// Wraps <paramref name="message"/> in a <see cref="ProtoBufferTransportMessage"/> and serializes it.
    /// </summary>
    /// <param name="message">Message to encode.</param>
    /// <returns>The serialized bytes.</returns>
    public byte[] Encode(TransportMessage message)
    {
        // The wrapper's constructor already copies Id and ContentType from the message.
        var wireMessage = new ProtoBufferTransportMessage(message);
        return SerializerUtilitys.Serialize(wireMessage);
    }
}

/// <summary>
/// Decodes bytes holding a <see cref="ProtoBufferTransportMessage"/> back into a
/// <see cref="TransportMessage"/>.
/// </summary>
public sealed class ProtoBufferTransportMessageDecoder : ITransportMessageDecoder
{
    /// <summary>
    /// Deserializes the wire form from <paramref name="data"/> and converts it to a transport message.
    /// </summary>
    /// <param name="data">Serialized wire-form bytes.</param>
    /// <returns>The decoded transport message.</returns>
    public TransportMessage Decode(byte[] data) =>
        SerializerUtilitys.Deserialize<ProtoBufferTransportMessage>(data).GetTransportMessage();
}

 

以上是关于企业级工作流解决方案--微服务Tcp消息传输模型之消息编解码的主要内容,如果未能解决你的问题,请参考以下文章

企业级工作流解决方案--微服务Tcp消息传输模型之客户端处理

企业级工作流解决方案--微服务消息处理模型之消息传输通道

企业级工作流解决方案--微服务消息处理模型之服务端处理

企业级工作流解决方案--微服务消息处理模型之客户端端处理

企业级工作流解决方案--微服务消息处理模型之与Abp集成

6OSI参考模型之“传输层”