ActiveMQ发送和监听类
Posted 胡乐天
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ发送和监听类相关的知识,希望对你有一定的参考价值。
本文内容共分为三大块:
①.发送消息到MQ队列
②.在固定时间内接收一个MQ消息
③.监听MQ消息队列
依赖
<dependencies>
<!--javax属于java的扩展包,不在标准库中-->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.13</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies>
1.发送消息到MQ队列
MQ连接工厂类:
通过该类的静态方法可以直接获取Connection
package com.lt.service;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
public class ActiveMQConnFactory {
static {
//连接信息可以在配置文件中取
ConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","failover:(tcp://127.0.0.1:61616)");
//构造从工厂得到连接对象
try {
conn = factory.createConnection();
} catch (JMSException e) {
e.printStackTrace();
}
}
private static Connection conn;
private ActiveMQConnFactory(){}
public static Connection getConnection() throws JMSException {
conn.start();
return conn;
}
}
消息发送者类:
构造方法;
发送文本消息方法;
发送map消息方法;
关闭session的方法;
package com.lt.service.producer;
import com.lt.service.ActiveMQConnFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import javax.jms.*;
//消息发送者
public class Producer {
//连接工厂,JMS用它创建连接
private Connection connection;
//一个发送或接收消息的线程
private Session session;
//MessageProducer: 消息发送者
private MessageProducer producer;
//队列,目的地
private Destination destination;
//构造器
public Producer() throws JMSException {
// 从工厂得到连接对象
connection = ActiveMQConnFactory.getConnection();
//获取操作连接
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
//得到消息生成者,即发送者,参数为queue
producer = session.createProducer(null);
//设置持久化
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
//发送text消息
public void sendMessage(String queue,String str){
try {
destination = session.createQueue(queue);
//声明发送消息的类型
TextMessage message = new ActiveMQTextMessage();
message.setText(str);
producer.send(destination,message);
close();
} catch (JMSException e) {
e.printStackTrace();
}
}
//发送map消息
public void sendMessage(String queue, ActiveMQMapMessage map) throws JMSException {
destination = session.createQueue(queue);
producer.send(destination,map);
close();
}
public void close() throws JMSException {
if(session != null){
session.close();
}
}
}
发送文本消息方法封装类:
对发送文本消息进行再次封装,这样就可以通过静态方法进行调用了。
package com.lt.service.producer;
import javax.jms.JMSException;
public class ActivemqSender {
//发送文本消息
public static void sendTextMessageage(String queue,String message){
try {
Producer producer = new Producer();
producer.sendMessage(queue, message);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
测试代码(测试类中):
/**
* 发送map消息
*/
@Test
public void sendMap() throws JMSException {
Producer producer = new Producer();
ActiveMQMapMessage mapMessage = new ActiveMQMapMessage();
mapMessage.setString("test1","test11");
mapMessage.setString("test2","test22");
mapMessage.setInt("test3",75);
producer.sendMessage("test",mapMessage);
}
/**
* 发送文本消息
*/
@Test
public void sendText(){
ActivemqSender.sendTextMessageage("test","测试文本");
}
2.固定时间内接收一条消息
消息接收类:
package com.lt.service.consumer;
import com.lt.service.ActiveMQConnFactory;
import javax.jms.*;
//消息接收类
public class Consumer {
//Connection : JMS客户端到JMS Provider的连接
private Connection connection;
//Session : 一个发送或接收消息的线程
private Session session;
// Destination :消息目的地
private Destination destination;
//消费者,消息接受者
private MessageConsumer consumer;
public Consumer() throws JMSException {
//从工厂得到连接对象
connection = ActiveMQConnFactory.getConnection();
connection.start();
//获取连接操作
session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
}
//接收消息(每次只接收一个) 可接收文本消息或Map,别的消息会报异常,可以自己进行扩展
public Object onMessage(String queue, long time) throws Exception {
destination = session.createQueue(queue);
consumer = session.createConsumer(destination);
//Receive the next message that within the specified timeout interval.
//接收下一条消息在一个明确的时间间隔内。(单位:毫秒)
Message receive = consumer.receive(time);
if(null == receive){
return null;
}
//此处注意,若队列中存储的消息类型不是TextMessage,也会接收消息,队列中该消息就会消失
if(receive instanceof TextMessage){
TextMessage textMessage = (TextMessage) receive;
if (null == textMessage){
return null;
}
close();
return textMessage.getText();
}else if(receive instanceof MapMessage){
MapMessage mapMessage = (MapMessage) receive;
if(null == mapMessage){
return null;
}
close();
return mapMessage;
} else{
throw new Exception("类型不匹配");
}
}
/**
* 添加监听器方法
*/
public void addListener(String queue,MessageListener listener){
try {
Destination destination = session.createQueue(queue);
MessageConsumer messageConsumer = session.createConsumer(destination);
messageConsumer.setMessageListener(listener);
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* 关闭连接
* @throws JMSException
*/
public void close() throws JMSException {
if(consumer != null){
consumer.close();
}
if(session != null){
session.close();
}
}
}
测试代码:
/**
* 接收Text/Map消息
*/
@Test
public void getMessage() throws Exception {
Consumer consumer = new Consumer();
//监听10秒,看看是否有消息进入队列,如果有,则进行消费
Object result = consumer.onMessage("test", 100000);
if(result instanceof String){
System.out.println(result);
}else if(result instanceof MapMessage){
MapMessage mapMessage = (MapMessage) result;
System.out.println(mapMessage.toString());
System.out.println(mapMessage.getObject("test1"));
}else{
System.out.println("队列中无消息");
}
}
3.监听MQ消息队列
监听类:
增加监听的方法,在2中含有,即:addListener方法
package com.lt.service.listener;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class TextListener implements MessageListener {
@Override
public void onMessage(Message message) {
if(null == message){
System.out.println("消息为空");
}else{
if(message instanceof TextMessage){
System.out.println(message);
}else if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage) message;
System.out.println(mapMessage.toString());
}else{
System.out.println("未知类型:" + message);
}
}
}
}
测试代码:
/**
* 持续接收,此处是在Test中测试使用,所以使用while(true) 保持监听,实际情况下,可以使用 实现ServletContextListener接口的类,进行书写监听
*/
@Test
public void addListener() throws JMSException {
Consumer consumer = new Consumer();
consumer.addListener("test?consumer.prefetchSize=10",new TextListener());
//while(true) 是为了不关闭监听,测试使用
while (true){}
}
以上是关于ActiveMQ发送和监听类的主要内容,如果未能解决你的问题,请参考以下文章