企业级工作流解决方案--微服务消息处理模型之服务端处理
Posted spritekuang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了企业级工作流解决方案--微服务消息处理模型之服务端处理相关的知识,希望对你有一定的参考价值。
1. Json-Rpc 2.0
参考地址:https://www.jsonrpc.org/specification
JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议,它允许运行在基于socket,http等诸多不同消息传输环境的同一进程中。其使用JSON(RFC 4627)作为数据格式,它为简单而生。
请求对象
发送一个请求对象至服务端代表一个rpc调用, 一个请求对象包含下列成员:
jsonrpc指定JSON-RPC协议版本的字符串,必须准确写为“2.0”
method包含所要调用方法名称的字符串,以rpc开头的方法名,用英文句号(U+002E or ASCII 46)连接的为预留给rpc内部的方法名及扩展名,且不能在其他地方使用。
params调用方法所需要的结构化参数值,该成员参数可以被省略。
id已建立客户端的唯一标识id,值必须包含一个字符串、数值或NULL空值。如果不包含该成员则被认定为是一个通知。该值一般不为NULL[1],若为数值则不应该包含小数[2]。
响应对象
当发起一个rpc调用时,除通知之外,服务端都必须回复响应。响应表示为一个JSON对象,使用以下成员:
jsonrpc指定JSON-RPC协议版本的字符串,必须准确写为“2.0”
result该成员在成功时必须包含。当调用方法引起错误时必须不包含该成员。服务端中的被调用方法决定了该成员的值。
error该成员在失败是必须包含。当没有引起错误的时必须不包含该成员。该成员参数值必须为5.1中定义的对象。
id该成员必须包含。该成员值必须于请求对象中的id成员值一致。若在检查请求对象id时错误(例如参数错误或无效请求),则该值必须为空值。
响应对象必须包含result或error成员,但两个成员必须不能同时包含。
错误对象
当一个rpc调用遇到错误时,返回的响应对象必须包含错误成员参数,并且为带有下列成员参数的对象:
code使用数值表示该异常的错误类型。 必须为整数。
message对该错误的简单描述字符串。 该描述应尽量限定在简短的一句话。
data包含关于错误附加信息的基本类型或结构化类型。该成员可忽略。 该成员值由服务端定义(例如详细的错误信息,嵌套的错误等)。
错误码及实例见官方文档。
2. 传输消息模型
Json-Rpc请求消息或者答复消息在服务消费者与服务提供者之间传输需要一个载体,传输消息模型定义如下:
/// <summary> /// 传输消息模型。 /// </summary> public class TransportMessage { public TransportMessage() { } [MethodImpl(MethodImplOptions.AggressiveInlining)] public TransportMessage(object content,object headers) { if (content == null) throw new ArgumentNullException(nameof(content)); Content = content; Headers = headers; ContentType = content.GetType().FullName; } [MethodImpl(MethodImplOptions.AggressiveInlining)] public TransportMessage(object content, object headers, string fullName) { if (content == null) throw new ArgumentNullException(nameof(content)); Headers = headers; Content = content; ContentType = fullName; } /// <summary> /// 消息Id。 /// </summary> public string Id { get; set; } /// <summary> /// 消息内容。 /// </summary> public object Content { get; set; } /// <summary> /// 消息传输Header /// </summary> public object Headers { get; set; } /// <summary> /// 内容类型。 /// </summary> public string ContentType { get; set; } /// <summary> /// 是否调用消息。 /// </summary> /// <returns>如果是则返回true,否则返回false。</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsInvokeMessage() { return ContentType == MessagePackTransportMessageType.jsonRequestTypeName; } /// <summary> /// 是否是调用结果消息。 /// </summary> /// <returns>如果是则返回true,否则返回false。</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)] public bool IsInvokeResultMessage() { return ContentType == MessagePackTransportMessageType.jsonResponseTypeName; } /// <summary> /// 获取内容。 /// </summary> /// <typeparam name="T">内容类型。</typeparam> /// <returns>内容实例。</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)] public T GetContent<T>() { return (T)Content; } /// <summary> /// 获取Header。 /// </summary> /// <typeparam name="T">Header类型。</typeparam> /// <returns>Header实例。</returns> [MethodImpl(MethodImplOptions.AggressiveInlining)] public T GetHeaders<T>() { return (T)Headers; } /// <summary> /// 创建一个调用传输消息。 /// </summary> /// <param name="invokeMessage">调用实例。</param> /// <returns>调用传输消息。</returns> public static TransportMessage CreateInvokeMessage(JsonRequest invokeMessage,NameValueCollection nameValueCollection) { return new TransportMessage(invokeMessage, nameValueCollection, MessagePackTransportMessageType.jsonRequestTypeName) { Id = Guid.NewGuid().ToString("N") }; } /// <summary> /// 创建一个调用结果传输消息。 /// </summary> /// <param name="id">消息Id。</param> /// <param name="invokeResultMessage">调用结果实例。</param> /// <returns>调用结果传输消息。</returns> public static TransportMessage CreateInvokeResultMessage(string id, JsonResponse jsonResponse,NameValueCollection nameValueCollection) { return new TransportMessage(jsonResponse, nameValueCollection, MessagePackTransportMessageType.jsonResponseTypeName) { Id = id }; } }
这里的Content就是JsonRpc请求对象或者答复对象,重点说一下Id和Header,这里的Id定义为一个Guid,每一个请求消息给Id赋值Id = Guid.NewGuid().ToString("N"),每一次发起Rpc调用,消费者线程都会等待服务端消息答复,等待的标识就是这个消息Id,服务端收到Rpc调用请求的时候,新建答复消息,将请求消息Id赋值给答复消息Id,这样,服务消费者就可以拿到消息Id唤起对应的等待线程;这里的Header是一个数据字典,Json-Rpc标准没有定义Header,但是Header在很多情况下都是非常重要的,后续文章会介绍到在Abp用户身份认证中的作用。
3. 消息处理模型
方法启动时,把每个方法对应的方法处理模型存储到全局对象中
public class SMD { /// <summary> /// key:方法名称,value:smdservice /// </summary> public Dictionary<string, SMDService> SMDServices { get; set; } }
方法处理模型
public class SMDService { public Dictionary<string, Type> parameterDict { get; set; } public Type ApplicationType { get; private set; } public SMDResult returns { get; private set; } public SMDAdditionalParameters[] parameters { get; private set; } /// <summary> /// Stores default values for optional parameters. /// </summary> public ParameterDefaultValue[] defaultValues { get; private set; } }
ApplicationType为方法对应的服务类型,Rpc调用处理时,根据这个值从容器里面创建处理实例,parameters,returns,defaultValues为方法对应的参数,Rpc调用时,会根据这些值验证请求的合法性,并根据这些值创建正真调用的动态委托方法,执行正真的服务调用。
再见微服务交互流程图
4. 启动服务注册过程
服务启动注册的目的是要构造全局消息处理模型,最终需要解析为Dictionary<方法名称,服务处理模型>,这里的方法名称格式固定为“服务名称.类名称.方法名称”,服务启动依赖于服务配置文件,格式如下:
<jsonrpc> <serviceAssemblies> <add assembly="CK.Sprite.AuthCenterService" domain="AuthCenterService" methodMode="allPublic" /> <add assembly="CK.Sprite.CommonService" domain="CommonService" methodMode="allPublic" /> <add assembly="CK.Sprite.Workflow" domain="SpriteWorkflow" methodMode="allPublic" /> <add assembly="CK.Plugin.OAService" domain="OAService" methodMode="allPublic" /> </serviceAssemblies> </jsonrpc>
服务启动过程遍历此配置文件,对每一条记录,根据assembly获取运行时的System.Reflection. Assembly,遍历程序集里面的实现了IApplicationService接口的类,对于每一个类,通过反射获取每一个public方法,并解析方法名称和参数,最终构造出Dictionary<方法名称,服务处理模型>,核心代码如下:
private static void RegisterAssembly(XmlNode assemblyNode) { var assemblyName = assemblyNode.Attributes["assembly"].Value; var domain = assemblyNode.Attributes["domain"].Value; var methodMode = assemblyNode.Attributes["methodMode"].Value; //var assem = Assembly.Load(assemblyName); var assem = Dependency.IocManager.Instance.Resolve<IAssemblyFinder>().GetAllAssemblies().FirstOrDefault(r=> r.GetName().Name == assemblyName); if(assem == null) { return; } var typesWithHandlers = assem.GetTypes().Where(f => f.IsPublic && f.IsClass && typeof(IApplicationService).GetTypeInfo().IsAssignableFrom(f)); foreach (Type applicationServiceType in typesWithHandlers) { try { RegisterType(applicationServiceType, assem, methodMode, domain); } catch (Exception ex) { System.Diagnostics.Trace.WriteLine("Register type " + applicationServiceType.FullName + " error and skiped, " + ex.ToString()); } } } private static void RegisterType(Type applicationServiceType, Assembly assem, string methodMode, string domain) { string sessionID = Handler.DefaultSessionId(); var item = applicationServiceType; List<MethodInfo> methods; bool isMethodModeByAttribute = methodMode != null && "attribute".Equals(methodMode, StringComparison.InvariantCultureIgnoreCase); if (isMethodModeByAttribute) { methods = item.GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) .Where(m => m.GetCustomAttributes(typeof(JsonRpcMethodAttribute), false).Length > 0) .ToList(); } else { methods = item.GetMethods(BindingFlags.Public | BindingFlags.Instance | BindingFlags.DeclaredOnly).ToList(); } foreach (var meth in methods) { if (meth.Name.StartsWith("get_") || meth.Name.StartsWith("set_")) continue; Dictionary<string, Type> paras = new Dictionary<string, Type>(); Dictionary<string, object> defaultValues = new Dictionary<string, object>(); // dictionary that holds default values for optional params. var paramzs = meth.GetParameters(); List<Type> parameterTypeArray = new List<Type>(); for (int i = 0; i < paramzs.Length; i++) { // reflection attribute information for optional parameters paras.Add(paramzs[i].Name, paramzs[i].ParameterType); if (paramzs[i].IsOptional) // if the parameter is an optional, add the default value to our default values dictionary. defaultValues.Add(paramzs[i].Name, paramzs[i].DefaultValue); } var resType = meth.ReturnType; paras.Add("returns", resType); // add the return type to the generic parameters list. if (isMethodModeByAttribute) { var atdata = meth.GetCustomAttributes(typeof(JsonRpcMethodAttribute), false); foreach (JsonRpcMethodAttribute handlerAttribute in atdata) { string methodName; if (handlerAttribute.JsonMethodName == string.Empty) { methodName = applicationServiceType.Name + "." + meth.Name; if (!string.IsNullOrWhiteSpace(domain)) { methodName = domain + "." + methodName; } } else { methodName = handlerAttribute.JsonMethodName == string.Empty ? meth.Name : handlerAttribute.JsonMethodName; } var handlerSession = Handler.GetSessionHandler(sessionID); try { handlerSession.MetaData.AddService(methodName, new SMDService(applicationServiceType, paras, defaultValues)); } catch (Exception ex) { System.Diagnostics.Trace.WriteLine("Register method " + methodName + " error and skiped, " + ex.ToString()); } } } else { { var methodName = applicationServiceType.Name + "." + meth.Name; if (!string.IsNullOrWhiteSpace(domain)) { methodName = domain + "." + methodName; } var handlerSession = Handler.GetSessionHandler(sessionID); try { handlerSession.MetaData.AddService(methodName, new SMDService(applicationServiceType, paras, defaultValues)); } catch (Exception ex) { System.Diagnostics.Trace.WriteLine("Register method " + methodName + " error and skiped, " + ex.ToString()); } } } } }
5. 服务处理过程
首先根据请求参数获取消息处理模型,然后经过一系列的请求合法性验证,最终从容器中拿出Rpc调用正真的处理服务实例,动态构造调用委托,添加Abp的各种拦截(参照abp里面的actionfilter做法),动态调用方法委托,然后对结果进行rpcresponse封装。核心处理代码如下:
SMDService metadata = null; // 根据请求参数从全局消息处理模型中获取方法对应的消息处理模型 var haveMetadata = this.MetaData.SMDServices.TryGetValue(Rpc.Method, out metadata); // 请求合法性,消息参数,权限验证等一系列验证... // 从容器中获取服务类 var applicationServiceInstance = Dependency.IocManager.Instance.Resolve(metadata.ApplicationType); // 构造动态执行调用委托 var newDel = Delegate.CreateDelegate(System.Linq.Expressions.Expression.GetDelegateType(metadata.parameterDict.Values.ToArray()), applicationServiceInstance /*Need to add support for other methods outside of this instance*/, mothod.First()); // 添加abp uow拦截 var _unitOfWorkDefaultOptions = Dependency.IocManager.Instance.Resolve<IUnitOfWorkDefaultOptions>(); var _unitOfWorkManager = Dependency.IocManager.Instance.Resolve<IUnitOfWorkManager>(); var unitOfWorkAttr = _unitOfWorkDefaultOptions.GetUnitOfWorkAttributeOrNull(mothod.First()); object results; // 动态调用真正的服务方法 if (!unitOfWorkAttr.IsDisabled) { using (var uow = _unitOfWorkManager.Begin(unitOfWorkAttr.CreateOptions())) { results = newDel.DynamicInvoke(parameters); uow.Complete(); } } else { results = newDel.DynamicInvoke(parameters); } var last = parameters.LastOrDefault(); JsonRpcException contextException; if (Task.CurrentId.HasValue && RpcExceptions.TryRemove(Task.CurrentId.Value, out contextException)) { JsonResponse response = new JsonResponse() { Error = ProcessException(Rpc, contextException), Id = Rpc.Id }; callback.Invoke(response); CompletedProcess(Rpc, response, RpcContext); return response; } if (expectsRefException && last != null && last is JsonRpcException) { JsonResponse response = new JsonResponse() { Error = ProcessException(Rpc, last as JsonRpcException), Id = Rpc.Id }; callback.Invoke(response); CompletedProcess(Rpc, response, RpcContext); return response; } //return response, if callback is set (method is asynchronous) - result could be empty string and future result operations //will be processed in the callback var reponse = new JsonResponse() { Result = results }; CompletedProcess(Rpc, reponse, RpcContext); return reponse;
有问题可以QQ或者邮箱交流:邮箱:[email protected],QQ:523477776
以上是关于企业级工作流解决方案--微服务消息处理模型之服务端处理的主要内容,如果未能解决你的问题,请参考以下文章
企业级工作流解决方案--微服务Tcp消息传输模型之客户端处理