企业级工作流解决方案--微服务消息处理模型之客户端端处理

Posted spritekuang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了企业级工作流解决方案--微服务消息处理模型之客户端端处理相关的知识,希望对你有一定的参考价值。

  微服务的服务端已经启动起来了,服务消费者怎么知道服务在哪个地方,通过什么方式调用呢,分布式如何选择正确的服务器调用服务?

  这个就涉及到服务发现、服务健康检查的问题了,很多微服务架构的做法都是通过消息队列来实现的,消息队列天生就支持发布订阅功能,服务有变化之后,发布通知,每个消费者更新状态,还涉及到更新服务的metadata信息,同时还涉及到服务健康检查等等一系列功能,其实这些地方是非常容易出问题的地方,但是对于规模流量不是特别巨大的企业,这部分职责可以进行转移,服务的发现就直接通过配置文件实现,服务的寻址和健康检查就通过nginx来实现。等企业足够强大的时候,再来补全这部分内容,但对于微服务Rpc调用过程来说,没有任何影响。

客户端配置

  客户端配置文件,格式如下:

<!--MessageCode(Json = 1,MessagePack = 2,ProtoBuffer = 3)-->
<SocketClientConfiguration MessageCode="MessagePack">
  <ClientConnectServerInfos>
    <!--Url(Http连接服务方式为Url地址,Tcp连接方式为:IP:Port)、ConnectServerType(Tcp = 1,Http = 2,Local = 3)、ServerName(服务名称)-->
    <!--<ClientConnectServerInfo ServerName="AuthCenterService" ConnectServerType="Tcp" Url="127.0.0.1:1314"></ClientConnectServerInfo>-->
    <ClientConnectServerInfo ServerName="AuthCenterService" ConnectServerType="Http" Url="http://localhost:9527/json.rpc"></ClientConnectServerInfo>
    <ClientConnectServerInfo ServerName="FlowDefineService" ConnectServerType="Tcp" Url="127.0.0.1:2019"></ClientConnectServerInfo>
    <ClientConnectServerInfo ServerName="WorkflowRuntimeService" ConnectServerType="Tcp" Url="127.0.0.1:2019"></ClientConnectServerInfo>
  </ClientConnectServerInfos>
</SocketClientConfiguration>

  ServerName即服务名称,与之前文章介绍的服务名称必须一致,ConnectServerType即传输方式,如果传输方式为Local的,可以不在配置文件里面配置,系统自动在本地服务中查找服务调用,Url为服务地址,Tcp方式为”IP地址:端口”,客户端启动的时候,会读取配置文件,存储到全局配置里面,代码如下:

[DependsOn(typeof(AbpKernelModule))]
    public class JsonRpcClientModule : AbpModule
    {
        public override void PreInitialize()
        {
            // 注册客户端配置,固定从Xml文件读取
            SocketClientConfiguration socketClientConfiguration = XmlConfigProvider.GetConfig<SocketClientConfiguration>("SocketClientConfiguration.xml");
            IocManager.IocContainer.Register(
                Component
                    .For<ISocketClientConfiguration>()
                    .Instance(socketClientConfiguration)
            );
            switch (socketClientConfiguration.MessageCode)
            {
                case EMessageCode.Json:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                case EMessageCode.MessagePack:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
                case EMessageCode.ProtoBuffer:
                    IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                    break;
            }
        }

        public override void Initialize()
        {
            IocManager.RegisterAssemblyByConvention(typeof(JsonRpcClientModule).GetAssembly());
            var dotNettyTransportClientFactory = new DotNettyTransportClientFactory(IocManager.Resolve<ITransportMessageCodecFactory>(), Logger);

            IocManager.IocContainer.Register(
                Component
                    .For<ITransportClientFactory>()
                    .Instance(dotNettyTransportClientFactory)
            );
        }

        public override void PostInitialize()
        {
            var socketClientConfiguration = Configuration.Modules.RpcClientConfig();
            var transportClientFactory = IocManager.Resolve<ITransportClientFactory>();
            try
            {
                foreach (var clientConnectServerInfo in socketClientConfiguration.ClientConnectServerInfos) // 预连接
                {
                    if (clientConnectServerInfo.ConnectServerType == EConnectServerType.Tcp)
                    {
                        var tcpAddress = clientConnectServerInfo.Url.Split(new char[] { : }, StringSplitOptions.RemoveEmptyEntries);
                        transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint());
                    }
                }
            }
            catch(Exception ex) // 预连,出错不处理
            {

            }
        }
    }

客户端调用

  客户端调用示例:var loginResult = Rpc.Call<AuthenticateResultModel>("AuthCenterService.AuthCenterServiceAppService.Authenticate", userId, password);

  代码实现如下:

/// <summary>
        /// 调用JSON-RPC服务(服务地址需要配置在App.config中的Setting下的JsonRpcServiceUrl的Key值)。
        /// </summary>
        /// <typeparam name="T">JSON-RPC方法返回的Result的对象类型。</typeparam>
        /// <param name="method">JSON-RPC方法名。</param>
        /// <param name="args">JSON-RPC方法接收的参数,此参数为可变数组</param>
        /// <returns></returns>
        public static T Call<T>(string method, params object[] args)
        {
            var rpcInvoker = GetRpcInvoker(method, null);
            var jresp = rpcInvoker.Invoke<T>(method, args);
            if (jresp.Error != null)
                throw jresp.Error;

            return jresp.Result;
        }

private static RpcInvoker GetRpcInvoker(string method, RpcOption option)
        {
            var socketClientConfiguration = Dependency.IocManager.Instance.Resolve<ISocketClientConfiguration>();
            var methodInfos = method.Split(new char[] { . });
            var callConfiguration = socketClientConfiguration.ClientConnectServerInfos.FirstOrDefault(r => r.ServerName == methodInfos[0]);

            RpcInvoker invoker = null;
            if(callConfiguration == null)
            {
                invoker = new LocalRpcInvoker();
            }
            else
            {
                switch (callConfiguration.ConnectServerType)
                {
                    case EConnectServerType.Tcp:
                        invoker = new TcpRpcInvoker();
                        break;
                    case EConnectServerType.Http:
                        invoker = new HttpRpcInvoker();
                        break;
                    case EConnectServerType.Local:
                        invoker = new LocalRpcInvoker();
                        break;
                    default:
                        break;
                }
                invoker.ServiceAddress = callConfiguration.Url;
            }

            invoker.Option = option;
            return invoker;
        }
    }

  三种传输方式的Invoker处理逻辑有所不同,LocalInvoker直接调用服务端处理程序处理,HttpInvoker则构造Http请求,TcpInvoker则发起Tcp调用过程传输消息,三种过程都需要处理请求Header。

internal class LocalRpcInvoker : RpcInvoker
    {
        internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
        {
            int myId;

            if (jsonRpc.Id == null)
            {
                lock (idLock)
                {
                    myId = ++id;
                }

                jsonRpc.Id = myId.ToString();
            }

            var jsonReqStr = JsonConvert.SerializeObject(jsonRpc);
            var contextNameValueCollection = new NameValueCollection();
            Rpc.GlobalContextSet?.Invoke(contextNameValueCollection);
            MergeContextValues(contextNameValueCollection, this.Option);

            var abpSession = Dependency.IocManager.Instance.Resolve<IAbpSession>();
            if (!string.IsNullOrEmpty(abpSession.AccessToken)) // 将用户Token转递到其他微服务
            {
                contextNameValueCollection.Add("AccessToken", "1");
                contextNameValueCollection.Add("UserId", abpSession.UserId?.ToString());
                contextNameValueCollection.Add("UserName", abpSession.UserName);
                contextNameValueCollection.Add("TenantId", abpSession.TenantId?.ToString());
                contextNameValueCollection.Add("RoleIds", abpSession.RoleIds);

            }
            var jsonRespStr = LocalRpcRun(jsonReqStr, contextNameValueCollection);
            JsonResponse<T> rjson = JsonConvert.DeserializeObject<JsonResponse<T>>(jsonRespStr);

            if (rjson == null)
            {
                if (!string.IsNullOrEmpty(jsonRespStr))
                {
                    JObject jo = JsonConvert.DeserializeObject(jsonRespStr) as JObject;
                    throw new Exception(jo["Error"].ToString());
                }
                else
                {
                    throw new Exception("Empty response");
                }
            }

            return rjson;
        }

        private static string LocalRpcRun(string jsonReqStr, NameValueCollection contextNameValueCollection)
        {
            return JsonRpcProcessor.Process(jsonReqStr, contextNameValueCollection).Result;
        }
    }

internal class HttpRpcInvoker : RpcInvoker
    {

        public static bool EnabledGzip = true;

        private static Stream CopyAndClose(Stream inputStream)
        {
            const int readSize = 4096;
            byte[] buffer = new byte[readSize];
            MemoryStream ms = new MemoryStream();

            int count = inputStream.Read(buffer, 0, readSize);
            while (count > 0)
            {
                ms.Write(buffer, 0, count);
                count = inputStream.Read(buffer, 0, readSize);
            }
            ms.Position = 0;
            inputStream.Close();
            return ms;
        }


        internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
        {
            HttpWebRequest req = null;

            int myId;

            if (jsonRpc.Id == null)
            {
                lock (idLock)
                {
                    myId = ++id;
                }

                jsonRpc.Id = myId.ToString();
            }

            req = WebRequest.Create(new Uri(ServiceAddress + "?callid=" + jsonRpc.Id.ToString() + "&method=" + jsonRpc.Method)) as HttpWebRequest;
            req.KeepAlive = false;
            req.Proxy = null;
            req.Method = "Post";
            req.ContentType = "application/json-rpc";
            if (Rpc.GlobalContextSet != null)
                Rpc.GlobalContextSet(req.Headers);

            var accessToken = Dependency.IocManager.Instance.Resolve<IAbpSession>().AccessToken;
            if(!string.IsNullOrEmpty(accessToken)) // 将用户Token转递到其他微服务
            {
                req.Headers.Add("Authorization", accessToken);

            }
            if(!string.IsNullOrEmpty(Rpc.GlobalAccessToken))
            {
                req.Headers.Add("Authorization", "Bearer " + Rpc.GlobalAccessToken);
            }
            MergeContextValues(req.Headers, this.Option);
            if (this.Option != null && this.Option.Timeout > 0)
            {
                req.Timeout = this.Option.Timeout;
            }
            else
            {
                req.Timeout = 400000;
            }
            req.ReadWriteTimeout = req.Timeout;
            if (EnabledGzip)
            {
                req.Headers["Accept-Encoding"] = "gzip";
            }

            var stream = new StreamWriter(req.GetRequestStream());
            var json = Newtonsoft.Json.JsonConvert.SerializeObject(jsonRpc);
            stream.Write(json);
            stream.Close();

            var resp = req.GetResponse();
            string sstream;

            string contentEncoding = resp.Headers["Content-Encoding"];

            if (contentEncoding != null && contentEncoding.Contains("gzip"))
            {
                var mstream = CopyAndClose(resp.GetResponseStream());

                using (var gstream = new GZipStream(mstream, CompressionMode.Decompress))
                {
                    using (var reader = new StreamReader(gstream, UTF8Encoding))
                    {
                        sstream = reader.ReadToEnd();
                    }
                }
            }
            else
            {
                using (var rstream = new StreamReader(CopyAndClose(resp.GetResponseStream())))
                {
                    sstream = rstream.ReadToEnd();
                }
            }
            resp.Close();
            JsonResponse<T> rjson = Newtonsoft.Json.JsonConvert.DeserializeObject<JsonResponse<T>>(sstream);

            if (rjson == null)
            {
                if (!string.IsNullOrEmpty(sstream))
                {
                    JObject jo = Newtonsoft.Json.JsonConvert.DeserializeObject(sstream) as JObject;
                    throw new Exception(jo["Error"].ToString());
                }
                else
                {
                    throw new Exception("Empty response");
                }
            }

            return rjson;
        }

    }

internal class TcpRpcInvoker : RpcInvoker
    {
        internal override JsonResponse<T> Invoke<T>(JsonRequest jsonRpc)
        {
            int myId;

            if (jsonRpc.Id == null)
            {
                lock (idLock)
                {
                    myId = ++id;
                }

                jsonRpc.Id = myId.ToString();
            }

            var jsonReqStr = JsonConvert.SerializeObject(jsonRpc);
            var contextNameValueCollection = new NameValueCollection();
            Rpc.GlobalContextSet?.Invoke(contextNameValueCollection);
            MergeContextValues(contextNameValueCollection, this.Option);

            var abpSession = Dependency.IocManager.Instance.Resolve<IAbpSession>();
            if (!string.IsNullOrEmpty(abpSession.AccessToken)) // 将用户Token转递到其他微服务
            {
                contextNameValueCollection.Add("AccessToken", "1");
                contextNameValueCollection.Add("UserId", abpSession.UserId?.ToString());
                contextNameValueCollection.Add("UserName", abpSession.UserName);
                contextNameValueCollection.Add("TenantId", abpSession.TenantId?.ToString());
                contextNameValueCollection.Add("RoleIds", abpSession.RoleIds);

            }

            var jsonRespStr = LocalRpcRun(jsonReqStr, contextNameValueCollection,this.ServiceAddress);
            JsonResponse<T> rjson = JsonConvert.DeserializeObject<JsonResponse<T>>(jsonRespStr);

            if (rjson == null)
            {
                if (!string.IsNullOrEmpty(jsonRespStr))
                {
                    JObject jo = JsonConvert.DeserializeObject(jsonRespStr) as JObject;
                    throw new Exception(jo["Error"].ToString());
                }
                else
                {
                    throw new Exception("Empty response");
                }
            }

            return rjson;
        }

        private static string LocalRpcRun(string jsonReqStr, NameValueCollection contextNameValueCollection, string serverAddress)
        {
            var transportClientFactory = Dependency.IocManager.Instance.Resolve<ITransportClientFactory>();
            var req = JsonConvert.DeserializeObject<JsonRequest>(jsonReqStr);
            var tcpAddress = serverAddress.Split(new char[] { : }, StringSplitOptions.RemoveEmptyEntries);
            var result = transportClientFactory.CreateClient(new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint())
          .SendAsync(req, contextNameValueCollection, Task.Factory.CancellationToken).Result;
return JsonConvert.SerializeObject(result); } }

 

以上是关于企业级工作流解决方案--微服务消息处理模型之客户端端处理的主要内容,如果未能解决你的问题,请参考以下文章

企业级工作流解决方案--微服务消息处理模型之消息传输通道

企业级工作流解决方案--微服务Tcp消息传输模型之服务端处理

企业级工作流解决方案--微服务消息处理模型之服务端处理

企业级工作流解决方案--微服务消息处理模型之与Abp集成

企业级工作流解决方案--微服务Tcp消息传输模型之消息编解码

微服务架构模式系列文章之二:微服务架构