JMS(java消息服务)
JMS是java的一个标准,定义了使用消息代理的通用API。
Spring基于模板类JmsTemplate为JMS提供了支持
Spring还提供了消息驱动POJO的理念:这是一个简单的Java对象,它能够以异步的方式响应队列或主题上到达的消息
消息代理(message broker)和目的地(desttination)
日常收寄快递,会涉及三个对象:
- 寄件人
- 目的地
- 收件人
假设没有快递企业,寄件人必须亲自跋涉千里把物品送到目的地,然后由收件人去领取。
在异步消息中,消息代理充当快递企业的角色。为消息代理指定目的地后,消息代理确保消息送达,同时解放了发送者,使其可以进行其他业务
目的地只关注消息从哪里获取,不关心由谁取走消息。它有两种同用的目的地类型:
- 队列
- 主题
搭建消息代理——ActiveMQ
1、配置依赖
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.15.11</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
2、配置连接工厂
首先配置JMS连接工厂,让应用知道如何连接到ActiveMQ。
ActiveMQConnectionFactory是ActiveMQ自带的连接工厂,在Spring中配置如下:
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://localhost:61616" />
ActiveMQ代理监听默认监听localhost的61616端口,可以使用brokerURL属性来指定端口
3、声明ActiveMQ消息的目的地
如果目的地是队列类型:
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"
c:_0="queue" />
ActiveMQTopic(String name)
构造器会创建一个队列类型的目的地,并用String对象指明队列的名字
如果目的地是队列类型:
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"
c:_0="topic" />
4、使用amq命名空间简化配置
<amq:queue id="spittleQueue" physicalName="spittle.alert.queue" />
<amq:topic id="spittleTopic" physicalName="spittle.alert.topic" />
<amq:connectionFactory id="connectionFactory"
brokerURL="tcp://localhost:61616" />
发送和接受消息
如果不适用JmsTemplate模板,发送消息需要如下步骤:
- 建立ConnectionFactory工厂对象,需要填入用户名、密码、连接地址(一般使用默认,如果没有修改的话)
- 通过ConnectionFactory对象创建一个Connection连接,并且调用Connection的start方法开启连接,Connection方法默认是关闭的
- 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数1是是否启用事物,参数2是签收模式,一般设置为自动签收
- 通过Session对象创建Destination对象(目的地),指的是一个客户端用来制定生产消息目标和消费消息来源的对象。在PTP的模式中,Destination被称作队列,在Pub/Sub模式中,Destination被称作主题(Topic)
- 通过Session对象创建消息的发送和接收对象(生产者和消费者)
- 通过MessageProducer的setDeliverMode方法为其设置持久化或者非持久化特性
- 使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据。客户端同理。记得关闭
public static void main(String[] args) throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://94.191.49.192:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i=0;i<=5;i++) {
TextMessage textMessage = session.createTextMessage();
textMessage.setText("我是第"+i+"消息");
producer.send(textMessage);
}
if(connection!=null){
connection.close();
}
}
通过JmsTemplate模板可以简化上述代码
1、配置JmsTemplate
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
c:_0-ref="connectionFactory"
<!--可选属性, 指定默认目的地 -->
p:defaultDestination-ref="queue" />
如果系统存在目的地,可以通过p:defaultDestination-ref将目的地bean装配进来;
此外,也可以用defaultDestinationName属性指明消息通道名称,如果命名目的地存在就使用已有的,否则创建该命名目的地.
它的messageConverter属性可以指定消息转换器,JmsTemplate在convertAndSend()方法中默认使用SimpleMessage Converter
2、发送和接受消息
import org.springframework.jms.core.JmsOperations;
public class AlertServiceImpl implements AlertService {
private JmsOperations jmsOperations;
public AlertServiceImpl(JmsOperations jmsOperations) {
this.jmsOperations = jmsOperations;
}
// 使用send方法发送消息
// public void sendSpittleAlert(final Spittle spittle) {
// jmsOperations.send(
// "spittle.alert.queue",
// new MessageCreator() {
// public Message createMessage(Session session)
// throws JMSException {
// return session.createObjectMessage(spittle);
// }
// }
// );
// }
// 使用convertAndSend方法发送消息
public void sendSpittleAlert(Spittle spittle) {
jmsOperations.convertAndSend(spittle);
}
// 使用receive方法接收消息
// public Spittle getSpittleAlert() {
// try {
// ObjectMessage message = (ObjectMessage) jmsOperations.receive();
// return (Spittle) message.getObject();
// } catch (JMSException e) {
// throw JmsUtils.convertJmsAccessException(e);
// }
// }
// 使用receiveAndConvert方法接收消息
public Spittle retrieveSpittleAlert() {
return (Spittle) jmsOperations.receiveAndConvert();
}
}
convertAndSend
/receiveAndConvert
会使用内置的消息转换器(message converter)创建消息。这两个方法在接收Object对象时,Object对象必须实现Serializable接口而且有默认构造器
使用JmsTemplate接收消息的最大缺点在于receive()和receiveAndConvert()方法都是同步的
创建消息驱动的POJO
消息驱动(MDB)来源于EJB2规范。MDB可以异步处理消息,MDB将JMS目的地中的消息作为事件,并对这些事件进行响应
MDB需要实现javax.jms.MessageListener接口的onMessage(message message)f的方法,并使用@MessageDriven(mappedName="destination")注解标注MDB
Spring提供了以POJO的方式处理消息的能力,不需要实现javax.jms.MessageListener接口
只需要把处理消息的POJO配置为消息监听器,这个POJO便能接受消息
<jms:listener-container connection-factory="connectionFactory">
<jms:listener destination="destination"
<!-- messageHandler是自定义的POJO消息处理器 -->
ref="messageHandler"
<!-- messageHandler是自定义的POJO消息处理器的处理方法 -->
method="handler" />
</jms:listener-container>
消息监听器容器(message listener container)是一个特殊的bean,它可以监控JMS目的地并等待消息到达。一旦有消息到达,它取出消息,然后把消息传给任意一个对此消息感兴趣的消息监听器
connection-factory属性配置了连接工厂,容器中的每个jms:listener都使用这个连接工厂进行消息监听,connection-factory属性默认值为connectionFactory
对于jms:listener元素如果ref属性所标示的bean实现了MessageListener,那就没有必要再指定 method 属性了,默认就会调用onMessage()方法