.net下的面向工控领域的远程方法调用(RMI)中间件,客户端协议栈应答端实现
Posted gongbenwen
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了.net下的面向工控领域的远程方法调用(RMI)中间件,客户端协议栈应答端实现相关的知识,希望对你有一定的参考价值。
远程方法调用,在服务器端执行完毕后,其反馈结果也必然是,按照字节流的方式返回,服务器端按照通信协议,做封包处理,而客户端的应答处理部分,从通信连接会话上,接收到待解析的字节流数据,负责解析,转化成客户端可执行的结构体数据。
解包过程时,需要从外到内逐级向下,上级解包状态决定了下级解包时,调用的消息处理器。
下面是同步调用过程的客户端协议拆包过程:
1 通信协议包格式
用于对客户端和服务器端通信时,数据传输过程中的数据包的封装处理,只负责具体的数据
包传输,此为整个封包的最外层包装。
起始标记位 |
包体长度 |
加密标记 |
数据包内容 |
4Byte |
Int32 |
1Byte |
字节流 |
用于标记数据包的开始, 0x02,0x03,0xFF,0xFF |
用于标记数据包的长度,LittleEndian |
0x00,不加密 0x01,加密 |
所有数据的内容,需要下面的“数据包内容协议格式定义”进一步处理 |
|
/// <summary> /// RMI应答消息解析器 /// </summary> public class RMIClientDataOutputFilter : Trace.Common.Communication.Message.BaseOutPutMessageFilter<RMIClientDataRequest, RMIClientDataResponse> { private IMessageFilterHelper messageFilterHelper = null; /// <summary> /// 包头 /// </summary> private byte[] start = new byte[] { 0x02, 0x03, 0xFF, 0xFF }; /// <summary> /// 应答处理上下文 /// </summary> private List<RMIResponseContext> responseContextList = new List<RMIResponseContext>(); /// <summary> /// 应答上下文处理锁 /// </summary> private object responseContextLock = new object(); public RMIClientDataOutputFilter() { messageFilterHelper = new MessageFilterFixStartLengthHelper(start, 4, 60000); } /// <summary> /// 将通信流压入待处理的解析器 /// </summary> /// <param name="sessionID">会话ID</param> /// <param name="buffer"></param> protected override void InputBufferData(string sessionID, byte[] buffer) { messageFilterHelper.InputBufferData(buffer); } /// <summary> /// 拆包 /// </summary> /// <returns></returns> protected override RMIClientDataResponse GetNextResponse() { byte[] tempBuffer = messageFilterHelper.GetNextData(); if (tempBuffer == null || tempBuffer.Length == 0) { return null; } RMIClientDataResponse response = new RMIClientDataResponse(); try { response = this.Response(tempBuffer, response); } catch (Exception ex) { response.Exception = ex; } return response; } /// <summary> /// 应答解析处理 /// </summary> /// <param name="buffer"></param> /// <param name="response"></param> /// <returns></returns> private RMIClientDataResponse Response(byte[] buffer, RMIClientDataResponse response) { PackDataOutputFilter packDataOutputFilter = new PackDataOutputFilter(); packDataOutputFilter.SetBuffer(buffer); if (packDataOutputFilter.RMIDataBlockFlag == RMIDataBlockFlag.AllBlock)//整包传输 { this.AllBlockDataPro(packDataOutputFilter); } else if (packDataOutputFilter.RMIDataBlockFlag == RMIDataBlockFlag.AllBlockACK || packDataOutputFilter.RMIDataBlockFlag == RMIDataBlockFlag.AllBlockNAK)//整包传输成功\失败确认 { this.AllBlockConfirm(packDataOutputFilter); } else if (packDataOutputFilter.RMIDataBlockFlag == RMIDataBlockFlag.SplitBlock)//分包传输 { } else if (packDataOutputFilter.RMIDataBlockFlag == RMIDataBlockFlag.SplitBlockACK || packDataOutputFilter.RMIDataBlockFlag == RMIDataBlockFlag.SplitBlockNAK)//分包传输成功、失败确认 { } return response; } /// <summary> /// 整包数据确认 /// </summary> /// <param name="packDataOutputFilter"></param> private void AllBlockConfirm(PackDataOutputFilter packDataOutputFilter) { AllBlockConfirmOutputFilter allBlockConfirmOutputFilter = new AllBlockConfirmOutputFilter(); allBlockConfirmOutputFilter.SetBuffer(packDataOutputFilter.LeftBuffer); if (packDataOutputFilter.RMIDataBlockFlag == RMIDataBlockFlag.AllBlockNAK)//整包传输失败确认 { } } /// <summary> /// 整包数据处理 /// </summary> /// <param name="packDataOutputFilter"></param> private void AllBlockDataPro(PackDataOutputFilter packDataOutputFilter) { AllBlockDataOutputFilter allBlockDataOutputFilter = new AllBlockDataOutputFilter(); allBlockDataOutputFilter.SetBuffer(packDataOutputFilter.LeftBuffer); if (allBlockDataOutputFilter.RMIVersion == RMIVersion.Version1)//版本1处理器 { this.ExcuteContentVersion1(allBlockDataOutputFilter.LeftBuffer); } } /// <summary> /// 版本1,执行体数据处理 /// </summary> /// <param name="excuteBuffer">执行体数据</param> private RMIResponseContext ExcuteContentVersion1(byte[] excuteBuffer) { RMIResponseContext context; ResponseDataOutputFilterVersion1 responseDataOutputFilterVersion1 = new ResponseDataOutputFilterVersion1(); responseDataOutputFilterVersion1.SetBuffer(excuteBuffer); if (responseDataOutputFilterVersion1.RMIResponseType == RMIResponseType.SyncRequestResponse)//同步应答 { context = new RMISyncResponseContext(); context.RequestObjCallID = responseDataOutputFilterVersion1.RequestObjCallID; context.RequestMethodID = responseDataOutputFilterVersion1.RequestMethodID; SyncResponseOutputFilterVersion1 syncResponseOutputFilterVersion1 = new SyncResponseOutputFilterVersion1(); syncResponseOutputFilterVersion1.SetBuffer(responseDataOutputFilterVersion1.LeftBuffer); context.ResponseStatus = syncResponseOutputFilterVersion1.ResponseStatus; if (syncResponseOutputFilterVersion1.ResponseStatus == RMIResponseStatus.IsResult)//同步应答返回结果 { SyncResponseOutputFilterVersion1IsResult syncResponseOutputFilterVersion1IsResult = new SyncResponseOutputFilterVersion1IsResult(responseDataOutputFilterVersion1.SerializeType); syncResponseOutputFilterVersion1IsResult.SetBuffer(syncResponseOutputFilterVersion1.LeftBuffer); } else if (syncResponseOutputFilterVersion1.ResponseStatus == RMIResponseStatus.IsNull)//空的返回 { SyncResponseOutputFilterVersion1IsNull syncResponseOutputFilterVersion1IsNull = new SyncResponseOutputFilterVersion1IsNull(); syncResponseOutputFilterVersion1IsNull.SetBuffer(syncResponseOutputFilterVersion1.LeftBuffer); } else if (syncResponseOutputFilterVersion1.ResponseStatus == RMIResponseStatus.Exception)//异常返回 { SyncResponseOutputFilterVersion1IsException syncResponseOutputFilterVersion1IsException = new SyncResponseOutputFilterVersion1IsException(responseDataOutputFilterVersion1.SerializeType); syncResponseOutputFilterVersion1IsException.SetBuffer(syncResponseOutputFilterVersion1.LeftBuffer); } } else if (responseDataOutputFilterVersion1.RMIResponseType == RMIResponseType.ASyncRequestResponse)//异步应答 {
1.1 数据包内容协议格式定义:
包唯一ID |
包标记 |
数据块内容 |
数据包校验位 |
GUI(16Byte) |
1Byte |
字节流 |
1Byte |
当前传输数据包的唯一ID |
0x00,表示整包传输, 0x01,表示分包传输 0x02,表示整包传输确认 0x03,表示分包传输确认 0x04,表示整包传输失败 0x05,表示分包传输失败 |
需要下面进一步处理 |
校验和,对数据包内容中的所有数据,进行校验和 |
|
1.1.1 整包传输数据块协议
版本号 |
命令状态 |
执行体 |
Int32 |
Byte |
字节流 |
用于标记处理数据的版本 |
0x00,请求命令 0x01,应答命令 |
需要到“执行体协议”进一步处理 |
|
/// <summary> /// 包体数据解析 /// </summary> public class PackDataOutputFilter : IRMISplitResponseOutputFilter { private Guid packID; /// <summary> /// 数据包唯一ID /// </summary> public Guid PackID { get { return packID; } } private RMIDataBlockFlag rMIDataBlockFlag; /// <summary> /// 包标记 /// </summary> public RMIDataBlockFlag RMIDataBlockFlag { get { return this.rMIDataBlockFlag; } } private byte[] leftBuffer; /// <summary> /// 剩余数据 /// </summary> public byte[] LeftBuffer { get { return this.leftBuffer; } } /// <summary> /// 包体数据解析 /// </summary> /// <param name="buffer"></param> public void SetBuffer(byte[] buffer) { this.CheckSum(buffer); Trace.Common.Communication.MessageStatck.MessageStatckSplit messageStatckSplit = new Trace.Common.Communication.MessageStatck.MessageStatckSplit(buffer); Guid tempPackID; if (messageStatckSplit.PopGUID(out tempPackID) == false) { throw new Exception("获取数据包唯一ID失败"); } this.packID = tempPackID; byte bRMIDataBlockFlag; if (messageStatckSplit.PopByte(out bRMIDataBlockFlag) == false) { throw new Exception("获取包标记失败"); } this.rMIDataBlockFlag = (RMIDataBlockFlag)bRMIDataBlockFlag; this.leftBuffer = messageStatckSplit.GetLeftDataBuffer(); } /// <summary> /// 进行校验和处理 /// </summary> /// <param name="dataBuffer">体数据</param> /// <returns></returns> public void CheckSum(byte[] dataBuffer) { int len = dataBuffer.Length; byte tempCheck = dataBuffer[len - 1]; byte checkSum = 0x00; for (int i = 0; i < len-1; i++) { checkSum += dataBuffer[i]; } if (tempCheck != checkSum) { throw new Exception("数据包校验失败"); } } }
/// <summary> /// 整数据块解析处理 /// </summary> public class AllBlockDataOutputFilter : IRMISplitResponseOutputFilter { private RMIVersion rmiVersion; /// <summary> /// 版本号 /// </summary> public RMIVersion RMIVersion { get { return rmiVersion; } } private RMICommand rmiCommand; /// <summary> /// 命令状态 /// </summary> public RMICommand RMICommand { get { return rmiCommand; } } private byte[] leftBuffer; /// <summary> /// 剩余数据 /// </summary> public byte[] LeftBuffer { get { return this.leftBuffer; } } /// <summary> /// 数据解析处理 /// </summary> /// <param name="buffer"></param> public void SetBuffer(byte[] buffer) { Trace.Common.Communication.MessageStatck.MessageStatckSplit messageStatckSplit = new Trace.Common.Communication.MessageStatck.MessageStatckSplit(buffer); Int32 versionCode; if (messageStatckSplit.PopInt32(out versionCode) == false) { throw new Exception("获取版本代码失败"); } this.rmiVersion = (RMIVersion)versionCode; byte bCommand; if (messageStatckSplit.PopByte(out bCommand) == false) { throw new Exception("获取命令状态失败"); } this.rmiCommand = (RMICommand)bCommand; this.leftBuffer = messageStatckSplit.GetLeftDataBuffer(); } }
1 服务端版本1,执行体协议
1.1 函数反馈应答
启用的序列化协议 |
客户端请求对象ID |
客户端调用过程ID |
应答方式 |
具体的应答处理 |
1Byte |
GUID(16Byte) |
GUID(16Byte) |
1Byte |
字节流 |
0x00,Json序列化, 0x01,ProtoBuf序列化 |
用于标记客户端发起请求的对象 |
用于标记客户端发起请求的调用函数 |
用于标记服务器端的应答方式, 0x01, 同步请求应答 0x02,订阅回调 0x03,异步请求应答
|
|
|
到执行体部分了,拆包协议处理,
/// <summary> /// 执行体数据消息解析器 /// </summary> public class ResponseDataOutputFilterVersion1:IRMISplitResponseOutputFilter { private RMISerializeType serializeType; /// <summary> /// 序列化方式 /// </summary> public RMISerializeType SerializeType { get { return serializeType; } } private Guid requestObjCallID; /// <summary> /// 原始请求对象ID /// </summary> public Guid RequestObjCallID { get { return requestObjCallID; } } private Guid requestMethodID; /// <summary> /// 原始请求方法ID /// </summary> public Guid RequestMethodID { get { return this.requestMethodID; } } private RMIResponseType responseType; /// <summary> /// 应答方式 /// </summary> public RMIResponseType RMIResponseType { get { return responseType; } } private byte[] leftBuffer; /// <summary> /// 剩余字节数量 /// </summary> public byte[] LeftBuffer { get { return leftBuffer; } } /// <summary> /// 执行体协议应答解析处理 /// </summary> /// <param name="buffer">执行体数据包</param> public void SetBuffer(byte[] buffer) { Trace.Common.Communication.MessageStatck.MessageStatckSplit messageStatckSplit = new Trace.Common.Communication.MessageStatck.MessageStatckSplit(buffer); byte bSerializeType; if (messageStatckSplit.PopByte(out bSerializeType) == false) { throw new Exception("获取启用的序列化协议失败"); } this.serializeType = (RMISerializeType)bSerializeType; Guid tempObjID; if (messageStatckSplit.PopGUID(out tempObjID) == false) { throw new Exception("获取请求对象ID失败"); } this.requestObjCallID = tempObjID; Guid tempMethodID; if (messageStatckSplit.PopGUID(out tempMethodID) == false) { throw new Exception("获取调用过程ID失败"); } this.requestMethodID = tempMethodID; byte bResponseType; if (messageStatckSplit.PopByte(out bResponseType) == false) { throw new Exception("获取应答方式失败"); } this.responseType = (RMIResponseType)bResponseType; this.leftBuffer = messageStatckSplit.GetLeftDataBuffer(); } }
1.1.1 客户端同步请求应答
应答状态 |
返回数据 |
1Byte |
字节流 |
用于标记应答的信息流状态 0x00 空的返回 0x01 存在数据 0x02 异常信息 |
|
/// <summary> /// 同步应答解析处理器 /// </summary> public class SyncResponseOutputFilterVersion1 : IRMISplitResponseOutputFilter { private RMIResponseStatus responseStatus=RMIResponseStatus.UnKnow; /// <summary> /// 应答状态 /// </summary> public RMIResponseStatus ResponseStatus { get { return responseStatus; } } private byte[] leftBuffer; /// <summary> /// 剩余字节数量 /// </summary> public byte[] LeftBuffer { get { return leftBuffer; } } /// <summary> /// 执行体协议应答解析处理 /// </summary> /// <param name="buffer">执行体数据包</param> public void SetBuffer(byte[] buffer) { Trace.Common.Communication.MessageStatck.MessageStatckSplit messageStatckSplit = new Trace.Common.Communication.MessageStatck.MessageStatckSplit(buffer); byte bResponseStatus; if (messageStatckSplit.PopByte(out bResponseStatus) == false) { throw new Exception("获取应答状态失败"); } this.responseStatus = (RMIResponseStatus)bResponseStatus; this.leftBuffer = messageStatckSplit.GetLeftDataBuffer(); } }
1.1.1.1 存在数据
返回类型全名字节流长度 |
返回类型全名字节流 |
返回序列化内容字节流长度 |
返回序列化内容字节流 |
Int32 |
字节流 |
Int32 |
字节流 |
用于标记返回类型全名字节流长度 |
用于描述返回类型全名对应的字符串,UTF8编码后的字节流 |
用于标记返回序列化内容字节流长度 |
将返回内容进行序列化后,并经UTF8编码后的字节流 |
/// <summary> /// 同步应答解析,反馈数据 /// </summary> public class SyncResponseOutputFilterVersion1IsResult : IRMISplitResponseOutputFilter { private RMISerializeType serializeType; /// <summary> /// 序列化方式 /// </summary> /// <param name="serializeType"></param> public SyncResponseOutputFilterVersion1IsResult(RMISerializeType serializeType) { this.serializeType = serializeType; } private object data; /// <summary> /// 反馈回的应答数据 /// </summary> public object Data { get { return data; } } /// <summary> /// 正常数据解析处理 /// </summary> /// <param name="buffer">执行体数据包</param> public void SetBuffer(byte[] buffer) { Trace.Common.Communication.MessageStatck.MessageStatckSplit messageStatckSplit = new Trace.Common.Communication.MessageStatck.MessageStatckSplit(buffer); IRMISerializble serialize = null; if (serializeType == RMISerializeType.Json) { serialize = new RMIJsonSerialize(); } int readPaTypeLen = 0; if (messageStatckSplit.PopInt32(out readPaTypeLen) == false) { throw new Exception("返回类型长度获取失败"); } string paTypeName; if (messageStatckSplit.PopUTF8String(out paTypeName, readPaTypeLen) == false) { throw new Exception("返回类型获取失败"); } Type paType = Type.GetType(paTypeName); int contentLength = 0; if (messageStatckSplit.PopInt32(out contentLength) == false) { throw new Exception("返回内容长度获取失败"); } if (contentLength > 0) { byte[] paBuffer = messageStatckSplit.PopBytes(contentLength); this.data = serialize.DeSerializeByUTF8(paType, paBuffer); } } }
以上是关于.net下的面向工控领域的远程方法调用(RMI)中间件,客户端协议栈应答端实现的主要内容,如果未能解决你的问题,请参考以下文章