Spring中的观察者模式(发布订阅模式)(基于SpringBoot实现)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring中的观察者模式(发布订阅模式)(基于SpringBoot实现)相关的知识,希望对你有一定的参考价值。
参考技术A spring中的事件机制涉及到者几个类文件。ApplicationEvent(事件类型)、ApplicationListener(事件监听类)、ApplicationEventPublisher(事件发布类)。
在实际开发中,有一个这样的例子,当下单成功后会发送手机短信、发送绑定邮箱、微信、等
如有不同见解,欢迎指正。谢谢!
在商城系统中使用设计模式----策略模式之在spring中使用观察者模式和发布/订阅
1.概念:
观察者模式:
是属于设计者模式中的一种,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。
发布/订阅:
是一种消息范式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者),而是通过调度器将消息发布给订阅者。
2.区别:下图明显可以看出发布/订阅比观察者模式中多了一层中间信道,
- 在Observer模式中,O bservers知道Subject,同时Subject还保留了Observers的记录。然而,在发布者/订阅者中,发布者和订阅者不需要彼此了解。他们只是在消息队列或代理的帮助下进行通信。
- 在Publisher / Subscriber模式中,组件是松散耦合的,而不是Observer模式。
- 观察者模式主要以同步方式实现,即当某些事件发生时,Subject调用其所有观察者的适当方法。的发行者/订户图案在一个实施大多异步方式(使用消息队列)。
- 观察者模式需要在单个应用程序地址空间中实现。另一方面,发布者/订阅者模式更像是跨应用程序模式。
3.使用场景:
当用户下单成功后,要执行 修改订单状态,分佣,通知店主发货。
4.实现一:观察者模式:
(1) java.util包提供了对该模式的支持,提供了Observer(观察者)方法和Obervable(被观察者)方法。
package java.util; public interface Observer { /** * 每当观察对象发生变化时,都会调用此方法。 * * @param o the observable object. * @param arg an argument passed to the <code>notifyObservers</code> * method. */ void update(Observable o, Object arg); }
package java.util; public class Observable { private boolean changed = false; private Vector<Observer> obs; /** Construct an Observable with zero Observers. */ public Observable() { obs = new Vector<>(); } /** * 添加观察者 * @param o an observer to be added. * @throws NullPointerException if the parameter o is null. */ public synchronized void addObserver(Observer o) { if (o == null) throw new NullPointerException(); if (!obs.contains(o)) { obs.addElement(o); } } /** * 删除观察者 * @param o the observer to be deleted. */ public synchronized void deleteObserver(Observer o) { obs.removeElement(o); } /**
* 方法被调用的时候 * 通知观察者 * * @see java.util.Observable#clearChanged() * @see java.util.Observable#hasChanged() * @see java.util.Observer#update(java.util.Observable, java.lang.Object) */ public void notifyObservers() { notifyObservers(null); } /** * 通知观察者*/ public void notifyObservers(Object arg) { /* * a temporary array buffer, used as a snapshot of the state of * current Observers. */ Object[] arrLocal; synchronized (this) { /*
* 同步保证在通知的时候,此资源不被其他所占用,避免发生在这个时候观察者发生变化
* */ if (!changed) return; arrLocal = obs.toArray(); clearChanged(); } for (int i = arrLocal.length-1; i>=0; i--) ((Observer)arrLocal[i]).update(this, arg); } /** * Clears the observer list so that this object no longer has any observers. */ public synchronized void deleteObservers() { obs.removeAllElements(); } /** * Marks this <tt>Observable</tt> object as having been changed; the * <tt>hasChanged</tt> method will now return <tt>true</tt>. */ protected synchronized void setChanged() { changed = true; } /** * Indicates that this object has no longer changed, or that it has * already notified all of its observers of its most recent change, * so that the <tt>hasChanged</tt> method will now return <tt>false</tt>. * This method is called automatically by the * <code>notifyObservers</code> methods. * * @see java.util.Observable#notifyObservers() * @see java.util.Observable#notifyObservers(java.lang.Object) */ protected synchronized void clearChanged() { changed = false; } /** * Tests if this object has changed. * * @return <code>true</code> if and only if the <code>setChanged</code> * method has been called more recently than the * <code>clearChanged</code> method on this object; * <code>false</code> otherwise. * @see java.util.Observable#clearChanged() * @see java.util.Observable#setChanged() */ public synchronized boolean hasChanged() { return changed; } /** * Returns the number of observers of this <tt>Observable</tt> object. * * @return the number of observers of this object. */ public synchronized int countObservers() { return obs.size(); } }
(2)被观察者继承Observable类。
import java.util.Observable; /** * @description:被观察者 * @author: Chen * @create: 2019-04-03 23:01 **/ public class PayObservable extends Observable { @Override public void notifyObservers() { System.out.println("有人下单啦"); setChanged(); super.notifyObservers(); } }
(3)观察者继承Observer接口
/** * @description:佣金观察者 * @author: Chen * @create: 2019-04-03 22:55 **/ public class CommissionObserver implements Observer { public CommissionObserver(Observable observable){ observable.addObserver(this); } @Override public void update(Observable o, Object arg) { System.out.println("大家注意了,有傻逼下单了,我要开始分佣金了"); } }
package com.chen.observer; import java.util.Observable; import java.util.Observer; /** * @description:消息观察者 * @author: Chen * @create: 2019-04-03 22:50 **/ public class MsgObserver implements Observer { public MsgObserver(Observable observable){ observable.addObserver(this); } @Override public void update(Observable o, Object arg) { System.out.println("管理员,有人下单啦。"); } }
package com.chen.observer; import java.util.Observable; import java.util.Observer; /** * @description:订单观察者 * @author: Chen * @create: 2019-04-03 22:49 **/ public class OrderObserver implements Observer { public OrderObserver(Observable observable){ observable.addObserver(this); } @Override public void update(Observable o, Object arg) { System.out.println("赶紧把订单改成已支付"); } }
(4)调用入口
/** * @description:当用户下单成功后,要执行 修改订单状态,分佣,通知店主发货 * @author: Chen * @create: 2019-04-03 22:34 **/ public class ObserverMain { public static void main(String[] args) { PayObservable payObservable = new PayObservable(); new CommissionObserver(payObservable); new MsgObserver(payObservable); new OrderObserver(payObservable); payObservable.notifyObservers(); } }
执行结果:
源码地址:商城中的观察者模式
实现二:发布/订阅
springboot有提供一些支持发布/订阅模式的基本类,我们如图只需关心,发布者发布事件,订阅者接收事件。不会去理会事件通道如何广播通知。
(1)springboot提供的基础类:
package org.springframework.context; import java.util.EventObject; /** * 所有应用程序事件都要扩展的类。
* 发布者将此事件发布出去,订阅者根据事件类型去判断,进行业务处理
*/ public abstract class ApplicationEvent extends EventObject { /** use serialVersionUID from Spring 1.2 for interoperability. */ private static final long serialVersionUID = 7099057708183571937L; /** System time when the event happened. */ private final long timestamp; /** * Create a new ApplicationEvent. * @param source the object on which the event initially occurred (never {@code null}) */ public ApplicationEvent(Object source) { super(source); this.timestamp = System.currentTimeMillis(); } /** * Return the system time in milliseconds when the event happened. */ public final long getTimestamp() { return this.timestamp; } }
package org.springframework.context; import java.util.EventListener; /**
* 订阅者继承此接口
* 应用程序事件侦听器实现的接口 * 继承此接口的类,要注入到spring容器中,交给bean工厂进行管理,这样在广播到时候,才能接收到信息*/ @FunctionalInterface public interface ApplicationListener<E extends ApplicationEvent> extends EventListener { /** * 接收到事件后进行处理 */ void onApplicationEvent(E event); }
package org.springframework.context; import org.springframework.beans.BeansException; import org.springframework.beans.factory.Aware;
/**
* spring容器包装接口
* 发布者继承此接口,调用容器中publishEvent(ApplicationEvent event)方法,发布事件。
*/ public interface ApplicationContextAware extends Aware { /** * 设置spring容器到此处*/ void setApplicationContext(ApplicationContext applicationContext) throws BeansException; }
(2)支付事件继承ApplicationEvent类。
import org.springframework.context.ApplicationEvent; /** * @description:支付事件,有人支付了订单 * @author: Chen * @create: 2019-04-21 10:26 **/ public class PayEvent extends ApplicationEvent { public PayEvent(Object source) { super(source); System.out.println("MsgEvent:"+source.toString()); } }
(3)订阅者继承ApplicationListener接口
/** * @description:佣金监听器 * @author: Chen * @create: 2019-04-21 23:50 **/ @Component public class CommissionListener implements ApplicationListener { @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof PayEvent){ System.out.println("有人下单,开始分佣啦"); } System.out.println("收到广播:MsgListener"); } }
/** * @description:信息监听器 * @author: Chen * @create: 2019-04-21 09:57 **/ @Component public class MsgListener implements ApplicationListener { @Override public void onApplicationEvent(ApplicationEvent applicationEvent) { if (applicationEvent instanceof PayEvent){ System.out.println("通知管理员,有人下单了"); } System.out.println("收到广播:MsgListener"); } }
/** * @description: 订单监听器 * @author: Chen * @create: 2019-04-21 23:48 **/ @Component public class OrderListener implements ApplicationListener { @Override public void onApplicationEvent(ApplicationEvent event) { if (event instanceof PayEvent){ System.out.println("把订单状态改成已支付"); } System.out.println("收到广播:OrderListener"); } }
(4)事件发布者继承ApplicationContextAware
/** * @description:发布者 * @author: Chen * @create: 2019-04-21 10:28 **/ @Component public class EventPublisher implements ApplicationContextAware { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext=applicationContext; } public void publishEvent(ApplicationEvent event){ applicationContext.publishEvent(event); } }
(5)测试入口
@RestController public class TestController { @Autowired private EventPublisher eventListener; @RequestMapping("pay") public String pay(){ PayEvent payEvent = new PayEvent("订单支付成功。"); eventListener.publishEvent(payEvent); return "success"; } }
结果:
以上是关于Spring中的观察者模式(发布订阅模式)(基于SpringBoot实现)的主要内容,如果未能解决你的问题,请参考以下文章
在商城系统中使用设计模式----策略模式之在spring中使用观察者模式和发布/订阅