消息中间件ActiveMQ学习笔记 [Java编码MQ,消费者生产者基本模型]

Posted 小智RE0

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件ActiveMQ学习笔记 [Java编码MQ,消费者生产者基本模型]相关的知识,希望对你有一定的参考价值。

近期计划学习一下消息队列;
找到的学习视频地址:尚硅谷ActiveMQ教程快速入门


文章目录


1.Java编码MQ,模拟基础生产者消费者


  • 创建一个Connection Factory连接工厂;
  • 然后通过连接工厂创建connection连接;
  • 启动该链接后,可通过连接出session 会话;
  • 创建destination目的地[可理解为队列/一种topic主题];
  • 创建出生产者producer / message消息,然后设置destination目的地;
  • 创建出消费者consumer,或者注册一个消息监听器message listener;
  • 生产者可以发送资源,消费者接收消息;
  • 最终完成操作后,可以关闭连接.


首先创建一个简单的maven项目;
在pom.xml文件中使用下面的依赖;

<dependencies>
     <!-- activemq使用的jar包-->
     <dependency>
         <groupId>org.apache.activemq</groupId>
         <artifactId>activemq-all</artifactId>
         <version>5.16.4</version>
     </dependency>
     <!-- activemq和 spring整合-->
     <dependency>
         <groupId>org.apache.xbean</groupId>
         <artifactId>xbean-spring</artifactId>
         <version>3.16</version>
     </dependency>
     <!--日志-->
     <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.25</version>
     </dependency>
     <dependency>
         <groupId>ch.qos.logback</groupId>
         <artifactId>logback-classic</artifactId>
         <version>1.2.3</version>
     </dependency>
     <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
         <version>1.18.22</version>
     </dependency>
     <!--测试-->
     <dependency>
         <groupId>junit</groupId>
         <artifactId>junit</artifactId>
         <version>4.12</version>
         <scope>test</scope>
     </dependency>
</dependencies>

实际操作;

自定义消息生产者


就从这个MQ工厂类ActiveMQConnectionFactory来看;

无参构造中提示需要使用这样一个常量;DEFAULT_BROKER_URL

public static final String DEFAULT_BROKER_URL = "failover://"+DEFAULT_BROKER_BIND_URL;

自定义写一个MQ的生产者;

package com.xiaozhi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @BelongsProject: activemqstudyday1
 * @BelongsPackage: com.xiaozhi.activemq
 * @Author: 信计1801 李智青
 * @Date: 2022/4/7 16:02
 * @Description: 自定义生产者
 */
public class MyProduce 
    //链接url
    private static final String ACTIVEMQ_URL = "tcp://192.168.59.128:61616";
    //目标队列名称;
    private static final String QUEUE_NAME = "myDeque";

    public static void main(String[] args) throws JMSException 
        //1.按照自己的链接,创建连接的工厂;
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //2.通过工厂创建链接;-->开启;
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话; 参数:  事务, 签收机制;
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地;例如:队列
        Destination destination = session.createQueue(QUEUE_NAME);
        //5.创建消息生产者;
        MessageProducer producer = session.createProducer(destination);
        //生产6条消息存入到MQ中间件的队列中;
        for (int i = 0; i <= 5; i++) 
            //表名是第几条消息;
            TextMessage textMessage = session.createTextMessage("this is the" + i + "message");
            producer.send(textMessage);
        
        //关闭资源;
        producer.close();
        session.close();
        connection.close();

        System.out.println("+-+-+-+-+-+-+-+-+-+我把消息都发给中间大佬MQ了");
    

编码完成后, 打开我linux上的ActiveMQ,启动;
本地访问http://192.168.59.128:8161/admin/queues.jsp;注意此时还没有消息;

现在运行自定义的生产者代码;

麻了,一开始运行老报错,连接超时;
然后去linux 在防火墙设置开放端口61616

firewall-cmd --zone=public --add-port=61616/tcp --permanent

然后重启防火墙

 firewall-cmd --reload

运行Java代码;

再次刷新访问页面,可看到消息已发布出去了;

其中的几个参数:
Number Of Pending Messages:等待消费的消息,即未出队列的数量 =总接收数-总出队列数。
Number Of Consumers:消费者数量

Messages Enqueued进队列的总消息量,包括出队列的.
Messages Dequeued:出队消息数,即消费者使用的数量


自定义同步阻塞式的消息消费者


package com.xiaozhi.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @BelongsProject: activemqstudyday1
 * @BelongsPackage: com.xiaozhi.activemq
 * @Author: 信计1801 李智青
 * @Date: 2022/4/7 18:14
 * @Description: 自定义消费者
 */
public class MyConsumer 
    //链接url
    public static final String ACTIVEMQ_URL = "tcp://192.168.59.128:61616";
    //目标队列名称;
    public static final String QUEUE_NAME = "myDeque";

    public static void main(String[] args) throws JMSException 
        //1.按照自己的链接,创建连接的工厂;
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //2.通过工厂创建链接;-->开启;
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话; 参数:  事务, 签收机制;
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地;例如:队列
        Destination destination = session.createQueue(QUEUE_NAME);
        //创建消费者;
        MessageConsumer consumer = session.createConsumer(destination);
        while (true) 
            //消费者接收消息;
            Message message = (TextMessage) consumer.receive();
            if (message != null) 
                System.out.println("嘿嘿,我收到了-->" + message);
             else 
                break;
            
        
        //关闭资源;
        consumer.close();
        session.close();
        connection.close();

    

运行代码,消息已接收完成

注意到这个程序一直在运行监听状态,并没有停止;所以访问时看到的消费者值为1一直存在;

停止运行后,再次查看,消费者数量已显示为0;


现在重新将生产者代码运行一下;

在消费者接收消息时;这里使用6秒如果还没有收到消息就自动关闭;

去访问时发现消费者数量已经为0了;


异步监听方式的消费者


刚才那样的消费者接收消息为同步阻塞方式;

改为监听者模式

public class MyConsumer 
    //链接url
    public static final String ACTIVEMQ_URL = "tcp://192.168.59.128:61616";
    //目标队列名称;
    public static final String QUEUE_NAME = "myDeque";

    public static void main(String[] args) throws JMSException, IOException 
        //1.按照自己的链接,创建连接的工厂;
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //2.通过工厂创建链接;-->开启;
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话; 参数:  事务, 签收机制;
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地;例如:队列
        Queue destination = session.createQueue(QUEUE_NAME);


        //创建消费者;
        MessageConsumer consumer = session.createConsumer(destination);
        //设定监听者;
        consumer.setMessageListener(new MessageListener() 
            @Override
            public void onMessage(Message message) 
                if(null != message && message instanceof TextMessage)
                    //进行强制类型转换;
                    TextMessage textMessage = (TextMessage)message;
                    try 
                        System.out.println("接收者来接收消息"+textMessage.getText());
                     catch (JMSException e) 
                        e.printStackTrace();
                    
                
            
        );
        //保持控制台不关闭;
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    

运行后;

若是将System.in.read();这行代码注释掉;


这里消费者刚启动就关闭了;

导致消息队列中的消息没有被接收处理;


关于3种常见的消费者问题


案例1: 先生产, 仅启动1号消费者, 是可以完成消息的消费.

先生产3条消息;

OK,这里有3条待消费处理的消息.

在运行一次生产者;现在队列中有6条消息;

然后此时启动消费者;

完成消费;

OK,完毕,清空队列


案例2: 先生产, 启动1号消费者之后, 然后启动2号消费者; 那么2号消费者 是无法进行消费的.

  • OK,这次先生产3条消息:

  • 让1号消费者先出动,成功消费;

  • 然后让2号消费者出动;无法消费;


案例3: 先启动2个消费者,然后在生成6个消息; 2个消费者平均分配消费到一半的消息.

启动消费者1号;

启动消费者2号;

目前已经显示两个消费者;

生产6条消息;

查看消费状况:1号消费者消费到 1,3,5的消息

2号消费者消费到 2,4,6的消息

此时可看到这个队列的状态.


队列案例总结


两种消费方式:

  • 同步阻塞方式 :receive() , 订阅者或者接受者调用MessageConsumer的receive() 方法接收消息,receive方法在接收到消息之前,将会一直阻塞.
  • 异步非阻塞方式: 采用监听器onMessage() , 订阅者或接收者可以通过MessageConsumer的setMessageListener(MessageListener listener) 注册消息监听器, 当这个消息到达时,当前系统就会自动调用监听器MessageListeneronMessage(Message message)方法

点对点消息传递域:

  • 每个消息只有一个消费者,1对1的关系.
  • 消息的生产者和消费者之间没有时间上的相关,无论消费者在生产者发送消息时是否处于运行状态,消费都可以提取消息.
  • 消息被消费了之后,队列中就不会再存储,也就是说: 消费者无法消费 已经被消费的消息.

2.Topic 主题


上面主要是队列(Queue)的入门学习部分:

现在来看看主题(Topic).


  • 生产者将消息发布到主题topic之后,可以由多个消费者前来消费, 一对多的关系;
  • 生产者与消费者具有时间相关性质,订阅某主题的消费者只能消费到订阅之后发布的消息.[ 比如说你加了一个群,但是这个群聊很久之前就有很多聊天记录,但你是看不到的.]
  • 生产者生产消息时,topic主题不会保存消息, 若此时无人订阅,那么生产出来的就是废弃消息, 所以说一般要先启动消费者再去启动生产者.

在JMS的规范中,可支持用户创建持久性的订阅, 允许消费者去消费没有处于激活状态时发送的消息.


完成一个基础的生产者-消费者-发布与订阅关系的案例

生产者[发布者]:

public class MyProduce 
    //链接url
    public static final String ACTIVEMQ_URL = "tcp://192.168.59.128:61616";
    //目标队列名称;
    public static final String TOPIC_NAME = "myTopicOne";

    public static void main(String[] args) throws JMSException 
        //1.按照自己的链接,创建连接的工厂;
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);

        //2.通过工厂创建链接;-->开启;
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话; 参数:  事务, 签收机制;
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地; 主题
        Topic destination = session.createTopic(TOPIC_NAME);
        //5.创建消息生产者;
        MessageProducer producer = session.createProducer(destination);
        //生产消息存入到MQ中间件的主题中;
        for (int i = 1; i <= 3; i++) 
            //表名是第几条消息;
            TextMessage textMessage = session.createTextMessage("+this is the" + i + "message");
            producer.send(textMessage);
        
        //关闭资源;
        producer.close();
        session.close();
        connection.close();

        System.out.println("+-+-+-+-+-+-+-+-+-+主题消息生成之后,发给中间大佬MQ了");
    

消费者[订阅者]:

public class MyConsumer 
    //链接url
    public static final String ACTIVEMQ_URL = "tcp://192.168.59.128:61616";
    //目标队列名称;
    public static final String Topic_NAME = "myTopic";

    public static void main(String[] args) throws JMSException, IOException 
        System.out.println("我是消费者->>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>");
        //1.按照自己的链接,创建连接的工厂;
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过工厂创建链接;-->开启;
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话; 参数:  事务, 签收机制;
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地;--->主题
        Topic destination = session.createTopic(Topic_NAME);
        //创建消费者;
        MessageConsumer consumer = session.createConsumer(destination);
        //设定监听者;
        consumer.setMessageListener((message)->
            if(null != message && message instanceof TextMessage)
                //进行强制类型转换;
                TextMessage textMessage = (TextMessage)message;
                try 
                    System.out.println("接收者来接收消息"+textMessage.getText());
                 catch (JMSException e) 
                    e.printStackTrace();
                
            
        );
        //保持控制台不关闭;
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    

  • 首先启动3个消费者;

  • 在linux中启动MQ,本地访问http://192.168.59.128:8161/admin/topics.jsp; 可看到目前有两个消费者