EventBus

Posted by cloudsu


EventBus implements the publish/subscribe pattern for events. With it, components, services, and whole systems can communicate with each other while staying decoupled.


EventBus essentially defines a set of abstract interfaces; a message queue (MQ) can serve as the implementation behind them.
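To make the publish/subscribe idea concrete, here is a minimal in-memory sketch. TinyEventBus and OrderPlaced are hypothetical names invented for this illustration; this is not the ABP implementation, only the simplest shape such a bus could take.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical event type used only for this illustration.
public class OrderPlaced
{
    public int OrderId { get; }
    public OrderPlaced(int orderId) { OrderId = orderId; }
}

// Hypothetical minimal bus: keeps a list of handlers per event type.
public class TinyEventBus
{
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers =
        new Dictionary<Type, List<Func<object, Task>>>();

    // Register a handler for events of type TEvent.
    public void Subscribe<TEvent>(Func<TEvent, Task> handler)
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<object, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        list.Add(e => handler((TEvent)e));
    }

    // Invoke every handler registered for TEvent, one after another.
    public async Task PublishAsync<TEvent>(TEvent eventData)
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            return; // nobody is listening
        }
        foreach (var handler in list.ToArray())
        {
            await handler(eventData);
        }
    }
}
```

The publisher only knows the event type, never the subscribers. That is the decoupling described above, and it is also why the in-memory dictionary could be swapped for a message queue without touching the callers.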

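1. The module's pre-configuration step registers a hook that adds every service implementing ILocalEventHandler or IDistributedEventHandler to the corresponding options class.

LocalEventBusOptions and DistributedEventBusOptions each store the collected handler types in ITypeList<IEventHandler> Handlers { get; }.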

2. The IEventHandler interface comes in two variants, ILocalEventHandler<in TEvent> and IDistributedEventHandler<in TEvent>. Both declare the same method:

        /// <summary>
        /// Handler handles the event by implementing this method.
        /// </summary>
        /// <param name="eventData">Event data</param>
        Task HandleEventAsync(TEvent eventData);
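A handler is then just a class that implements this method. The sketch below redeclares a stand-in ILocalEventHandler<in TEvent> with the same shape so that it compiles on its own; StockChangedEvent and StockChangedLogger are hypothetical names, and in a real application the interface would come from the framework.

```csharp
using System;
using System.Threading.Tasks;

// Stand-in with the same shape as the interface shown above.
public interface ILocalEventHandler<in TEvent>
{
    Task HandleEventAsync(TEvent eventData);
}

// Hypothetical event payload.
public class StockChangedEvent
{
    public string Sku { get; set; }
    public int NewCount { get; set; }
}

// Hypothetical handler: records a message for each event it receives.
public class StockChangedLogger : ILocalEventHandler<StockChangedEvent>
{
    public string LastMessage { get; private set; }

    public Task HandleEventAsync(StockChangedEvent eventData)
    {
        LastMessage = eventData.Sku + " -> " + eventData.NewCount;
        return Task.CompletedTask; // nothing asynchronous to wait for here
    }
}
```

Because TEvent is declared with `in` (contravariant), a handler written for a base event type can also be used where a handler for a derived event type is expected.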

IEventHandlerFactory is responsible for obtaining or creating an event handler. It has three implementations: IocEventHandlerFactory, SingleInstanceHandlerFactory, and TransientEventHandlerFactory.

 IEventHandlerDisposeWrapper GetHandler();


    public interface IEventHandlerDisposeWrapper : IDisposable
    {
        IEventHandler EventHandler { get; }
    }

    public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
    {
        public IEventHandler EventHandler { get; }

        private readonly Action _disposeAction;

        public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action disposeAction = null)
        {
            _disposeAction = disposeAction;
            EventHandler = eventHandler;
        }

        public void Dispose()
        {
            _disposeAction?.Invoke();
        }
    }
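The wrapper exists so that each factory can decide what "releasing" a handler means. The following sketch shows how a single-instance factory and a transient factory might use it. The types are redeclared as stand-ins so the example compiles alone, and SingleInstanceFactorySketch, TransientFactorySketch, and CountingHandler are hypothetical names; the framework's real factories may differ in detail.

```csharp
using System;

// Stand-ins mirroring the types described above.
public interface IEventHandler { }

public interface IEventHandlerDisposeWrapper : IDisposable
{
    IEventHandler EventHandler { get; }
}

public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper
{
    private readonly Action _disposeAction;

    public IEventHandler EventHandler { get; }

    public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action disposeAction = null)
    {
        _disposeAction = disposeAction;
        EventHandler = eventHandler;
    }

    public void Dispose()
    {
        _disposeAction?.Invoke();
    }
}

public interface IEventHandlerFactory
{
    IEventHandlerDisposeWrapper GetHandler();
}

// Hypothetical: always hands out the same instance, so there is nothing to release.
public class SingleInstanceFactorySketch : IEventHandlerFactory
{
    private readonly IEventHandler _instance;

    public SingleInstanceFactorySketch(IEventHandler instance) { _instance = instance; }

    public IEventHandlerDisposeWrapper GetHandler()
    {
        return new EventHandlerDisposeWrapper(_instance);
    }
}

// Hypothetical: creates a fresh handler per call and disposes it with the wrapper.
public class TransientFactorySketch<THandler> : IEventHandlerFactory
    where THandler : class, IEventHandler, new()
{
    public IEventHandlerDisposeWrapper GetHandler()
    {
        var handler = new THandler();
        return new EventHandlerDisposeWrapper(
            handler,
            () => (handler as IDisposable)?.Dispose());
    }
}

// Hypothetical handler that counts how often it has been disposed.
public class CountingHandler : IEventHandler, IDisposable
{
    public static int DisposeCount;

    public void Dispose() { DisposeCount++; }
}
```

A caller would typically write `using (var wrapper = factory.GetHandler()) { ... }` and never need to know which lifetime policy is behind the factory.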

 

3. EventBus: publishes events and manages subscriptions (subscribe and unsubscribe).


    public abstract Task PublishAsync(Type eventType, object eventData);

    public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory);

    public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class;

    public abstract void Unsubscribe(Type eventType, IEventHandler handler);

    public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory);

    public abstract void UnsubscribeAll(Type eventType);
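Subscribe returns an IDisposable, which suggests that disposing the returned object undoes the subscription. The sketch below illustrates that pattern under this assumption. SubscriptionRegistry and its nested Unregistrar are hypothetical names, and factories are stored as plain objects to keep the example short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry showing the "Subscribe returns an unregistrar" pattern.
public class SubscriptionRegistry
{
    private readonly Dictionary<Type, List<object>> _factories =
        new Dictionary<Type, List<object>>();

    public IDisposable Subscribe(Type eventType, object factory)
    {
        if (!_factories.TryGetValue(eventType, out var list))
        {
            list = new List<object>();
            _factories[eventType] = list;
        }
        list.Add(factory);

        // Disposing the returned object removes exactly this registration.
        return new Unregistrar(() => Unsubscribe(eventType, factory));
    }

    public void Unsubscribe(Type eventType, object factory)
    {
        if (_factories.TryGetValue(eventType, out var list))
        {
            list.Remove(factory);
        }
    }

    public void UnsubscribeAll(Type eventType)
    {
        _factories.Remove(eventType);
    }

    public int Count(Type eventType)
    {
        return _factories.TryGetValue(eventType, out var list) ? list.Count : 0;
    }

    private sealed class Unregistrar : IDisposable
    {
        private readonly Action _undo;

        public Unregistrar(Action undo) { _undo = undo; }

        public void Dispose() { _undo(); }
    }
}
```

Returning a disposable lets callers scope a subscription with a using block instead of keeping the factory around only to pass it to Unsubscribe later.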

II. AbpRabbitMqModule

    [DependsOn(
        typeof(AbpJsonModule),
        typeof(AbpThreadingModule)
        )]
    public class AbpRabbitMqModule : AbpModule
    {
        public override void ConfigureServices(ServiceConfigurationContext context)
        {
            var configuration = context.Services.GetConfiguration();
            Configure<AbpRabbitMqOptions>(configuration.GetSection("RabbitMQ"));
        }

        public override void OnApplicationShutdown(ApplicationShutdownContext context)
        {
            context.ServiceProvider
                .GetRequiredService<IChannelPool>()
                .Dispose();

            context.ServiceProvider
                .GetRequiredService<IConnectionPool>()
                .Dispose();
        }
    }

III. AbpEventBusRabbitMqModule

    public override Task PublishAsync(Type eventType, object eventData)
    {
        var eventName = EventNameAttribute.GetNameOrDefault(eventType);
        var body = Serializer.Serialize(eventData);

        using (var channel = ConnectionPool.Get(RabbitMqEventBusOptions.ConnectionName).CreateModel())
        {
            channel.ExchangeDeclare(
                RabbitMqEventBusOptions.ExchangeName,
                "direct",
                durable: true
            );

            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;

            channel.BasicPublish(
                exchange: RabbitMqEventBusOptions.ExchangeName,
                routingKey: eventName,
                mandatory: true,
                basicProperties: properties,
                body: body
            );
        }

        return Task.CompletedTask;
    }

    public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
    {
        var handlerFactories = GetOrCreateHandlerFactories(eventType);

        handlerFactories.Add(factory);

        if (handlerFactories.Count == 1) //TODO: Multi-threading!
        {
            Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
        }

        return new EventHandlerFactoryUnregistrar(this, eventType, factory);
    }
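Both methods above turn the event type into a name through EventNameAttribute.GetNameOrDefault, and that name serves as the routing key on the direct exchange. The sketch below shows how such a lookup could work. EventNameSketchAttribute is a hypothetical stand-in written for this example, and falling back to the type's full name is an assumption, not something the code above confirms.

```csharp
using System;
using System.Reflection;

// Hypothetical stand-in for an event-name attribute.
[AttributeUsage(AttributeTargets.Class)]
public class EventNameSketchAttribute : Attribute
{
    public string Name { get; }

    public EventNameSketchAttribute(string name)
    {
        Name = name;
    }

    // Use the attribute's name when present; otherwise fall back to the
    // type's full name (assumed default for this sketch).
    public static string GetNameOrDefault(Type eventType)
    {
        var attribute = eventType.GetCustomAttribute<EventNameSketchAttribute>();
        return attribute?.Name ?? eventType.FullName;
    }
}

// Hypothetical event with an explicit, stable name.
[EventNameSketch("Shop.Order.Placed")]
public class OrderPlacedEto { }

// Hypothetical event without an attribute; its routing key follows the type name.
public class StockChangedEto { }
```

An explicit name keeps the routing key stable when the class is renamed or moved to another namespace, which matters once publisher and subscriber are deployed as separate services.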

 
