Enterprise Workflow Solution: Microservice Message Processing Model, Client-Side Handling
Posted by spritekuang
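The microservice server side is now up and running. How does a service consumer know where a service lives and how to call it, and in a distributed deployment how does it pick the right server?
This is where service discovery and service health checking come in. Many microservice architectures implement them with a message queue, which supports publish/subscribe natively: when a service changes it publishes a notification, and every consumer updates its state. Service metadata also has to be kept up to date, health checks have to run, and so on. These are exactly the areas where problems tend to appear. For an enterprise whose scale and traffic are not especially large, this responsibility can be moved elsewhere: service discovery is done directly through a configuration file, and service addressing and health checking are handled by nginx. Once the enterprise has grown enough, this part can be filled in later, and doing so has no effect on the microservice RPC call path.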
Client configuration
The client configuration file has the following format:
<!--MessageCode (Json = 1, MessagePack = 2, ProtoBuffer = 3)-->
<SocketClientConfiguration MessageCode="MessagePack">
  <ClientConnectServerInfos>
    <!--Url (a URL for Http connections, IP:Port for Tcp connections), ConnectServerType (Tcp = 1, Http = 2, Local = 3), ServerName (the service name)-->
    <!--<ClientConnectServerInfo ServerName="AuthCenterService" ConnectServerType="Tcp" Url="127.0.0.1:1314"></ClientConnectServerInfo>-->
    <ClientConnectServerInfo ServerName="AuthCenterService" ConnectServerType="Http" Url="http://localhost:9527/json.rpc"></ClientConnectServerInfo>
    <ClientConnectServerInfo ServerName="FlowDefineService" ConnectServerType="Tcp" Url="127.0.0.1:2019"></ClientConnectServerInfo>
    <ClientConnectServerInfo ServerName="WorkflowRuntimeService" ConnectServerType="Tcp" Url="127.0.0.1:2019"></ClientConnectServerInfo>
  </ClientConnectServerInfos>
</SocketClientConfiguration>
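ServerName is the service name and must match the service name introduced in the earlier articles. ConnectServerType is the transport type; a service whose transport is Local does not need an entry in the configuration file, because the system looks it up among the local services automatically. Url is the service address; for Tcp it takes the form "IP address:port". When the client starts, it reads the configuration file and stores it in the global configuration. The code is as follows: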
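Since the Tcp form of Url is a plain "IP address:port" string, a typo in the configuration file only surfaces when the string is split and parsed. The following is a minimal sketch of a stricter parser, assuming IPv4 addresses only; `TcpUrlParser` and `TryParse` are hypothetical names and are not part of the framework described here.

```csharp
using System;
using System.Net;

// Hypothetical helper (not part of the original framework): validates the
// "IP:Port" form used by Tcp entries in SocketClientConfiguration.xml.
public static class TcpUrlParser
{
    // Returns false instead of throwing when the value is malformed.
    // Assumption: IPv4 only, so the value contains exactly one ':'.
    public static bool TryParse(string url, out IPEndPoint endPoint)
    {
        endPoint = null;
        if (string.IsNullOrWhiteSpace(url)) return false;

        var parts = url.Split(new[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length != 2) return false;

        if (!IPAddress.TryParse(parts[0].Trim(), out var address)) return false;
        if (!int.TryParse(parts[1].Trim(), out var port)) return false;
        if (port < IPEndPoint.MinPort || port > IPEndPoint.MaxPort) return false;

        endPoint = new IPEndPoint(address, port);
        return true;
    }
}
```

A caller could run this check once at startup and log the entries that fail, rather than discovering a bad address on the first call.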
[DependsOn(typeof(AbpKernelModule))]
public class JsonRpcClientModule : AbpModule
{
    public override void PreInitialize()
    {
        // Register the client configuration; it is always read from the XML file.
        SocketClientConfiguration socketClientConfiguration = XmlConfigProvider.GetConfig<SocketClientConfiguration>("SocketClientConfiguration.xml");
        IocManager.IocContainer.Register(
            Component
                .For<ISocketClientConfiguration>()
                .Instance(socketClientConfiguration)
        );

        // Register the message codec that matches the configured MessageCode.
        switch (socketClientConfiguration.MessageCode)
        {
            case EMessageCode.Json:
                IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                break;
            case EMessageCode.MessagePack:
                IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                break;
            case EMessageCode.ProtoBuffer:
                IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                break;
        }
    }

    public override void Initialize()
    {
        IocManager.RegisterAssemblyByConvention(typeof(JsonRpcClientModule).GetAssembly());
        var dotNettyTransportClientFactory = new DotNettyTransportClientFactory(IocManager.Resolve<ITransportMessageCodecFactory>(), Logger);
        IocManager.IocContainer.Register(
            Component
                .For<ITransportClientFactory>()
                .Instance(dotNettyTransportClientFactory)
        );
    }

    public override void PostInitialize()
    {
        var socketClientConfiguration = Configuration.Modules.RpcClientConfig();
        var transportClientFactory = IocManager.Resolve<ITransportClientFactory>();
        try
        {
            // Pre-connect to every Tcp server listed in the configuration.
            foreach (var clientConnectServerInfo in socketClientConfiguration.ClientConnectServerInfos)
            {
                if (clientConnectServerInfo.ConnectServerType == EConnectServerType.Tcp)
                {
                    var tcpAddress = clientConnectServerInfo.Url.Split(new char[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
                    transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint());
                }
            }
        }
        catch (Exception ex)
        {
            // Pre-connection is best effort; errors are deliberately ignored here.
        }
    }
}
Client invocation
Client call example: var loginResult = Rpc.Call<AuthenticateResultModel>("AuthCenterService.AuthCenterServiceAppService.Authenticate", userId, password);
The implementation is as follows:
    /// <summary>
    /// Calls a JSON-RPC service (the service address must be configured under the JsonRpcServiceUrl key in the Setting section of App.config).
    /// </summary>
    /// <typeparam name="T">Type of the Result object returned by the JSON-RPC method.</typeparam>
    /// <param name="method">JSON-RPC method name.</param>
    /// <param name="args">Arguments accepted by the JSON-RPC method, passed as a variable-length array.</param>
    /// <returns></returns>
    public static T Call<T>(string method, params object[] args)
    {
        var rpcInvoker = GetRpcInvoker(method, null);
        var jresp = rpcInvoker.Invoke<T>(method, args);
        if (jresp.Error != null)
            throw jresp.Error;
        return jresp.Result;
    }

    private static RpcInvoker GetRpcInvoker(string method, RpcOption option)
    {
        var socketClientConfiguration = Dependency.IocManager.Instance.Resolve<ISocketClientConfiguration>();
        // The first segment of the method name is the service name used in the configuration file.
        var methodInfos = method.Split(new char[] { '.' });
        var callConfiguration = socketClientConfiguration.ClientConnectServerInfos.FirstOrDefault(r => r.ServerName == methodInfos[0]);
        RpcInvoker invoker = null;
        if (callConfiguration == null)
        {
            // No entry in the configuration file: treat the service as local.
            invoker = new LocalRpcInvoker();
        }
        else
        {
            switch (callConfiguration.ConnectServerType)
            {
                case EConnectServerType.Tcp:
                    invoker = new TcpRpcInvoker();
                    break;
                case EConnectServerType.Http:
                    invoker = new HttpRpcInvoker();
                    break;
                case EConnectServerType.Local:
                    invoker = new LocalRpcInvoker();
                    break;
                default:
                    // Fail with a clear error instead of a NullReferenceException on the line below.
                    throw new NotSupportedException("Unsupported ConnectServerType: " + callConfiguration.ConnectServerType);
            }
            invoker.ServiceAddress = callConfiguration.Url;
        }
        invoker.Option = option;
        return invoker;
    }
} // closes the enclosing Rpc class, whose declaration is not shown in this excerpt
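The routing rule used above (the first dot-separated segment of the method name selects the configured server, and an unknown server falls back to a local call) can be tried in isolation. The sketch below is a simplified stand-in, not the framework's code: `ConnectKind` and `TransportResolver` are hypothetical names, and a dictionary replaces the XML-backed configuration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical enum; the values mirror the comment in the configuration file.
public enum ConnectKind { Tcp = 1, Http = 2, Local = 3 }

// Hypothetical helper illustrating the lookup rule only.
public static class TransportResolver
{
    public static ConnectKind Resolve(string method, IReadOnlyDictionary<string, ConnectKind> servers)
    {
        if (string.IsNullOrEmpty(method))
            throw new ArgumentException("Method name is required.", nameof(method));

        // "AuthCenterService.AuthCenterServiceAppService.Authenticate" -> "AuthCenterService"
        var dot = method.IndexOf('.');
        var serverName = dot < 0 ? method : method.Substring(0, dot);

        // A service with no configuration entry is assumed to be hosted locally.
        return servers.TryGetValue(serverName, out var kind) ? kind : ConnectKind.Local;
    }
}
```

With a table that maps AuthCenterService to Http and FlowDefineService to Tcp, the call from the example resolves to Http, while a method on an unlisted service resolves to Local.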
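The three invokers handle a call differently: LocalRpcInvoker calls the server-side handler directly, HttpRpcInvoker builds an HTTP request, and TcpRpcInvoker sends the message over a TCP connection. All three have to populate the request header.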
internal class LocalRpcInvoker : RpcInvoker
{
    internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
    {
        // Assign a request id if the caller did not supply one.
        int myId;
        if (jsonRpc.Id == null)
        {
            lock (idLock)
            {
                myId = ++id;
            }
            jsonRpc.Id = myId.ToString();
        }
        var jsonReqStr = JsonConvert.SerializeObject(jsonRpc);

        var contextNameValueCollection = new NameValueCollection();
        Rpc.GlobalContextSet?.Invoke(contextNameValueCollection);
        MergeContextValues(contextNameValueCollection, this.Option);
        var abpSession = Dependency.IocManager.Instance.Resolve<IAbpSession>();
        if (!string.IsNullOrEmpty(abpSession.AccessToken)) // Forward the user's identity to the other microservice
        {
            contextNameValueCollection.Add("AccessToken", "1");
            contextNameValueCollection.Add("UserId", abpSession.UserId?.ToString());
            contextNameValueCollection.Add("UserName", abpSession.UserName);
            contextNameValueCollection.Add("TenantId", abpSession.TenantId?.ToString());
            contextNameValueCollection.Add("RoleIds", abpSession.RoleIds);
        }

        var jsonRespStr = LocalRpcRun(jsonReqStr, contextNameValueCollection);
        JsonResponse<T> rjson = JsonConvert.DeserializeObject<JsonResponse<T>>(jsonRespStr);
        if (rjson == null)
        {
            if (!string.IsNullOrEmpty(jsonRespStr))
            {
                JObject jo = JsonConvert.DeserializeObject(jsonRespStr) as JObject;
                throw new Exception(jo["Error"].ToString());
            }
            else
            {
                throw new Exception("Empty response");
            }
        }
        return rjson;
    }

    // Runs the request through the in-process JSON-RPC handler.
    private static string LocalRpcRun(string jsonReqStr, NameValueCollection contextNameValueCollection)
    {
        return JsonRpcProcessor.Process(jsonReqStr, contextNameValueCollection).Result;
    }
}

internal class HttpRpcInvoker : RpcInvoker
{
    public static bool EnabledGzip = true;

    // Copies the stream into memory and closes the source stream.
    private static Stream CopyAndClose(Stream inputStream)
    {
        const int readSize = 4096;
        byte[] buffer = new byte[readSize];
        MemoryStream ms = new MemoryStream();
        int count = inputStream.Read(buffer, 0, readSize);
        while (count > 0)
        {
            ms.Write(buffer, 0, count);
            count = inputStream.Read(buffer, 0, readSize);
        }
        ms.Position = 0;
        inputStream.Close();
        return ms;
    }

    internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
    {
        HttpWebRequest req = null;
        int myId;
        if (jsonRpc.Id == null)
        {
            lock (idLock)
            {
                myId = ++id;
            }
            jsonRpc.Id = myId.ToString();
        }
        req = WebRequest.Create(new Uri(ServiceAddress + "?callid=" + jsonRpc.Id.ToString() + "&method=" + jsonRpc.Method)) as HttpWebRequest;
        req.KeepAlive = false;
        req.Proxy = null;
        req.Method = "POST";
        req.ContentType = "application/json-rpc";

        if (Rpc.GlobalContextSet != null)
            Rpc.GlobalContextSet(req.Headers);
        var accessToken = Dependency.IocManager.Instance.Resolve<IAbpSession>().AccessToken;
        if (!string.IsNullOrEmpty(accessToken)) // Forward the user's token to the other microservice
        {
            req.Headers.Add("Authorization", accessToken);
        }
        if (!string.IsNullOrEmpty(Rpc.GlobalAccessToken))
        {
            req.Headers.Add("Authorization", "Bearer " + Rpc.GlobalAccessToken);
        }
        MergeContextValues(req.Headers, this.Option);

        if (this.Option != null && this.Option.Timeout > 0)
        {
            req.Timeout = this.Option.Timeout;
        }
        else
        {
            req.Timeout = 400000; // default timeout in milliseconds
        }
        req.ReadWriteTimeout = req.Timeout;
        if (EnabledGzip)
        {
            req.Headers["Accept-Encoding"] = "gzip";
        }

        // Write the serialized request body.
        var stream = new StreamWriter(req.GetRequestStream());
        var json = Newtonsoft.Json.JsonConvert.SerializeObject(jsonRpc);
        stream.Write(json);
        stream.Close();

        // Read the response, decompressing it when the server used gzip.
        var resp = req.GetResponse();
        string sstream;
        string contentEncoding = resp.Headers["Content-Encoding"];
        if (contentEncoding != null && contentEncoding.Contains("gzip"))
        {
            var mstream = CopyAndClose(resp.GetResponseStream());
            using (var gstream = new GZipStream(mstream, CompressionMode.Decompress))
            {
                using (var reader = new StreamReader(gstream, UTF8Encoding))
                {
                    sstream = reader.ReadToEnd();
                }
            }
        }
        else
        {
            using (var rstream = new StreamReader(CopyAndClose(resp.GetResponseStream())))
            {
                sstream = rstream.ReadToEnd();
            }
        }
        resp.Close();

        JsonResponse<T> rjson = Newtonsoft.Json.JsonConvert.DeserializeObject<JsonResponse<T>>(sstream);
        if (rjson == null)
        {
            if (!string.IsNullOrEmpty(sstream))
            {
                JObject jo = Newtonsoft.Json.JsonConvert.DeserializeObject(sstream) as JObject;
                throw new Exception(jo["Error"].ToString());
            }
            else
            {
                throw new Exception("Empty response");
            }
        }
        return rjson;
    }
}

internal class TcpRpcInvoker : RpcInvoker
{
    internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
    {
        int myId;
        if (jsonRpc.Id == null)
        {
            lock (idLock)
            {
                myId = ++id;
            }
            jsonRpc.Id = myId.ToString();
        }
        var jsonReqStr = JsonConvert.SerializeObject(jsonRpc);

        var contextNameValueCollection = new NameValueCollection();
        Rpc.GlobalContextSet?.Invoke(contextNameValueCollection);
        MergeContextValues(contextNameValueCollection, this.Option);
        var abpSession = Dependency.IocManager.Instance.Resolve<IAbpSession>();
        if (!string.IsNullOrEmpty(abpSession.AccessToken)) // Forward the user's identity to the other microservice
        {
            contextNameValueCollection.Add("AccessToken", "1");
            contextNameValueCollection.Add("UserId", abpSession.UserId?.ToString());
            contextNameValueCollection.Add("UserName", abpSession.UserName);
            contextNameValueCollection.Add("TenantId", abpSession.TenantId?.ToString());
            contextNameValueCollection.Add("RoleIds", abpSession.RoleIds);
        }

        var jsonRespStr = LocalRpcRun(jsonReqStr, contextNameValueCollection, this.ServiceAddress);
        JsonResponse<T> rjson = JsonConvert.DeserializeObject<JsonResponse<T>>(jsonRespStr);
        if (rjson == null)
        {
            if (!string.IsNullOrEmpty(jsonRespStr))
            {
                JObject jo = JsonConvert.DeserializeObject(jsonRespStr) as JObject;
                throw new Exception(jo["Error"].ToString());
            }
            else
            {
                throw new Exception("Empty response");
            }
        }
        return rjson;
    }

    // Despite its name, this sends the request to the remote server at serverAddress ("IP:Port") over TCP.
    private static string LocalRpcRun(string jsonReqStr, NameValueCollection contextNameValueCollection, string serverAddress)
    {
        var transportClientFactory = Dependency.IocManager.Instance.Resolve<ITransportClientFactory>();
        var req = JsonConvert.DeserializeObject<JsonRequest>(jsonReqStr);
        var tcpAddress = serverAddress.Split(new char[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
        var result = transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint())
            .SendAsync(req, contextNameValueCollection, Task.Factory.CancellationToken).Result;
        return JsonConvert.SerializeObject(result);
    }
}
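The local and TCP invokers build the same user-context entries (AccessToken, UserId, UserName, TenantId, RoleIds) before sending a request. One possible way to keep the two paths in step is to move that logic into a single helper. The sketch below is only an illustration under stated assumptions: `SessionInfo` is a hypothetical stand-in for the session object, its property types are guesses, and `RpcContextBuilder` does not exist in the original code.

```csharp
using System.Collections.Specialized;

// Hypothetical stand-in for the session fields read in the invokers above.
// The property types (long?, int?) are assumptions made for this sketch.
public sealed class SessionInfo
{
    public string AccessToken { get; set; }
    public long? UserId { get; set; }
    public string UserName { get; set; }
    public int? TenantId { get; set; }
    public string RoleIds { get; set; }
}

// Hypothetical helper: builds the context collection that accompanies a request.
public static class RpcContextBuilder
{
    public static NameValueCollection Build(SessionInfo session)
    {
        var context = new NameValueCollection();

        // Without a token there is no user identity to forward.
        if (session == null || string.IsNullOrEmpty(session.AccessToken))
            return context;

        // Same keys and values as in the invokers above; "1" only flags that a token exists.
        context.Add("AccessToken", "1");
        context.Add("UserId", session.UserId?.ToString());
        context.Add("UserName", session.UserName);
        context.Add("TenantId", session.TenantId?.ToString());
        context.Add("RoleIds", session.RoleIds);
        return context;
    }
}
```

Each invoker would then call the helper once and merge in its own option values, so a new context key only has to be added in one place.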