eShopOnContainers Learning Series: RabbitMQ Event Bus in Practice
Posted weiblog
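Today I looked at how eShopOnContainers uses RabbitMQ. In the project it is wrapped as an event bus, but the wrapper is still built on two basic methods, publish and subscribe, so let's try them out in practice.
To keep things simple, publishing and subscribing are both implemented in the same API project.
Create an API project named RabbitMQ_Bus_Test and two class libraries, EventBus and EventBusRabbitMQ. These two libraries implement the core event bus methods: publish and subscribe.
In EventBus, add the message event class IntegrationEvent. It serves as the base class for messages: every message class must inherit from it, and you can adapt it as needed in a real project.
    using System;
    using Newtonsoft.Json;

    public class IntegrationEvent
    {
        public IntegrationEvent()
        {
            Id = Guid.NewGuid();
            CreationDate = DateTime.UtcNow;
        }

        [JsonConstructor]
        public IntegrationEvent(Guid id, DateTime createDate)
        {
            Id = id;
            CreationDate = createDate;
        }

        [JsonProperty]
        public Guid Id { get; private set; }

        [JsonProperty]
        public DateTime CreationDate { get; private set; }
    }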
Add the generic interface IIntegrationEventHandler.cs. Messages sent through the queue need a handler, and this interface acts as the constraint: every handler must implement it:
    public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
        where TIntegrationEvent : IntegrationEvent
    {
        Task Handle(TIntegrationEvent @event);
    }

    public interface IIntegrationEventHandler
    {
    }
Add the subscription management interface IEventBusSubscriptionsManager. As the name suggests, it manages message handlers. The method to focus on is AddSubscription, which binds a message class to a handler, i.e. decides which handler consumes which message. Two constraints apply: the message class must inherit from IntegrationEvent, and the handler must implement IIntegrationEventHandler<T>:
    public interface IEventBusSubscriptionsManager
    {
        bool IsEmpty { get; }
        event EventHandler<string> OnEventRemoved;

        void AddDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        void AddSubscription<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>;

        void RemoveSubscription<T, TH>()
            where TH : IIntegrationEventHandler<T>
            where T : IntegrationEvent;

        void RemoveDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
        bool HasSubscriptionsForEvent(string eventName);
        Type GetEventTypeByName(string eventName);
        void Clear();
        IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
        IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
        string GetEventKey<T>();
    }
Next add its implementation, InMemoryEventBusSubscriptionsManager.cs. As the name indicates, subscriptions are managed in memory; in a production project we could store them in Redis instead:
    using System;
    using System.Collections.Generic;
    using System.Linq;

    public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
    {
        // event name -> handlers registered for that event
        private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
        private readonly List<Type> _eventTypes;

        public event EventHandler<string> OnEventRemoved;

        public InMemoryEventBusSubscriptionsManager()
        {
            _handlers = new Dictionary<string, List<SubscriptionInfo>>();
            _eventTypes = new List<Type>();
        }

        public bool IsEmpty => !_handlers.Keys.Any();

        public void Clear() => _handlers.Clear();

        public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);

        public void AddDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            DoAddSubscription(typeof(TH), eventName, isDynamic: true);
        }

        public void AddSubscription<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = GetEventKey<T>();
            DoAddSubscription(typeof(TH), eventName, isDynamic: false);
            if (!_eventTypes.Contains(typeof(T)))
            {
                _eventTypes.Add(typeof(T));
            }
        }

        public void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
        {
            if (!HasSubscriptionsForEvent(eventName))
            {
                _handlers.Add(eventName, new List<SubscriptionInfo>());
            }

            // The same handler type may be registered only once per event.
            if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
            {
                throw new ArgumentException(
                    $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
            }

            if (isDynamic)
            {
                _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
            }
            else
            {
                _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
            }
        }

        public void RemoveDynamicSubscription<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
            // Actually remove the handler that was found; without this call the method has no effect.
            DoRemoveHandler(eventName, handlerToRemove);
        }

        private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            return DoFindSubscriptionToRemove(eventName, typeof(TH));
        }

        private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType)
        {
            if (!HasSubscriptionsForEvent(eventName))
            {
                return null;
            }
            return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType);
        }

        public void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove)
        {
            if (subsToRemove != null)
            {
                _handlers[eventName].Remove(subsToRemove);
                if (!_handlers[eventName].Any())
                {
                    // Last handler gone: forget the event and notify listeners.
                    _handlers.Remove(eventName);
                    var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
                    if (eventType != null)
                    {
                        _eventTypes.Remove(eventType);
                    }
                    RaiseOnEventRemoved(eventName);
                }
            }
        }

        public void RaiseOnEventRemoved(string eventName)
        {
            var handler = OnEventRemoved;
            handler?.Invoke(this, eventName);
        }

        public void RemoveSubscription<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var handlerToRemove = FindSubscriptionToRemove<T, TH>();
            var eventName = GetEventKey<T>();
            DoRemoveHandler(eventName, handlerToRemove);
        }

        private SubscriptionInfo FindSubscriptionToRemove<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = GetEventKey<T>();
            return DoFindSubscriptionToRemove(eventName, typeof(TH));
        }

        public Type GetEventTypeByName(string eventName) =>
            _eventTypes.SingleOrDefault(t => t.Name == eventName);

        public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent
        {
            var key = GetEventKey<T>();
            return GetHandlersForEvent(key);
        }

        public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName];

        public string GetEventKey<T>()
        {
            return typeof(T).Name;
        }

        public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
        {
            var key = GetEventKey<T>();
            return HasSubscriptionsForEvent(key);
        }
    }
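This implementation contains many subscription-handling methods; two things matter for our purposes. First, a Dictionary<string, List<SubscriptionInfo>> stores the mapping from message class name to handler classes. Second, the SubscriptionInfo class holds the subscription details in two properties: whether the subscription is dynamic, and the handler type:
    public class SubscriptionInfo
    {
        public bool IsDynamic { get; }
        public Type HandlerType { get; }

        private SubscriptionInfo(bool isDynamic, Type handlerType)
        {
            IsDynamic = isDynamic;
            HandlerType = handlerType;
        }

        public static SubscriptionInfo Dynamic(Type handlerType)
        {
            return new SubscriptionInfo(true, handlerType);
        }

        public static SubscriptionInfo Typed(Type handlerType)
        {
            return new SubscriptionInfo(false, handlerType);
        }
    }
To summarize the flow: when the program first loads, each message class is bound to its handler class; when a message is sent, the handler class is looked up in the dictionary by the message class name and invoked through reflection.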
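The flow just described (bind at startup, look up by name, invoke through reflection) can be sketched in a few lines. This is a minimal, self-contained illustration and not the eShopOnContainers code: DemoEvent, IDemoHandler, DemoEventHandler and MiniBus are hypothetical names, and handlers are created with Activator.CreateInstance instead of being resolved from a container.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for the article's types, reduced to the bare minimum.
public class DemoEvent { public int OrderId { get; set; } }

public interface IDemoHandler<in TEvent> { Task Handle(TEvent @event); }

public class DemoEventHandler : IDemoHandler<DemoEvent>
{
    public static int LastOrderId; // records the last handled order for inspection

    public Task Handle(DemoEvent @event)
    {
        LastOrderId = @event.OrderId;
        return Task.CompletedTask;
    }
}

public static class MiniBus
{
    // event name -> handler types, filled once at startup
    private static readonly Dictionary<string, List<Type>> Handlers =
        new Dictionary<string, List<Type>>();

    public static void Subscribe<T, TH>() where TH : IDemoHandler<T>
    {
        var key = typeof(T).Name;
        if (!Handlers.TryGetValue(key, out var list))
        {
            list = new List<Type>();
            Handlers[key] = list;
        }
        list.Add(typeof(TH));
    }

    // Returns how many handlers were invoked for the event.
    public static async Task<int> Dispatch(object @event)
    {
        var eventType = @event.GetType();
        if (!Handlers.TryGetValue(eventType.Name, out var handlerTypes)) return 0;

        var invoked = 0;
        foreach (var handlerType in handlerTypes)
        {
            var handler = Activator.CreateInstance(handlerType);
            // Close the open generic interface over the event type, then call Handle by name.
            var closedInterface = typeof(IDemoHandler<>).MakeGenericType(eventType);
            await (Task)closedInterface.GetMethod("Handle").Invoke(handler, new[] { @event });
            invoked++;
        }
        return invoked;
    }
}

public static class MiniBusProgram
{
    public static void Main()
    {
        MiniBus.Subscribe<DemoEvent, DemoEventHandler>();
        var count = MiniBus.Dispatch(new DemoEvent { OrderId = 5 }).GetAwaiter().GetResult();
        Console.WriteLine($"{count} handler(s) ran, last order: {DemoEventHandler.LastOrderId}");
    }
}
```

An event with no registered handler simply returns 0 here, which corresponds to a message being skipped.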
OK, that covers the subscription bookkeeping. Now let's look at the RabbitMQ wrapper. Add the event bus interface; its two most important methods are Publish and Subscribe:
    public interface IEventBus
    {
        void Publish(IntegrationEvent @event);

        void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>;

        void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler;

        void Unsubscribe<T, TH>()
            where TH : IIntegrationEventHandler<T>
            where T : IntegrationEvent;
    }
Here is its implementation, EventBusRabbitMQ.cs. We only look at the two most important methods, Publish and Subscribe:
    using Autofac;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    using Polly;
    using Polly.Retry;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Client.Exceptions;
    using System;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading.Tasks;

    public class EventBusRabbitMQ : IEventBus, IDisposable
    {
        const string BROKER_NAME = "eshop_event_bus";

        private readonly IRabbitMQPersistentConnection _persistentConnection;
        private readonly ILogger<EventBusRabbitMQ> _logger;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly ILifetimeScope _autofac;
        private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
        private readonly int _retryCount;

        private IModel _consumerChannel;
        private string _queueName;

        public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection,
            ILogger<EventBusRabbitMQ> logger,
            ILifetimeScope autofac,
            IEventBusSubscriptionsManager subsManager,
            string queueName = null,
            int retryCount = 5)
        {
            _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
            _queueName = queueName;
            _consumerChannel = CreateConsumerChannel();
            _autofac = autofac;
            _retryCount = retryCount;
            _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
        }

        private void SubsManager_OnEventRemoved(object sender, string eventName)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);

                if (_subsManager.IsEmpty)
                {
                    _queueName = string.Empty;
                    _consumerChannel.Close();
                }
            }
        }

        public void Publish(IntegrationEvent @event)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            // Retry with exponential backoff when the broker cannot be reached.
            var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                .Or<SocketException>()
                .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                {
                    _logger.LogWarning(ex.ToString());
                });

            using (var channel = _persistentConnection.CreateModel())
            {
                // The event class name doubles as the routing key.
                var eventName = @event.GetType().Name;

                channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

                var message = JsonConvert.SerializeObject(@event);
                var body = Encoding.UTF8.GetBytes(message);

                try
                {
                    policy.Execute(() =>
                    {
                        var properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = 2; // persistent

                        channel.BasicPublish(exchange: BROKER_NAME,
                            routingKey: eventName,
                            mandatory: false,
                            basicProperties: properties,
                            body: body);
                    });
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }

        /// <summary>
        /// Subscribes a dynamic handler.
        /// </summary>
        public void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            DoInternalSubscription(eventName);
            _subsManager.AddDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Subscribes a typed handler.
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <typeparam name="TH"></typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            DoInternalSubscription(eventName);
            _subsManager.AddSubscription<T, TH>();
        }

        /// <summary>
        /// Performs the internal (broker-side) subscription.
        /// </summary>
        /// <param name="eventName"></param>
        private void DoInternalSubscription(string eventName)
        {
            var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
            if (!containsKey)
            {
                if (!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }

                using (var channel = _persistentConnection.CreateModel())
                {
                    channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
                }
            }
        }

        /// <summary>
        /// Unsubscribes a typed handler.
        /// </summary>
        public void Unsubscribe<T, TH>()
            where TH : IIntegrationEventHandler<T>
            where T : IntegrationEvent
        {
            _subsManager.RemoveSubscription<T, TH>();
        }

        /// <summary>
        /// Unsubscribes a dynamic handler.
        /// </summary>
        public void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _subsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Releases resources.
        /// </summary>
        public void Dispose()
        {
            if (_consumerChannel != null)
            {
                _consumerChannel.Dispose();
            }
            _subsManager.Clear();
        }

        /// <summary>
        /// Creates the consumer channel.
        /// </summary>
        private IModel CreateConsumerChannel()
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            var channel = _persistentConnection.CreateModel();

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
            channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var eventName = ea.RoutingKey;
                var message = Encoding.UTF8.GetString(ea.Body);

                await ProcessEvent(eventName, message);

                // Acknowledge only after the handlers have run.
                channel.BasicAck(ea.DeliveryTag, multiple: false);
            };

            channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);

            // Recreate the channel if a callback throws.
            channel.CallbackException += (sender, ea) =>
            {
                _consumerChannel.Dispose();
                _consumerChannel = CreateConsumerChannel();
            };

            return channel;
        }

        /// <summary>
        /// Processes an event.
        /// </summary>
        private async Task ProcessEvent(string eventName, string message)
        {
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
                        if (subscription.IsDynamic)
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            if (handler == null) continue;
                            dynamic eventData = JObject.Parse(message);
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            if (handler == null) continue;
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }
            }
        }
    }
Start with the publish flow. When Publish is called it first checks whether RabbitMQ is connected. The project wraps connection handling in a separate interface and implementation, shown here:
    public interface IRabbitMQPersistentConnection : IDisposable
    {
        bool IsConnected { get; }
        bool TryConnect();
        IModel CreateModel();
    }

    using Microsoft.Extensions.Logging;
    using Polly;
    using Polly.Retry;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Client.Exceptions;
    using System;
    using System.IO;
    using System.Net.Sockets;

    public class DefaultRabbitMQPersistentConnection : IRabbitMQPersistentConnection
    {
        private readonly IConnectionFactory _connectionFactory;
        private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
        private readonly int _retryCount;
        IConnection _connection;
        bool _disposed;
        object sync_root = new object();

        public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory,
            ILogger<DefaultRabbitMQPersistentConnection> logger,
            int retryCount = 5)
        {
            _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _retryCount = retryCount;
        }

        public bool IsConnected
        {
            get { return _connection != null && _connection.IsOpen && !_disposed; }
        }

        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
            }
            return _connection.CreateModel();
        }

        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;

            try
            {
                _connection.Dispose();
            }
            catch (IOException ex)
            {
                _logger.LogCritical(ex.ToString());
            }
        }

        public bool TryConnect()
        {
            _logger.LogInformation("RabbitMQ Client is trying to connect");

            // Only one thread may attempt to (re)connect at a time.
            lock (sync_root)
            {
                var policy = RetryPolicy.Handle<SocketException>()
                    .Or<BrokerUnreachableException>()
                    .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                    {
                        _logger.LogWarning(ex.ToString());
                    });

                policy.Execute(() =>
                {
                    _connection = _connectionFactory.CreateConnection();
                });

                if (IsConnected)
                {
                    _connection.ConnectionShutdown += OnConnectionShutdown;
                    _connection.CallbackException += OnCallbackException;
                    _connection.ConnectionBlocked += OnConnectionBlocked;

                    _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
                    return true;
                }
                else
                {
                    _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
                    return false;
                }
            }
        }

        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            if (_disposed) return;
            _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
            TryConnect();
        }

        void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            if (_disposed) return;
            _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
            TryConnect();
        }

        void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {
            if (_disposed) return;
            _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
            TryConnect();
        }
    }
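Both TryConnect and Publish pass retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)) to the retry policy, so with the default retryCount of 5 the waits grow as 2, 4, 8, 16 and 32 seconds. The sketch below reproduces that schedule with the standard library only. BackoffDemo and ExecuteWithRetry are hypothetical helpers, not part of Polly, and unlike the policy in the article this version retries on any exception rather than only SocketException and BrokerUnreachableException.

```csharp
using System;
using System.Collections.Generic;

public static class BackoffDemo
{
    // Same formula as the delay function used in the article's retry policy.
    public static TimeSpan DelayFor(int retryAttempt) =>
        TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));

    // Runs the action once, then retries up to retryCount times.
    // The wait callback is injected so the demo does not actually sleep.
    // Assumption: every exception type is retried, which is broader than the article's policy.
    public static T ExecuteWithRetry<T>(Func<T> action, int retryCount, Action<TimeSpan> wait)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                return action();
            }
            catch (Exception) when (attempt <= retryCount)
            {
                wait(DelayFor(attempt));
            }
        }
    }
}

public static class BackoffProgram
{
    public static void Main()
    {
        var waits = new List<TimeSpan>();
        var calls = 0;

        var result = BackoffDemo.ExecuteWithRetry(() =>
        {
            calls++;
            if (calls < 3) throw new InvalidOperationException("broker unreachable (simulated)");
            return "connected";
        }, retryCount: 5, wait: waits.Add);

        Console.WriteLine($"{result} after {calls} calls; waited {string.Join(", ", waits)}");
    }
}
```

In real code the wait callback would sleep (for example with Thread.Sleep); recording the delays instead keeps the example instant.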
When the program loads, a consumer channel is created and a consume method is registered on the consumer's Received event. The key point of this method is that it calls ProcessEvent and then sends an ack once processing completes:
    private IModel CreateConsumerChannel()
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        var channel = _persistentConnection.CreateModel();

        channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
        channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += async (model, ea) =>
        {
            var eventName = ea.RoutingKey;
            var message = Encoding.UTF8.GetString(ea.Body);

            await ProcessEvent(eventName, message);

            channel.BasicAck(ea.DeliveryTag, multiple: false);
        };

        channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);

        channel.CallbackException += (sender, ea) =>
        {
            _consumerChannel.Dispose();
            _consumerChannel = CreateConsumerChannel();
        };

        return channel;
    }
Now look at ProcessEvent, the method that actually consumes messages. As we saw, a dictionary records the handlers for each message class; this method looks up the handlers for the message class in that dictionary and invokes them through reflection:
    private async Task ProcessEvent(string eventName, string message)
    {
        if (_subsManager.HasSubscriptionsForEvent(eventName))
        {
            using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
            {
                var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                foreach (var subscription in subscriptions)
                {
                    if (subscription.IsDynamic)
                    {
                        var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                        if (handler == null) continue;
                        dynamic eventData = JObject.Parse(message);
                        await handler.Handle(eventData);
                    }
                    else
                    {
                        var handler = scope.ResolveOptional(subscription.HandlerType);
                        if (handler == null) continue;
                        var eventType = _subsManager.GetEventTypeByName(eventName);
                        var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                        var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                        await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                    }
                }
            }
        }
    }
The method looks cluttered with variable names, but it is simple once untangled. It takes the message class name eventName and the message content message. It first checks by eventName whether the dictionary has a registration for this message. If not, it skips processing and the caller still sends the ack, which effectively discards the message. If there is a registration, it takes the handlers from the dictionary and invokes them through reflection.
So when is the dictionary populated? When the subscribe method is called, which happens in the startup class:
    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        ConfigureEventBus(app);

        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseMvcWithDefaultRoute();
    }

    private void ConfigureEventBus(IApplicationBuilder app)
    {
        var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
        eventBus.Subscribe<PublisherIntegrationEvent, OrderStatusChangedValidationIntegrationEventHandle>();
    }
This is called from the Configure method in Startup.cs. The Subscribe call binds the message class to its handler class:
    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();
        DoInternalSubscription(eventName);
        _subsManager.AddSubscription<T, TH>();
    }
It calls two methods. DoInternalSubscription binds the routingKey to the given exchange and queue:
    private void DoInternalSubscription(string eventName)
    {
        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (!containsKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }

            using (var channel = _persistentConnection.CreateModel())
            {
                channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
            }
        }
    }
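A direct exchange delivers a message only to queues bound with exactly the same routing key, which is why each event name needs its own QueueBind before messages for it can arrive. The sketch below imitates that rule in memory. DirectExchangeSketch is a hypothetical class that only borrows the names QueueBind and Publish; it is not the RabbitMQ client API, and the queue name demo_queue is made up for the example.

```csharp
using System;
using System.Collections.Generic;

// In-memory imitation of a "direct" exchange; not the RabbitMQ client API.
public class DirectExchangeSketch
{
    // routing key -> names of the queues bound with that key
    private readonly Dictionary<string, HashSet<string>> _bindings =
        new Dictionary<string, HashSet<string>>();

    // queue name -> message bodies delivered so far
    private readonly Dictionary<string, List<string>> _queues =
        new Dictionary<string, List<string>>();

    public void QueueBind(string queue, string routingKey)
    {
        if (!_bindings.TryGetValue(routingKey, out var queues))
        {
            queues = new HashSet<string>();
            _bindings[routingKey] = queues;
        }
        queues.Add(queue);
        if (!_queues.ContainsKey(queue)) _queues[queue] = new List<string>();
    }

    // Returns the number of queues the message was routed to.
    public int Publish(string routingKey, string body)
    {
        // No binding for this key: the message is unroutable and dropped.
        if (!_bindings.TryGetValue(routingKey, out var queues)) return 0;
        foreach (var queue in queues) _queues[queue].Add(body);
        return queues.Count;
    }

    public IReadOnlyList<string> Messages(string queue) =>
        _queues.TryGetValue(queue, out var messages) ? messages : new List<string>();
}

public static class DirectExchangeProgram
{
    public static void Main()
    {
        var exchange = new DirectExchangeSketch();
        exchange.QueueBind("demo_queue", "PublisherIntegrationEvent");

        Console.WriteLine(exchange.Publish("PublisherIntegrationEvent", "{\"OrderId\":5}"));
        Console.WriteLine(exchange.Publish("SomeOtherEvent", "{}"));
        Console.WriteLine(exchange.Messages("demo_queue").Count);
    }
}
```

Only the first publish reaches demo_queue; the second has no matching binding, which mirrors what happens to an event nobody has subscribed to when mandatory is false.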
AddSubscription then adds the message class and handler class to the dictionary:
    public void AddSubscription<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = GetEventKey<T>();
        DoAddSubscription(typeof(TH), eventName, isDynamic: false);
        if (!_eventTypes.Contains(typeof(T)))
        {
            _eventTypes.Add(typeof(T));
        }
    }

    public void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
    {
        if (!HasSubscriptionsForEvent(eventName))
        {
            _handlers.Add(eventName, new List<SubscriptionInfo>());
        }

        if (_handlers[eventName].Any(s => s.HandlerType == handlerType))
        {
            throw new ArgumentException(
                $"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
        }

        if (isDynamic)
        {
            _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
        }
        else
        {
            _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
        }
    }
This adds the message class name and the handler's Type to the dictionary, to be invoked later through reflection.
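One detail worth seeing in isolation is how the dictionary key is derived. The subscriber side uses typeof(T).Name and the publisher side uses @event.GetType().Name, so both ends agree on the same string, which also serves as the routing key. The sketch below demonstrates this and a consequence of using Type.Name: the namespace is not part of the key. The Sales and Billing namespaces and the OrderCreated classes are hypothetical names invented for the example.

```csharp
using System;

namespace Sales { public class OrderCreated { } }
namespace Billing { public class OrderCreated { } }

public static class EventKeyDemo
{
    // Mirrors GetEventKey<T>() from the subscription manager.
    public static string GetEventKey<T>() => typeof(T).Name;

    public static void Main()
    {
        object published = new Sales.OrderCreated();

        // Publisher side uses the runtime type, subscriber side the generic argument.
        Console.WriteLine(published.GetType().Name == GetEventKey<Sales.OrderCreated>());

        // Type.Name drops the namespace, so these two distinct types share one key.
        Console.WriteLine(GetEventKey<Sales.OrderCreated>() == GetEventKey<Billing.OrderCreated>());
    }
}
```

If two services ever define events with the same class name, keying on the full name (Type.FullName) would be one way to keep them apart; that is a suggestion, not something the project does.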
OK, that covers the main publish and subscribe methods. The write-up is a bit messy; I will tidy it up when I have time. Now let's try it in the new project. In ValueController I call Publish to send an order number, OrderId, through MQ:
    [HttpGet]
    public IEnumerable<string> Get()
    {
        var @event = new PublisherIntegrationEvent(5);
        _eventBus.Publish(@event);
        _logger.LogError("Message sent to MQ successfully!");
        return new string[] { "value1", "value2" };
    }
The message body PublisherIntegrationEvent looks like this:
    public class PublisherIntegrationEvent : IntegrationEvent
    {
        public int OrderId { get; }

        public PublisherIntegrationEvent(int orderId) => OrderId = orderId;
    }
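In the startup class I registered the handler for this message class, OrderStatusChangedValidationIntegrationEventHandle:
    public class OrderStatusChangedValidationIntegrationEventHandle : IIntegrationEventHandler<PublisherIntegrationEvent>
    {
        private readonly ILogger<OrderStatusChangedValidationIntegrationEventHandle> _logger = null;

        public OrderStatusChangedValidationIntegrationEventHandle(ILogger<OrderStatusChangedValidationIntegrationEventHandle> logger)
        {
            _logger = logger;
        }

        // Nothing is awaited here, so return a completed task instead of marking the method async.
        public Task Handle(PublisherIntegrationEvent @event)
        {
            _logger.LogError("Received MQ message: " + @event.OrderId);
            return Task.CompletedTask;
        }
    }
Publishing and subscribing both live in the same API here, so just start the project and request the Get action; the handler should then log the received OrderId.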
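OK, that wraps up this walk through eShopOnContainers. Next I will add an event bus to the .NET Core shop series, with one change: message information will be stored in a database plus Redis.
My company's project also uses RabbitMQ for asynchronous communication between services, though in a different way. We bind each RoutingKey to a service address; a Windows service handles message publishing, looks up the service address for the RoutingKey, and calls it. We do not use RabbitMQ's subscribe methods, and we provide a web UI for registering the bindings. This approach works quite well.
OK, all done. Next I will add RabbitMQ to the shop project.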