Java实现简单的ActiveMQ通讯

Posted 不想努力的小龙

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java实现简单的ActiveMQ通讯相关的知识,希望对你有一定的参考价值。

Java实现简单的ActiveMQ通讯

  1. 我对学习的理解:先理解,再记忆(在脑海中回忆4遍记忆,或者是默写4遍)

  2. 说出你遇到的问题。以及不懂的地方,欢迎大家评论

  • 你会学到:

    • 如何用java实现一个简单的生产者生产消息
    • 如何用java实现一个简单的消费者消费消息
    • 消费者消费消息的二种方式(同步阻塞、异步非阻塞)
    • 消费者和生产者谁先启动会造成什么影响?
  • 前提条件:你必须在centos安装了activeMQ,并且能够得到web界面,如果你没有安装,请看这篇
    https://www.cnblogs.com/dragonyoung/p/15834624.html

  • 需要的maven依赖

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>4.15</version>
</dependency>
  • ❤❤❤请记住下面这张图片(下面的代码都是基于这张图片的)

准备好了,那么我们开始吧!

1. 生产者生产消息

生成生产流程(依据上面这张图):

  1. 创建给定ActiveMQ服务连接工厂
  2. 由连接工厂生成出connection连接对象
  3. 由连接对象创建出session会话对象
  4. 创建目的地(队列或者主题)
  5. 创建消息生产者(需要与目的地建立连接)
  6. 发送消息(创建消息、发送消息)
  7. 释放资源
public class JSMProducor 
    // ActiveMQ服务地址 (它采用的是tcp协议)
    public static final String ACTIVEMQ_URL = "tcp://182.61.35.6:61617";
    // 消息队列名称
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException 
        // 1. 创建工厂对象 (这个工厂对象连接到哪里?ACTIVEMQ_URL)
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2. 创建连接对象, 并启动访问
        Connection connection = factory.createConnection();
        connection.start();
        // 3. 通过连接对象创建session对象,第一个参数为是否开启事务,第二个参数为签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4.创建目的地(队列或者主题)
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5. 创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        // 6. 发送消息
        for (int i = 0; i < 3; i++) 
            // 6.1 创建消息
            TextMessage textMessage = session.createTextMessage("msg信息:" + i + ":hello world");
            // 6.2 将消息发送到MQ
            producer.send(textMessage);
        
        // 7. 关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("success.....");
    

这里就可以显示消息发送成功

2. 消费者消费消息

消费者消费流程(依据上面那张图):

  1. 创建工厂对象(思考:这个工厂连接着哪里?)
  2. 根据工厂生产出连接对象,并建立连接
  3. 根据连接对象创建session会话对象
  4. 创建目的地(需要与消费者建立通道)
  5. 创建消费者(与目的地建立连接)
  6. 消费消息
  7. 释放资源
class JSMConsumer
    // ActiveMQ服务地址 (它采用的是tcp协议)
    public static final String ACTIVEMQ_URL = "tcp://182.61.35.6:61617";
    // 消息队列名称
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException, IOException 
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
//        while(true)
//            TextMessage receive = (TextMessage)consumer.receive();
//            TextMessage receive1 = (TextMessage)consumer.receive(3000L);
//            if(receive != null) 
//                System.out.println("消息内容是:" + receive.getText());
//            else 
//                break;
//            
//        
//        // 关闭资源
//        session.close();
//        consumer.close();
//        connection.close();
        // 方式二:设置监听器
        consumer.setMessageListener(new MessageListener() 
            public void onMessage(Message message) 
                if (message != null && message instanceof TextMessage) 
                    TextMessage textMessage = (TextMessage) message;
                    try 
                        System.out.println("监听器接受到的消息:" + textMessage.getText());
                     catch (JMSException e) 
                        e.printStackTrace();
                    
                
            
        );
        // 说明为什么要使用 ystem.in.read();
        // 因为设置监听器是异步非阻塞,线程不需要等它接收消息就会往下执行
        // 所以这里设置了等待,防止他直接释放资源,接收线程了
        System.in.read();
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();

    

下面就说明了消费者消费成功了

3. 思考一个问题?

生产者先启动:

​ 出现一个消费者?怎么办?直接消费。

​ 出现两个消费者? 怎么办?先启动的消费者消费。

2个消费者先启动:

​ 生成者生成的消息均摊到2个消费者上面去。

4. 总结 + 几个知识点和注意

  • 请记住上面的那张图,那么敲这些代码就简单了

  • 解释一下下面的几个点

  • 解释一些消费者的receive方法

  • 消费者消费的两种方式

    • 同步阻塞:通过receive()方法消费
    • 同步非阻塞:通过设置监听器(这种方式是异步非阻塞的)来消费消息
  • 生产者和消费者的先后启动问题

以上是关于Java实现简单的ActiveMQ通讯的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)

ActiveMQ 即时通讯服务 浅析

ActiveMQ 即时通讯服务 浅析

ActiveMQ怎么实现两台服务器之间的通讯。比如A发消息给B,B能收到。具体的代码和配置。

消息队列 ActiveMQ的简单了解以及点对点与发布订阅的方法实现ActiveMQ

ActiveMQ入门