ActiveMQ发送和监听类

Posted 胡乐天

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ发送和监听类相关的知识,希望对你有一定的参考价值。

本文内容共分为三大块:

①.发送消息到MQ队列

②.在固定时间内接收一个MQ消息

③.监听MQ消息队列

依赖



  <dependencies>
    <!--javax属于java的扩展包,不在标准库中-->
    <dependency>
      <groupId>javax.jms</groupId>
      <artifactId>jms</artifactId>
      <version>1.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.13</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

1.发送消息到MQ队列

MQ连接工厂类:

通过该类的静态方法可以直接获取Connection

package com.lt.service;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

public class ActiveMQConnFactory {
    static {
        //连接信息可以在配置文件中取
        ConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","failover:(tcp://127.0.0.1:61616)");

        //构造从工厂得到连接对象
            try {
                conn = factory.createConnection();
            } catch (JMSException e) {
                e.printStackTrace();

            }
    }

    private static Connection conn;


    private ActiveMQConnFactory(){}

    public static Connection getConnection() throws JMSException {
        conn.start();
        return conn;
    }
}

消息发送者类:

构造方法;
发送文本消息方法;
发送map消息方法;
关闭session的方法;

package com.lt.service.producer;

import com.lt.service.ActiveMQConnFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQTextMessage;

import javax.jms.*;

//消息发送者
public class Producer {
    //连接工厂,JMS用它创建连接
    private Connection connection;
    //一个发送或接收消息的线程
    private Session session;
    //MessageProducer: 消息发送者
    private MessageProducer producer;
    //队列,目的地
    private Destination destination;

    //构造器
    public Producer() throws JMSException {
        // 从工厂得到连接对象
        connection = ActiveMQConnFactory.getConnection();
        //获取操作连接
        session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
        //得到消息生成者,即发送者,参数为queue
        producer = session.createProducer(null);
        //设置持久化
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    }

    //发送text消息
    public void sendMessage(String queue,String str){
        try {
            destination = session.createQueue(queue);
            //声明发送消息的类型
            TextMessage message = new ActiveMQTextMessage();
            message.setText(str);
            producer.send(destination,message);
            close();
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }

    //发送map消息
    public void sendMessage(String queue, ActiveMQMapMessage map) throws JMSException {
        destination = session.createQueue(queue);
        producer.send(destination,map);
        close();
    }

    public void close() throws JMSException {
        if(session != null){
            session.close();
        }
    }
}

发送文本消息方法封装类:

对发送文本消息进行再次封装,这样就可以通过静态方法进行调用了。

package com.lt.service.producer;

import javax.jms.JMSException;

public class ActivemqSender {

    //发送文本消息
    public static void sendTextMessageage(String queue,String message){
        try {
            Producer producer = new Producer();
            producer.sendMessage(queue, message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

测试代码(测试类中):

    /**
     * 发送map消息
     */
    @Test
    public void sendMap() throws JMSException {
        Producer producer = new Producer();
        ActiveMQMapMessage mapMessage = new ActiveMQMapMessage();
        mapMessage.setString("test1","test11");
        mapMessage.setString("test2","test22");
        mapMessage.setInt("test3",75);
        producer.sendMessage("test",mapMessage);
    }

    /**
     * 发送文本消息
     */
    @Test
    public void sendText(){
        ActivemqSender.sendTextMessageage("test","测试文本");
    }

2.固定时间内接收一条消息

消息接收类:

package com.lt.service.consumer;

import com.lt.service.ActiveMQConnFactory;

import javax.jms.*;
//消息接收类
public class Consumer {
    //Connection : JMS客户端到JMS Provider的连接
    private Connection connection;
    //Session : 一个发送或接收消息的线程
    private Session session;
    // Destination :消息目的地
    private Destination destination;
    //消费者,消息接受者
    private MessageConsumer consumer;

    public Consumer() throws JMSException {
        //从工厂得到连接对象
        connection = ActiveMQConnFactory.getConnection();
        connection.start();
        //获取连接操作
        session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    }

    //接收消息(每次只接收一个) 可接收文本消息或Map,别的消息会报异常,可以自己进行扩展
    public Object onMessage(String queue, long time) throws Exception {

            destination = session.createQueue(queue);
            consumer = session.createConsumer(destination);
            //Receive the next message that within the specified timeout interval.
            //接收下一条消息在一个明确的时间间隔内。(单位:毫秒)
            Message receive = consumer.receive(time);
            if(null == receive){
                return null;
            }
            //此处注意,若队列中存储的消息类型不是TextMessage,也会接收消息,队列中该消息就会消失
            if(receive instanceof TextMessage){
                TextMessage textMessage = (TextMessage) receive;
                if (null == textMessage){
                    return null;
                }
                close();
                return textMessage.getText();
            }else if(receive instanceof MapMessage){
                MapMessage mapMessage = (MapMessage) receive;
                if(null == mapMessage){
                    return null;
                }
                close();
                return mapMessage;
            } else{
                throw new Exception("类型不匹配");
            }
    }

    /**
     * 添加监听器方法
     */
    public void addListener(String queue,MessageListener listener){
        try {
            Destination destination = session.createQueue(queue);
            MessageConsumer messageConsumer = session.createConsumer(destination);
            messageConsumer.setMessageListener(listener);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * 关闭连接
     * @throws JMSException
     */
    public void close() throws JMSException {
        if(consumer != null){
            consumer.close();
        }
        if(session != null){
            session.close();
        }
    }
}

测试代码:

    /**
     * 接收Text/Map消息
     */
    @Test
    public void getMessage() throws Exception {
        Consumer consumer = new Consumer();
        //监听10秒,看看是否有消息进入队列,如果有,则进行消费
        Object result = consumer.onMessage("test", 100000);
        if(result instanceof String){
            System.out.println(result);
        }else if(result instanceof MapMessage){
            MapMessage mapMessage = (MapMessage) result;
            System.out.println(mapMessage.toString());
            System.out.println(mapMessage.getObject("test1"));
        }else{
            System.out.println("队列中无消息");
        }
    }

3.监听MQ消息队列

监听类:

增加监听的方法,在2中含有,即:addListener方法

package com.lt.service.listener;

import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class TextListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if(null == message){
            System.out.println("消息为空");
        }else{
            if(message instanceof TextMessage){
                System.out.println(message);
            }else if(message instanceof MapMessage){
                MapMessage mapMessage = (MapMessage) message;
                System.out.println(mapMessage.toString());
            }else{
                System.out.println("未知类型:" + message);
            }
        }
    }
}

测试代码:

    /**
     * 持续接收,此处是在Test中测试使用,所以使用while(true) 保持监听,实际情况下,可以使用 实现ServletContextListener接口的类,进行书写监听
     */
    @Test
    public void addListener() throws JMSException {
        Consumer consumer = new Consumer();
        consumer.addListener("test?consumer.prefetchSize=10",new TextListener());
        //while(true) 是为了不关闭监听,测试使用
        while (true){}
    }

以上是关于ActiveMQ发送和监听类的主要内容,如果未能解决你的问题,请参考以下文章

java操作ActiveMQ消息队列

客户注册功能,发短信功能分离 通过ActiveMQ实现

如何心跳感知与ActiveMQ服务器的连接状态

ActiveMQ使用

ActiveMQ消息队列技术融合Spring

activeMQ能否实现消息推送?