企业级工作流解决方案--微服务Tcp消息传输模型之客户端处理
Posted spritekuang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了企业级工作流解决方案--微服务Tcp消息传输模型之客户端处理相关的知识,希望对你有一定的参考价值。
客户端启动
客户端启动主要做三件事情:1. 从配置文件读取服务调用配置,存储到全局对象中;2. 指定客户端编解码器工厂;3. 预连接,即预先建立与服务端的通信Channel。
/// <summary>
/// ABP module that bootstraps the RPC client: it loads the client configuration,
/// registers the transport message codec factory selected by that configuration,
/// registers the DotNetty transport client factory, and finally pre-connects to
/// the configured TCP servers.
/// </summary>
[DependsOn(typeof(AbpKernelModule))]
public class JsonRpcClientModule : AbpModule
{
    /// <summary>
    /// Reads <c>SocketClientConfiguration.xml</c>, registers the resulting object as the
    /// <c>ISocketClientConfiguration</c> instance, and registers the codec factory that
    /// matches the configured <c>MessageCode</c>.
    /// </summary>
    public override void PreInitialize()
    {
        // The client configuration is always read from this fixed XML file.
        SocketClientConfiguration socketClientConfiguration =
            XmlConfigProvider.GetConfig<SocketClientConfiguration>("SocketClientConfiguration.xml");

        IocManager.IocContainer.Register(
            Component
                .For<ISocketClientConfiguration>()
                .Instance(socketClientConfiguration)
        );

        // NOTE(review): there is no default branch, so an unrecognised MessageCode registers
        // no codec factory here; Initialize() then relies on one having been registered elsewhere.
        switch (socketClientConfiguration.MessageCode)
        {
            case EMessageCode.Json:
                IocManager.RegisterIfNot<ITransportMessageCodecFactory, JsonTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                break;
            case EMessageCode.MessagePack:
                IocManager.RegisterIfNot<ITransportMessageCodecFactory, MessagePackTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                break;
            case EMessageCode.ProtoBuffer:
                IocManager.RegisterIfNot<ITransportMessageCodecFactory, ProtoBufferTransportMessageCodecFactory>(Dependency.DependencyLifeStyle.Singleton);
                break;
        }
    }

    /// <summary>
    /// Registers this assembly's conventional services and a single
    /// <c>DotNettyTransportClientFactory</c> instance as <c>ITransportClientFactory</c>.
    /// </summary>
    public override void Initialize()
    {
        IocManager.RegisterAssemblyByConvention(typeof(JsonRpcClientModule).GetAssembly());

        var dotNettyTransportClientFactory = new DotNettyTransportClientFactory(
            IocManager.Resolve<ITransportMessageCodecFactory>(),
            Logger);

        IocManager.IocContainer.Register(
            Component
                .For<ITransportClientFactory>()
                .Instance(dotNettyTransportClientFactory)
        );
    }

    /// <summary>
    /// Pre-connects to every configured TCP server. Pre-connection is best effort:
    /// a failure for one server is logged and does not prevent the remaining servers
    /// from being tried, nor does it fail module start-up.
    /// </summary>
    public override void PostInitialize()
    {
        var socketClientConfiguration = Configuration.Modules.RpcClientConfig();
        var transportClientFactory = IocManager.Resolve<ITransportClientFactory>();

        var serverInfos = socketClientConfiguration.ClientConnectServerInfos;
        if (serverInfos == null)
        {
            // Nothing configured, nothing to pre-connect.
            return;
        }

        foreach (var clientConnectServerInfo in serverInfos)
        {
            if (clientConnectServerInfo.ConnectServerType != EConnectServerType.Tcp)
            {
                continue;
            }

            // The try/catch lives inside the loop so that one bad or unreachable
            // address cannot abort pre-connecting the servers that follow it.
            try
            {
                // Url is expected in "host:port" form.
                var tcpAddress = clientConnectServerInfo.Url.Split(new[] { ':' }, StringSplitOptions.RemoveEmptyEntries);
                var endPoint = new IpAddressModel(tcpAddress[0], int.Parse(tcpAddress[1])).CreateEndPoint();
                transportClientFactory.CreateClient(endPoint);
            }
            catch (Exception ex)
            {
                // Still non-fatal, but leave a trace instead of swallowing silently.
                Logger.Warn($"预连接服务端 {clientConnectServerInfo.Url} 失败。", ex);
            }
        }
    }
}
客户端全局Channel设计
每一个服务连接创建一个TransportClient与之对应,存储在全局变量中:private readonly ConcurrentDictionary<EndPoint, Lazy<ITransportClient>> _clients = new ConcurrentDictionary<EndPoint, Lazy<ITransportClient>>();
TransportClient即处理客户端传输消息的对象。每当发起客户端调用时,如果_clients集合中还没有对应服务端的transportClient对象,则创建该对象并存储到_clients集合中;下次对同一个服务端调用时,直接复用此对象。如果与服务器的通信Channel断开,则从_clients集合中移除该对象,从而达到复用Channel的目的。
/// <summary>
/// Returns the transport client for <paramref name="endPoint"/>, connecting and
/// creating it on first use and reusing the cached instance afterwards.
/// </summary>
/// <param name="endPoint">The server endpoint to connect to; also used as the cache key.</param>
/// <returns>The transport client bound to that endpoint.</returns>
/// <remarks>
/// Any exception raised while connecting or building the client is rethrown after the
/// cache entry for the endpoint has been removed.
/// </remarks>
public ITransportClient CreateClient(EndPoint endPoint)
{
    var key = endPoint;
    _logger.Debug($"准备为服务端地址:{key}创建客户端。");
    try
    {
        return _clients.GetOrAdd(key, k => new Lazy<ITransportClient>(() =>
        {
            var bootstrap = _bootstrap;

            // Blocks until the connection is established; a Lazy factory cannot be awaited.
            var channel = bootstrap.ConnectAsync(k).Result;

            var messageListener = new MessageListener();
            channel.GetAttribute(messageListenerKey).Set(messageListener);

            var messageSender = new DotNettyMessageClientSender(_transportMessageEncoder, channel);
            channel.GetAttribute(messageSenderKey).Set(messageSender);

            // Remember which cache key this channel belongs to so the channel
            // handler can evict the entry when the channel goes inactive.
            channel.GetAttribute(origEndPointKey).Set(k);

            return new TransportClient(messageSender, messageListener, _logger);
        })).Value;
    }
    catch
    {
        // A Lazy<T> in its default mode caches the exception thrown by its factory,
        // so the failed entry must be dropped or every later call would rethrow it.
        _clients.TryRemove(key, out _);
        throw;
    }
}

/// <summary>
/// Channel handler that evicts the cached client when its channel becomes inactive.
/// </summary>
/// <remarks>
/// NOTE(review): <c>_factory</c> is not declared in this excerpt; it is presumably a
/// field holding the owning factory — confirm against the full source.
/// </remarks>
protected class DefaultChannelHandler : ChannelHandlerAdapter
{
    /// <summary>
    /// Removes the cache entry keyed by the endpoint stored on the channel, if any.
    /// </summary>
    /// <param name="context">Context of the channel that became inactive.</param>
    public override void ChannelInactive(IChannelHandlerContext context)
    {
        var endPoint = context.Channel.GetAttribute(origEndPointKey).Get();
        if (endPoint == null)
        {
            // The attribute is only set after ConnectAsync returns; presumably it can still
            // be unset here. ConcurrentDictionary.TryRemove throws on a null key, so bail out.
            return;
        }

        _factory._clients.TryRemove(endPoint, out _);
    }
}
TransportClient
默认的客户端传输实现,Rpc调用时,直接组装请求参数,调用SendAsync方法。注意里面的ManualResetValueTaskSource的设计。
/// <summary>
/// Default transport client: sends an invoke message through the message sender and
/// completes the call when the message listener delivers the response with the same id.
/// </summary>
public class TransportClient : ITransportClient, IDisposable
{
    #region Field

    private readonly IMessageSender _messageSender;
    private readonly IMessageListener _messageListener;
    private readonly ILogger _logger;

    // Pending calls keyed by message id; each entry is completed by MessageListener_Received.
    private readonly ConcurrentDictionary<string, ManualResetValueTaskSource<TransportMessage>> _resultDictionary =
        new ConcurrentDictionary<string, ManualResetValueTaskSource<TransportMessage>>();

    #endregion Field

    #region Constructor

    /// <summary>
    /// Creates the client and subscribes to the listener's <c>Received</c> event.
    /// </summary>
    /// <param name="messageSender">Sender used to transmit outgoing messages.</param>
    /// <param name="messageListener">Listener that raises <c>Received</c> for incoming messages.</param>
    /// <param name="logger">Logger for diagnostic output.</param>
    public TransportClient(IMessageSender messageSender, IMessageListener messageListener, ILogger logger)
    {
        _messageSender = messageSender;
        _messageListener = messageListener;
        _logger = logger;
        messageListener.Received += MessageListener_Received;
    }

    #endregion Constructor

    #region Implementation of ITransportClient

    /// <summary>
    /// Sends a request and waits for the matching response.
    /// </summary>
    /// <param name="message">The request to send.</param>
    /// <param name="contextNameValueCollection">Context values passed along when building the transport message.</param>
    /// <param name="cancellationToken">Token used while waiting for the response.</param>
    /// <returns>The response content of the matching result message.</returns>
    /// <exception cref="CommunicationException">Sending the message failed.</exception>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public async Task<JsonResponse> SendAsync(JsonRequest message, NameValueCollection contextNameValueCollection, CancellationToken cancellationToken)
    {
        try
        {
            _logger.Debug("准备发送消息。");

            var transportMessage = TransportMessage.CreateInvokeMessage(message, contextNameValueCollection);

            // Register the callback before sending so a fast response cannot be missed.
            var callbackTask = RegisterResultCallbackAsync(transportMessage.Id, cancellationToken);

            try
            {
                await _messageSender.SendAndFlushAsync(transportMessage);
            }
            catch (Exception exception)
            {
                // No response will ever arrive for a message that was not sent; release
                // the pending callback so it does not stay in the dictionary forever.
                CancelPendingCallback(transportMessage.Id);
                throw new CommunicationException("与服务端通讯时发生了异常。", exception);
            }

            _logger.Debug("消息发送成功。");

            return await callbackTask;
        }
        catch (Exception exception)
        {
            // Include the exception so the cause is visible in the log.
            _logger.Error("消息发送失败。", exception);
            throw;
        }
    }

    #endregion Implementation of ITransportClient

    #region Implementation of IDisposable

    /// <summary>
    /// Unsubscribes from the listener, disposes the sender and listener when they are
    /// disposable, and cancels every call that is still waiting for a response.
    /// </summary>
    public void Dispose()
    {
        // Detach first so the listener no longer references this instance.
        _messageListener.Received -= MessageListener_Received;

        (_messageSender as IDisposable)?.Dispose();
        (_messageListener as IDisposable)?.Dispose();

        foreach (var pendingCall in _resultDictionary.Values)
        {
            pendingCall.SetCanceled();
        }
    }

    #endregion Implementation of IDisposable

    #region Private Method

    /// <summary>
    /// Registers a pending callback for the given message id and waits for its result.
    /// </summary>
    /// <param name="id">The message id.</param>
    /// <param name="cancellationToken">Token passed to the wait on the pending callback.</param>
    /// <returns>The response content of the result message.</returns>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private async Task<JsonResponse> RegisterResultCallbackAsync(string id, CancellationToken cancellationToken)
    {
        _logger.Debug($"准备获取Id为:{id}的响应内容。");

        var task = new ManualResetValueTaskSource<TransportMessage>();
        _resultDictionary.TryAdd(id, task);

        try
        {
            var result = await task.AwaitValue(cancellationToken);
            return result.GetContent<JsonResponse>();
        }
        finally
        {
            // Remove the callback. The entry may already be gone (e.g. removed after a
            // failed send), in which case there is nothing left to cancel.
            CancelPendingCallback(id);
        }
    }

    /// <summary>
    /// Removes the pending callback for <paramref name="id"/> and cancels it; does nothing
    /// when no entry exists for that id.
    /// </summary>
    /// <param name="id">The message id.</param>
    private void CancelPendingCallback(string id)
    {
        if (_resultDictionary.TryRemove(id, out var pending))
        {
            pending.SetCanceled();
        }
    }

    /// <summary>
    /// Handles an incoming message: completes the pending callback with the same id,
    /// either with the message or with the error carried in its content.
    /// </summary>
    /// <param name="sender">The sender associated with the incoming message.</param>
    /// <param name="message">The received transport message.</param>
    private async Task MessageListener_Received(IMessageSender sender, TransportMessage message)
    {
        _logger.Debug("服务消费者接收到消息。");

        if (!_resultDictionary.TryGetValue(message.Id, out var task))
        {
            // Nobody is waiting for this id.
            return;
        }

        if (!message.IsInvokeResultMessage())
        {
            return;
        }

        var content = message.GetContent<JsonResponse>();
        if (content.Error != null)
        {
            task.SetException(content.Error);
        }
        else
        {
            task.SetResult(message);
        }
    }

    #endregion Private Method
}
以上是关于企业级工作流解决方案--微服务Tcp消息传输模型之客户端处理的主要内容,如果未能解决你的问题,请参考以下文章