EventBus
Posted cloudsu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了EventBus相关的知识,希望对你有一定的参考价值。
EventBus 是一种事件发布订阅模式,借助 EventBus 我们可以很好的实现组件之间,服务之间,系统之间的解耦以及相互通信的问题。
EventBus 相当于是定义一些抽象接口,可以用 MQ 来实现EventBus
1、模块的预处理模块,定义预处理方法,增加实现ILocalEventHandler,IDistributedEventHandler的服务增加到配置
LocalEventBusOptions、DistributedEventBusOptions,分别存储ITypeList<IEventHandler> Handlers get;
2、IEventHandle接口,分ILocalEventHandler<in TEvent>,IDistributedEventHandler
/// <summary> /// Handler handles the event by implementing this method. /// </summary> /// <param name="eventData">Event data</param> Task HandleEventAsync(TEvent eventData);
IEventHandlerFactory,负责得到或创建事件处理器,三个实现IocEventHandlerFactory,SingleInstanceHandlerFactory,TransientEventHandlerFactory
IEventHandlerDisposeWrapper GetHandler(); public interface IEventHandlerDisposeWrapper : IDisposable IEventHandler EventHandler get; public class EventHandlerDisposeWrapper : IEventHandlerDisposeWrapper public IEventHandler EventHandler get; private readonly Action _disposeAction; public EventHandlerDisposeWrapper(IEventHandler eventHandler, Action disposeAction = null) _disposeAction = disposeAction; EventHandler = eventHandler; public void Dispose() _disposeAction?.Invoke();
3、EventBus
:用来发布/订阅/取消订阅事件
public abstract Task PublishAsync(Type eventType, object eventData); public abstract IDisposable Subscribe(Type eventType, IEventHandlerFactory factory); public abstract void Unsubscribe<TEvent>(Func<TEvent, Task> action) where TEvent : class; public abstract void Unsubscribe(Type eventType, IEventHandler handler); public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory); public abstract void Unsubscribe(Type eventType, IEventHandlerFactory factory); public abstract void UnsubscribeAll(Type eventType);
二、AbpRabbitMqModule
[DependsOn( typeof(AbpJsonModule), typeof(AbpThreadingModule) )] public class AbpRabbitMqModule : AbpModule public override void ConfigureServices(ServiceConfigurationContext context) var configuration = context.Services.GetConfiguration(); Configure<AbpRabbitMqOptions>(configuration.GetSection("RabbitMQ")); public override void OnApplicationShutdown(ApplicationShutdownContext context) context.ServiceProvider .GetRequiredService<IChannelPool>() .Dispose(); context.ServiceProvider .GetRequiredService<IConnectionPool>() .Dispose();
三、AbpEventBusRabbitMqModule
public override Task PublishAsync(Type eventType, object eventData) var eventName = EventNameAttribute.GetNameOrDefault(eventType); var body = Serializer.Serialize(eventData); using (var channel = ConnectionPool.Get(RabbitMqEventBusOptions.ConnectionName).CreateModel()) channel.ExchangeDeclare( RabbitMqEventBusOptions.ExchangeName, "direct", durable: true ); var properties = channel.CreateBasicProperties(); properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent; channel.BasicPublish( exchange: RabbitMqEventBusOptions.ExchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body ); return Task.CompletedTask;
public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory) var handlerFactories = GetOrCreateHandlerFactories(eventType); handlerFactories.Add(factory); if (handlerFactories.Count == 1) //TODO: Multi-threading! Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType)); return new EventHandlerFactoryUnregistrar(this, eventType, factory);
以上是关于EventBus的主要内容,如果未能解决你的问题,请参考以下文章
EventBusEventBus 事件总线框架简介 ( EventBus 使用流程 )
EventBusEventBus 源码解析 ( 注册订阅者 | 订阅方法 | 查找订阅方法 )
EventBusEventBus 源码解析 ( 事件发送 | 线程池中执行订阅方法 )
EventBusEventBus 源码解析 ( 事件发送 | postToSubscription 方法 | EventBus 线程模式处理细节 )
EventBusEventBus 源码解析 ( 注册订阅者 | 注册订阅方法详细过程 )
EventBusEventBus 源码解析 ( 事件发送 | EventBus.post 方法 | EventBus.postSingleEvent 方法 )