Java Review - Java进程内部的消息中间件_Event Bus设计模式
Posted 小小工匠
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Review - Java进程内部的消息中间件_Event Bus设计模式相关的知识,希望对你有一定的参考价值。
文章目录
概述
在工作中,我们都会使用到MQ 比如 Apache Kafka等,某subscriber在消息中间件上注册了某个topic(主题),当有消息发送到了该topic上之后,注册在该topic上的所有subscriber都将会收到消息 。
消息中间件提供了系统之间的异步处理机制。 主业务完成后即可向用户返回成功的通知,然后提交各种消息至消息中间件,这样注册在消息中间件的其他系统就可以顺利地接收通知了,然后执行各自的业务逻辑。消息中间件主要用于解决进程之间消息异步处理的解决方案,这里,我们使用消息中间件的思想设计一个Java进程内部的消息中间件——Event Bus。
EventBus架构类图
-
Bus接口对外提供了几种主要的使用方式,比如post方法用来发送Event,register方法用来注册Event接收者(Subscriber)接受响应事件
-
EventBus采用同步的方式推送Event,AsyncEventBus采用异步的方式(Thread-Per-Message)推送Event。
-
Registry注册表,主要用来记录对应的Subscriber以及受理消息的回调方法,回调方法我们用注解@Subscribe来标识。
-
Dispatcher主要用来将event广播给注册表中监听了topic的Subscriber
Code
Bus接口 (定义注册topic以及发送event接口)
/**
* Bus接口定义了EventBus的所有使用方法
*
* @author artisan
*/
public interface Bus
/**
* 将某个对象注册到Bus上,从此之后该类就成为Subscriber了
*/
void register(Object subscriber);
/**
* 将某个对象从Bus上取消注册,取消注册之后就不会再接收到来自Bus的任何消息
*/
void unregister(Object subscriber);
/**
* 提交Event到默认的topic
*/
void post(Object event);
/**
* 提交Event到指定的topic
*/
void post(Object event, String topic);
/**
* 关闭该bus
*/
void close();
/**
* 返回Bus的名称标识
*/
String getBusName();
Bus接口中定义了注册topic的方法和Event发送的方法
-
register(Object subscriber)
:将某个对象实例注册给Event Bus。 -
unregister(Object subscriber)
:取消对该对象实例的注册,会在Event Bus的注册表(Registry)中将其移除。 -
post(Object event)
:提交Event到Event Bus中,如果未指定topic则会将event广播给Event Bus默认的topic。 -
post(Object event, String topic)
:提交Event的同时指定了topic。 -
close()
:销毁该Event Bus。 -
getBusName()
:返回该Event Bus的名称
自定义注解-回调方法及topic
注册对象给Event Bus的时候需要指定接收消息时的回调方法,我们采用注解的方式进行Event回调
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* @author artisan
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe
String topic() default "default-topic";
@Subscribe要求注解在类中的方法,注解时可指定topic,不指定的情况下为默认的topic 【default-topic】
同步EventBus
同步EventBus是最核心的一个类,它实现了Bus的所有功能,但是该类对Event的广播推送采用的是同步的方式
/**
* @author 小工匠
* @version 1.0
* @description: Bus实现类
* @date 2021/12/1 23:00
* @mark: show me the code , change the world
*/
public class EventBus implements Bus
/**
* 用于维护Subscriber的注册表
*/
private final Registry registry = new Registry();
/**
* Event Bus的名字
*/
private String busName;
/**
* 默认的Event Bus的名字
*/
private final static String DEFAULT_BUS_NAME = "default";
/**
* 默认的topic的名字
*/
private final static String DEFAULT_TOPIC = "default-topic";
/**
* 用于分发广播消息到各个Subscriber的类
*/
private final Dispatcher dispatcher;
/**
* 构造函数
*/
public EventBus()
this(DEFAULT_BUS_NAME, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
public EventBus(String busName)
this(busName, null, Dispatcher.SEQ_EXECUTOR_SERVICE);
EventBus(String busName, EventExceptionHandler exceptionHandler, Executor executor)
this.busName = busName;
this.dispatcher = Dispatcher.newDispatcher(exceptionHandler, executor);
public EventBus(EventExceptionHandler exceptionHandler)
this(DEFAULT_BUS_NAME, exceptionHandler, Dispatcher.SEQ_EXECUTOR_SERVICE);
/**
* 将注册Subscriber的动作直接委托给Registry
*
* @param subscriber
*/
@Override
public void register(Object subscriber)
this.registry.bind(subscriber);
/**
* 接触注册同样委托给Registry
*
* @param subscriber
*/
@Override
public void unregister(Object subscriber)
this.registry.unbind(subscriber);
/**
* 提交Event到默认的topic
*
* @param event
*/
@Override
public void post(Object event)
this.post(event, DEFAULT_TOPIC);
/**
* 提交Event到指定的topic,具体的动作是由Dispatcher来完成的
*
* @param event
* @param topic
*/
@Override
public void post(Object event, String topic)
this.dispatcher.dispatch(this, registry, event, topic);
/**
* 关闭销毁Bus
*/
@Override
public void close()
this.dispatcher.close();
/**
* 返回Bus的名称
*
* @return
*/
@Override
public String getBusName()
return this.busName;
有几个点需要注意一下
-
EventBus的构造除了名称之外,还需要有ExceptionHandler和Executor,后两个主要是给Dispatcher使用的。
-
registry和unregister都是通过Subscriber注册表来完成的。
-
Event的提交则是由Dispatcher来完成的
-
Executor使用JDK中的Executor接口,如果我们自己开发的ThreadPool天生就是多线程并发执行任务的线程池,自带异步处理能力,但是无法做到同步任务处理,因此我们使用Executor可以任意扩展同步、异步的任务处理方式。
异步EventBus
异步的EventBus比较简单,继承自同步Bus,然后将Thread-Per-Message用异步处理任务的Executor替换EventBus中的同步Executor即可
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/2 10:59
* @mark: show me the code , change the world
*/
public class AsyncEventBus extends EventBus
AsyncEventBus(String busName, EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor)
super(busName, exceptionHandler, executor);
public AsyncEventBus(String busName, ThreadPoolExecutor executor)
this(busName, null, executor);
public AsyncEventBus(ThreadPoolExecutor executor)
this("default-async", null, executor);
public AsyncEventBus(EventExceptionHandler exceptionHandler, ThreadPoolExecutor executor)
this("default-async", exceptionHandler, executor);
可以看到AsyncEventBus 重写了父类EventBus的构造函数,使用ThreadPoolExecutor替代Executor。
Subscriber注册表Registry (维护topic和subscriber之间的关系)
注册表Registry维护了topic和subscriber之间的关系。
当有Event被post之后,Dispatcher需要知道该消息应该发送给哪个Subscriber的实例和对应的方法,Subscriber对象没有任何特殊要求,就是普通的类不需要继承任何父类或者实现任何接口
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/1 23:42
* @mark: show me the code , change the world
*/
class Registry
/**
* 存储Subscriber集合和topic之间关系的map
*/
private final ConcurrentHashMap<String, ConcurrentLinkedQueue<Subscriber>> subscriberContainer = new ConcurrentHashMap<>();
/**
* 获取Subscriber Object的方法集合然后进行绑定
*
* @param subscriber
*/
public void bind(Object subscriber)
List<Method> subscribeMethods = getSubscribeMethods(subscriber);
subscribeMethods.forEach(m -> tierSubscriber(subscriber, m));
public void unbind(Object subscriber)
//unbind为了提高速度,只对Subscriber进行失效操作
subscriberContainer.forEach((key, queue) ->
queue.forEach(s ->
if (s.getSubscribeObject() == subscriber)
s.setDisable(true);
));
public ConcurrentLinkedQueue<Subscriber> scanSubscriber(final String topic)
return subscriberContainer.get(topic);
private void tierSubscriber(Object subscriber, Method method)
final Subscribe subscribe = method.getDeclaredAnnotation(Subscribe.class);
String topic = subscribe.topic();
//当某topic没有Subscriber Queue的时候创建一个
subscriberContainer.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<>());
//创建一个Subscriber并且加入Subscriber列表中
subscriberContainer.get(topic).add(new Subscriber(subscriber, method));
private List<Method> getSubscribeMethods(Object subscriber)
final List<Method> methods = new ArrayList<>();
Class<?> temp = subscriber.getClass();
//不断获取当前类和父类的所有@Subscribe方法
while (temp != null)
//获取所有的方法
Method[] declaredMethods = temp.getDeclaredMethods();
//只有public方法 &&有一个入参 &&最重要的是被@Subscribe标记的方法才符合回调方法
Arrays.stream(declaredMethods)
.filter(m -> m.isAnnotationPresent(Subscribe.class)
&& m.getParameterCount() == 1
&& m.getModifiers() == Modifier.PUBLIC)
.forEach(methods::add);
temp = temp.getSuperclass();
return methods;
由于Registry是在Bus中使用的,不能暴露给外部,因此Registry被设计成了包可见的类。
我们所设计的EventBus对Subscriber没有做任何限制,但是要接受event的回调则需要将方法使用注解@Subscribe进行标记(可指定topic)
同一个Subscriber的不同方法通过@Subscribe注解之后可接受来自两个不同的topic消息
public class SimpleObject
/**
* subscribe方法,比如使用@Subscribe标记,并且是void类型且有一个参数
*/
@Subscribe(topic = "artisan-topic")
public void test2(Integer x)
@Subscribe(topic = "test-topic")
public void test3(Integer x)
SimpleObject的实例被注册到了Event Bus之后,test2和test3这两个方法将会被加入到注册表中,分别用来接受来自artisan-topic和test-topic的event 。
Event广播Dispatcher
Dispatcher的主要作用是将EventBus post的event推送给每一个注册到topic上的subscriber上,具体的推送其实就是执行被@Subscribe注解的方法。
import java.lang.reflect.Method;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/2 00:36
* @mark: show me the code , change the world
*/
public class Dispatcher
private final Executor executorService;
private final EventExceptionHandler exceptionHandler;
public static final Executor SEQ_EXECUTOR_SERVICE = SeqExecutorService.INSTANCE;
public static final Executor PRE_THREAD_EXECUTOR_SERVICE = PreThreadExecutorService.INSTANCE;
private Dispatcher(Executor executorService, EventExceptionHandler exceptionHandler)
this.executorService = executorService;
this.exceptionHandler = exceptionHandler;
public void dispatch(Bus bus, Registry registry, Object event, String topic)
//根据topic获取所有的Subscriber列表
ConcurrentLinkedQueue<Subscriber> subscribers = registry.scanSubscriber(topic);
if (null == subscribers)
if (exceptionHandler != null)
exceptionHandler.handle(new IllegalArgumentException("The topic " + topic + " not bind yet"),
new BaseEventContext(bus.getBusName(), null, event));
return;
//遍历所有的方法,并且通过反射的方式进行方法调用
subscribers.stream()
.filter(subscriber -> !subscriber.isDisable())
.filter(subscriber ->
Method subscribeMethod = subscriber.getSubscribeMethod();
Class<?> aClass = subscribeMethod.getParameterTypes()[0];
return (aClass.isAssignableFrom(event.getClass()));
).forEach(subscriber -> realInvokeSubscribe(subscriber, event, bus));
private void realInvokeSubscribe(Subscriber subscriber, Object event, Bus bus)
Method subscribeMethod = subscriber.getSubscribeMethod();
Object subscribeObject = subscriber.getSubscribeObject();
executorService.execute(() ->
try
subscribeMethod.invoke(subscribeObject, event);
catch (Exception e)
if (null != exceptionHandler)
exceptionHandler.handle(e, new BaseEventContext(bus.getBusName(), subscriber, event));
);
public void close()
if (executorService instanceof ExecutorService)
((ExecutorService) executorService).shutdown();
static Dispatcher newDispatcher(EventExceptionHandler exceptionHandler, Executor executor)
return new Dispatcher(executor, exceptionHandler);
static Dispatcher seqDispatcher(EventExceptionHandler exceptionHandler)
return new Dispatcher(SEQ_EXECUTOR_SERVICE, exceptionHandler);
static Dispatcher perThreadDispatcher(EventExceptionHandler exceptionHandler)
return new Dispatcher(PRE_THREAD_EXECUTOR_SERVICE, exceptionHandler);
/**
* 顺序执行的ExecutorService
*/
private static class SeqExecutorService implements Executor
private final static SeqExecutorService INSTANCE = new SeqExecutorService();
@Override
public void execute(Runnable command)
command.run();
/**
* 每个线程负责一次消息推送
*/
private static class PreThreadExecutorService implements Executor
private final static PreThreadExecutorService INSTANCE = new PreThreadExecutorService();
@Override
public void execute(Runnable command)
new Thread(command).start();
/**
* 默认的EventContext实现
*/
private static class BaseEventContext implements EventContext
Java Review (九面向对象----封装)