ActiveMQ-初识MQ--队列Queue

Posted 闲言博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ-初识MQ--队列Queue相关的知识,希望对你有一定的参考价值。

1.导入依赖

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>

2.编写生产者代码

编写生产者步骤

  1. 获取连接工厂
  2. 获取连接 connection
  3. 获取session
  4. 创建目的地 queue
  5. 创建创建生产者
  6. 创建消息
  7. 发送消息
  8. 释放资源
public class JmsProduce 
    private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    private static String QUEUE_NAME = "test01";

    public static void main(String[] args) throws Exception
        //1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接 connection 并启动访问。
        Connection connection = factory.createConnection();
        connection.start();
        // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        TextMessage textMessage = session.createTextMessage();
        for (int i = 0; i < 5; i++) 
            // 7  创建消息
            textMessage.setText("这是"+i+"条消息");
            // 8  通过messageProducer发送给mq
            producer.send(textMessage);
        
        // 9 关闭资源
        producer.close();
        session.close();
        connection.close();
    

往test01 这个队列中发送了5条消息

3.查看后台

发送消息之前,test01 队列的情况

发送消息之后,test01 队列的情况
有5个待消费的消息

4.编写消费者代码(while阻塞方式)

编写生产者步骤

  1. 获取连接工厂
  2. 获取连接 connection
  3. 获取session
  4. 创建目的地 queue
  5. 创建创建消费者
  6. 消费消息
  7. 释放资源【处于阻塞状态的话不会释放】
public class JmsConsumer 
    private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    private static String QUEUE_NAME = "test01";

    public static void main(String[] args) throws JMSException 
        //获取工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //获取连接
        Connection connection = factory.createConnection();
        connection.start();
        //获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        /* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
           通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
         */
        while (true)
            // reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
            // reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
            // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
            TextMessage receive = (TextMessage)consumer.receive();
            if (receive != null)
                String msg = receive.getText();
                System.out.println(msg);
            else 
                break;
            
        
        consumer.close();
        session.close();
        connection.close();
    

4.编写消费者代码(监听器方式)

public class JmsConsumerAsyn 
    private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
    private static String QUEUE_NAME = "test01";

    public static void main(String[] args) throws JMSException, IOException 
        //获取工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //获取连接
        Connection connection = factory.createConnection();
        connection.start();
        //获取session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        /* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
           通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
         */
        consumer.setMessageListener(new JmsConsumerListener());
        // 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    



/**
 * 异步监听消费者
 */
public class JmsConsumerListener implements MessageListener 

    //有消息时会触发该方法
    @Override
    public void onMessage(Message message) 
        //监听
        TextMessage textMessage = (TextMessage) message;
        if (null != textMessage)
            try 
                System.out.println(textMessage.getText());
             catch (JMSException e) 
                e.printStackTrace();
            
        
    

5.查看后台

消费者启动前,消费者的数量是0,消费的消息条数是0


启动消费者

以上是关于ActiveMQ-初识MQ--队列Queue的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ-初识MQ--队列Queue

消息队列MQ - Apache ActiveMQ

Spring整合ActiveMQ及多个Queue消息监听的配置

01-初识消息队列MQ&&Rabbit相关概念介绍

深入了解ActiveMQ!

本篇文章主要是讲SpringBoot集成activeMQ实现Queue模式点对点通信