多线程实现简单的事件异步处理框架
Posted coder-lcp
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多线程实现简单的事件异步处理框架相关的知识,希望对你有一定的参考价值。
老实说,多线程在web开发里面非常常见,很多web容器本身就支持多线程,所以很多时候我们在进行web开发的时候并不需要考虑多线程相关的负责问题,而只需要实现相关的业务功能即可。所以,可以概括地讲,很多时候的web开发,并没有多线程方面的考虑,因为web应用本身就是在多线程基础上的了。
但是,有些时候为了提高程序性能,在用户的一个请求中中如果包含过多的业务操作或者包含耗时比较长的业务操作,我们就需要考虑使用异步的方式来提高程序响应的速度了。这篇博客简单介绍了在java中如何使用多线程实现一个简单的异步框架。
这个事件异步处理框架主要的工作过程是这样的:通过producer类对事件实体类序列化后,存储在redis的list队列中,而comsumer则负责读取事件队列中的事件模型对象并反序列化后调用相应的handler实现类对象进行事件处理,这些handler实现类的对象,通过spring完成handler具体类的注册操作,存在在一个map结构中,更具体的请读者往下看,欢迎指正不足的地方!
一、同步、异步的概念
在学习多线程的时候,我们接触最多的概念估计是同步的概念了,多线程中同步的意思大概是这样:线程访问资源时一直在等待,知道资源访问结束。所以,有同步的概念,我们可以大概理解与之相对的异步的概念:线程在访问资源(或者处理耗时较长的数据)时,不必一直等待资源访问完成或者数据处理完,在等待期间线程可以做其他事情,而当资源访问完成之后,会采取回调的方式执行相应的代码。
例如,在IO读写中,同步的方式就是在IO 操作的阻塞过程一直阻塞,直到IO操作完成;而异步的意思就是在io操作阻塞过程线程去做其他事情,当IO操作完成后,采取回调的方式执行相应的操作。
二、异步框架的模型原理
1、生产者--消费者模式
有了解过设计模式的读者应该听过这个大名鼎鼎的设计模式了,它大概的思路如下示例图:
大概意思就是:生产者负责数据的产生,它将数据放到内存中去(一般是一个队列),而消费者则负责处理内存中的数据,处理完成后,可以通过回调的方式进行响应。上面的图比较粗略,下面是具体的实现示意图:
上面示意图具体说明了生产者消费者的具体实现方式:
eventProducer(当然,也可以是dataProducer等)是生产者,它会将前端传输过来的数据或者说需要处理的事件封装好,然后将这些封装好的数据放进一个队列里面去;
而eventConsumer是消费者,它会读取队列里面的数据,然后进行处理。
在这个过程,程序是以异步的方式运行的:生产者无需等待消费者处理完成,它的职责只是将数据推到内存里面去,然后就可以进行响应;而消费者只需要处理数据即可,它不用管数据是哪来的。显然,这样的方式可以提高响应的速度,同时使得异步的实现方式变得简单起来。
2、web开发中的异步框架思路
上面的生产者--消费者为我们实现web的异步框架提供了一种很好的思路:在复杂的业务操作或者耗时比较长的业务中,我们可以采用异步的方式提高程序的响应速度,而生产者消费者的模式正是我们实现异步框架的参考模型--复杂业务的service层使对应的生产者,它只需要将要处理的数据放进一个队列里面,然后即可相应用户;而相应的handler类则负责具体的数据处理。
3、为什么用异步?
显然,在上面描述的思路中,我们大概可以知道什么时候应该使用异步框架:对相应速度要求比较高请求,但是该请求的相关业务操作允许一定的延迟。
举个具体的例子:在一个社交网站中,很多时候会有点赞的操作,A给B点赞,一般来说会包含两个操作,第一个操作是告诉A点赞成功了,第二个操作是告诉B他被A点赞了;如果不采用异步的方式,那就需要在在这两个操作都完成后,才响应A说点赞成功,但是第二个操作显然会耗时很长(例如需要发邮件通知),所以不采用异步方式时A就会有这样一种感觉:怎么点个赞要等半天才响应的,什么垃圾系统!所以,这时候为了提高对A的相应速度,我们可以采用异步的方式:A点赞请求发出之后,程序不需要等到B收到A的点赞通知了,才告诉A说你点赞成功了,因为B收到A的点赞通知相对于A知道自己点赞成功来说,是允许延迟的。
好吧,上面的解说可能有点绕,不过如果你理解了上面的这个例子,大概也就知道异步的适用场景了。
三、简单的事件处理异步框架
前面啰啰嗦嗦铺垫了那么多,下面就用一个比较简单的例子来说明web开发中异步框架的应用场景以及如何实现一个简单的异步框架吧。
首先说明的是,在下面的代码中,我是将最近做的一个项目中的部分业务功能抽取出来的,所以会用到spring的框架以及redis(用于存储生产者产生的数据)相关知识,同时为了提高程序的扩展性,我采用了面向接口编程的方式,利用spring的内置功能实现消费者的自动注册,看不懂可以稍微百度下(其实只是用到了redis的一点皮毛功能,毕竟我也是刚接触redis的菜鸟而已,所以不用担心看不懂)
1、框架的大体模型
主要是包括三个部分:生产者producer类,消费者comsumer类,事件处理的handler接口以及对应的实现类,具体的事件eventModel类(对应数据)。
在这里,producer类会将前端传输过来的eventModel对象进行序列化,将它加入到一个异步队列中,这里采用redis的list数据结构实现。
消费者comsumer则负责将redis中队列的数据读取出来,反序列化后,根据eventModel中的eventType来调用相应的handler具体实现类(handler实现类存储在一个map结构里面,key对应的是eventType,value对应的是具体handler实现类)进行业务处理。
handler实现类负责具体事件的处理,它需要实现一个handler接口(该接口是通过spring进行自动注册的关键,具体后面会讲)。
eventModel是事件模型,它主要存储与事件有关的数据,包括事件类型,时间触发者,事件所属者等数据。具体的后面会讲解。
下面就各个模块进行具体的讲解以及给出相应的代码实现。
2、eventModel事件模型
在讲解其他部分之前,我觉得首先应该简单讲解下我们应该如何组织一个事件模型。直接上代码吧,请注意看注释理解如何组织事件模型:
/** * 事件模型:用于表示个事件 */ public class EventModel { /** * 事件类型,用于标识事件,同时在comsumer中根据这个值确定handler的具体实现类,一般可用一个枚举类型实现 * 例如点赞通知对应的事件类型和注册发邮件进行激活的事件就应该属于不同的eventType,应该对应不同的handler实现类 */ private EventType eventType; /** * 事件触发者,例如用户A给用户B点赞,A就是时间触发者 */ private int actionId;/** * 事件发生对应的关联者,例如A给B点赞,A对应actionId,actionOwnerId */ private int actionOwnerId; /**时间处理需要的额外的数据,采用map的方式可以保证程序的扩展性 * 例如注册发送邮件的操作需要的数据和点赞通知需要的数据并不一样,所以用map存储最大程度地保证程序的灵活性 */ private Map<String,String> exts = new HashMap<>(); /** * 注意序列化需要显式有一个无参构造函数 */ public EventModel(){ } /** * getter 和setter,这部分省略 */ }
在组织eventModel时,我们应该保证灵活性,将必须的变量抽取出来之余,用一个map结构来存储具体业务可能需要的额外数据。
3、producer类
producer的功能较为加单,只是将eventModel进行序列化,然后将它添加进相应的时间队列,具体代码如下:
/** * 事件生产者 */ @Component public class EventProducer { @Autowired private JedisEventHandlerAdaptor jedisEventHandlerAdaptor; @Autowired private JedisKeyUtil jedisKeyUtil; public void add(EventModel model){ String modelJson = JSONObject.toJSONString(model); jedisEventHandlerAdaptor.add(jedisKeyUtil.getEventHandlerKey(),modelJson); } }
没有接触过redis的读者可以认为上面的jedisEventHandlerAdaptor其实就是一个可以操作某个队列的类,在java中其实也可以用阻塞队列来实现的,更具体的读者可以自己尝试。
4、comsumer类
在这个异步事件处理框架中,comsumer主要负责以下的职责:
读取事件队列中的eventModel对象,将它反序列化后,根据eventType负责调用具体的handler实现类;
在初始化的时候利用spring框架自动对handler具体实现类进行注册操作,并将之存储在一个map的数据结构中,key是eventType,valuee是handler具体实现类的对象。
具体的实现方式请读者注意看代码中的注释:
/** * 事件处理类,该类负责调用handler,对事件进行处理,需要实现spring的两个接口,InitializingBean接口是初始化时自动注册handler要用; *ApplicationContextAware则是调用spring的applicationContext(该applicationContext中存储着handler具体实现类的bean对象)需要实现 * 的接口,通过applicationContext获取handler对应的beans,然后就可以将handler自动注册到下面的config对象中了(是一个map) */ @Component public class EventComsumer implements InitializingBean, ApplicationContextAware{ private static final Logger logger = LoggerFactory.getLogger(MessageController.class); /** * threadPoolUtil封装了线程池的线程相关操作 */ @Autowired private ThreadPoolUtil threadPool; @Autowired private JedisEventHandlerAdaptor adaptor; /** * 这是一个与redis交互相关的工具类,用于获取特定的redis key,避免key冲突用,读者可以忽略 */ @Autowired private JedisKeyUtil jedisKeyUtil; /** * spring上下文对象,该对象存储着handler bean对象,必须通过setApplicationContext(ApplicationContextAware接口的实现方法) * 进行初始化,这样才能获取spring中的handler具体实现类的beans */ private ApplicationContext applicationContext; /**设置消费函数阻塞时间,暂定为一天,redis阻塞list中必须要的参数,读者可以忽略 */ private static int COMSUME_TIMEOUT = 24*3600; /** * congif:该变量用于存储type和eventType的映射关系,在消费时,可以直接根据config中你的映射关系进行handler调用 * 注意,这里为了保证程序的灵活性,eventHandler用一个list进行存储,因为有可能一个EventType事件类型可能对应多个 * handler事件处理对象,例如点赞通知这个事件类型可能需要通知被点赞的人以及通知系统管理员,所以应该对应两个事件handler * 更具体的可以参考handler接口设计时的注释 */ Map<EventType,List<EventHandler>> config = new HashMap<>(); /** * spring对该对象进行初始化的时候,将所有的handler具体对象注册到config对象中 */ @Override public void afterPropertiesSet() throws Exception {
//获取所有handler具体对象 Map<String,EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);
//迭代注册handler对象 for(Map.Entry<String,EventHandler> entry:beans.entrySet()){ EventHandler handler = entry.getValue();
//由于一个handler也可能对应多个事件类型,所以一个handler要注册到所有的eventType中去,这里如果看不懂可以结合后面的解释handler接口代码的注释进行理解 for(EventType type:handler.getHandlerType()){ if(config.get(type)==null){ config.put(type,new ArrayList<EventHandler>()); } config.get(type).add(handler); } } //开线程调用消费函数,注意不能直接调用,否则会导致主线程阻塞 threadPool.execute(new Runnable() { @Override public void run() { doConsume(); } }); } /** * 消费函数,用于执行handler */ public void doConsume() { while(true){ List<String> list = adaptor.pop(String.valueOf(COMSUME_TIMEOUT), jedisKeyUtil.getEventHandlerKey()); //反序列化 EventModel model = JSON.parseObject(list.get(1),EventModel.class); EventType type = model.getEventType(); //获取事件的handler List<EventHandler> handlers = config.get(type); //执行handler for(EventHandler handler:handlers){ handler.doHandler(model); } } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
这里需要重点解释下comsumer中handler自动注册的过程(afterPropertitiesSet方法以及config对象):
首先我们的config对象是一个map,key是一个eventType,表示某个事件;而为什么用list来表示eventHandler呢?这是因为一个事件有可能对应多个eventHandler,所以为了为了保证灵活性,用list形式存储handler最适合了;而关于handler方法中的getHandlerType方法,下面的handler接口设计讲解时会进行详细解释。
另外,关于 InitializingBean, ApplicationContextAware 两个接口的作用具体不讲,读者入股不知道为什么必须实现这两个接口才能借助spring实现自动注册的话,可以进行谷歌或者百度下,相信很容易找到答案的。
5、handler接口设计
直接上代码吧,请注意看注释:
/** * 事件处理器:用于处理事件队列里面的事件,被eventConsumer调用 * doHandler:model是具体的事件模型,它需要由调用者(一般是comsumer)传进来 */ public interface EventHandler { public void doHandler(EventModel model); /** * * @return 表明该接口是什么类型的handler,list表明handler可以支持多个业务,也就是说,一个handler可以对应多个eventType
*例如说,sendEmailHandler,邮件发送handler,具体业务例如注册激活的事件类型,点赞的发邮件通知时间类型都会需要这个handler,
*所以一个handler是有必要对应多个eventType的,这里请读者务必理解,当初我也理解了挺久的。所以,具体handler实现类中必须有一个list变量来存储它对应的事件类型 */ public List<EventType> getHandlerType(); }
这里的handler为什么要对应多个eventType请读者参考注释理解,我觉得理解这个挺重要的,当你理解这个之后,回头看上面的自动注册过程(在comsumer类中)才不会感到懵逼。
最后,我们只需要实现eventHandler接口就可以了,comsumer会在spring启动时自动帮你注册该类,我们只需要在service中声明eventType,comsumer便会自动找到相应的接口执行具体操作。
6、总结
这个简单的异步事假处理框架例子就大概解析到这里了,其实我觉得最主要的是通过这个事件处理框架设计的过程体会和领悟生产者消费者设计模型以及异步框架的工作原理;当然,这个过程其实还有很多其他需要领悟的:例如如何设计接口才能保证灵活性;对象的注册又是什么意思,我们应该如何实现自动注册等等。
用了一个早上终于把这篇博客写完了,其实这个异步事件处理框架还是有点粗糙的,但是在我开来再复杂的异步框架工作原理大体上也是这样的,也希望这篇博客能给读者带来那么一点点收获,不足的地方请各位大佬指正!
以上是关于多线程实现简单的事件异步处理框架的主要内容,如果未能解决你的问题,请参考以下文章