RabbitMQ 学习---- HelloWorld 简单模型

Posted RAIN 7

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 学习---- HelloWorld 简单模型相关的知识,希望对你有一定的参考价值。

文章目录

RabbitMQ 学习(二)---- HelloWorld 简单模型

开放 rabbitMQ 端口号 5672


之前我们使用rabbitMq 网页客户端 开放了 15672 的端口,要想是的 java客户端访问服务器成功,需要开放 5672 的端口号。在服务器安全组设置


还要再pom.xml中加载 ampq客户端的依赖

  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>

如果想要在控制台不日志的报错信息,还要加载 slf4j 的依赖

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

发送者


(1)创建连接工厂 connectionFactory


在工厂中需要设置连接的主机名、端口号、客户端的用户名、密码、虚拟主机等,为之后的连接做好预先准备。


虚拟主机就是相当于 我们在数据库软件中 一个系统对应的数据库一样,对应这个一个单独的节点

 //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("XXXX.XXX.XXX.XXX"); // 部署rabbitMQ的
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
        // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
        connectionFactory.setVirtualHost("/test");

(2)创建 与 rabbitmq 的连接 connection


通过工厂实例拿到与rabbitMq的连接对象,只有拿到了连接,才能进行后续的所有操作

   //2、根据连接工厂创建连接对象
            connection = connectionFactory.newConnection();

(3)通过连接创建信道 channel


这里有一个经典面试题,为什么我们在channel中完成消息的发送接收,而不是直接在connection中呢?


看看这一个回答,能够很清楚的说明白。


(4)在信道中传递数据


如何在信道中传递数据呢?


我们需要明确一点,不管是什么模型,其实都需要 交换机(exchange)、路由器(routingkey)、队列(queue).


虽然这里的简单模型是点对点的,只需要 队列来传递数据的

  • Exchanger交换机,用来接收生产者发布的消息并将这些消息路由给服务器中的队列。

  • 默认的交换机有一个特点,只要你的routerKey与这个交换机中有同名的队列,他就会自动路由上。

这里的发送接收的规则捋了一下,是这样的,因为该模型不需要指定交换机与路由规则,只需要队列就行了所以使用默认交换机,交换机是用来接收生产者消息,并根据路由规则将消息分发给服务器中的队列中的,在此之前生产着声明了队列,但是在传递的时候仍然需要交换机与路由规则 给队列分发 消息, 巧了默认交换机有一个规则,如果路由规则与队列同名的话,那么路由与队列会自动绑定上,所以需要将 routingKey 写成 与 队列同名,让他们绑定上,然后生产者才能在队列中取到信息


一个很重要的信息传递的规则

  • 生产者声明队列

  • 生产者在发送消息的时候 使用 交换机 接收消息,通过 路由规则 分发消息到 指定的 队列 中等待接收

  • 消费者声明队列

  • 消费者在接收消息的时候通过队列(与生产者队列一致)进行接收消息,接收成功并选择是否回应


声明队列相关信息

 //4、在信道中声明队列
    /**
     * @Params1 queue:队列的名字
     * @Params2 durable: 是否支持队列持久化? 这里的持久化以及就是队列信息写入磁盘,如果rabbitmq服务器重启也会恢复队列信息,但不是信息持久化,信息会失去
     * @Params3 exclusive: 该队列是否支持独占? 就是这个队列在被一个信道占用的时候不能被其他进行访问 
     * @Params4 autoDelete: 该队列是否自动删除? 就是说这个队列中的信息被接收方拿完之后要自动删除
     * @Params5 arguments: map类型,一些额外的参数,比如说过期时间设置等
     */
            channel.queueDeclare("queue",false,false,false,null);

通过交换机、路由器、队列进行发送信息

        /**
         * 参数1:exchange 写交换机的名字,如果不写说明使用默认default amqp
         * 参数2:routinekey 路由器,如果没有路由器的话默认路由器和队列同名,需要写上队列的名字
         * 参数3:props 这里写发送的消息是否支持持久化
         * 参数4: bytes 类型,这是我们要传递的具体消息
         */ 

//5、使用该信道进行发送消息 
    for(int i=0;i<10;i++)
                channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("简单模型信息传递:"+i).getBytes());
            

(6)关闭资源


在发送完毕之后,关闭信道,关闭连接


finally 
            if(channel!=null)
                try 
                    channel.close();
                 catch (IOException e) 
                    e.printStackTrace();
                 catch (TimeoutException e) 
                    e.printStackTrace();
                
            

            if(connection!=null)
                try 
                    connection.close();
                 catch (IOException e) 
                    e.printStackTrace();
                
            
        

完整的过程代码


package hello;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Provider 


    public static void main(String[] args) 
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.46.143.156");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
        // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
        connectionFactory.setVirtualHost("/test");

        Connection connection = null;
        Channel channel =null;

        try 
            //2、根据连接工厂创建连接对象
            connection = connectionFactory.newConnection();

            //3、根据连接对象创建信道
            channel = connection.createChannel();

            // 这里使用的是 helloworld 简单模型,不需要交换机,不需要路由,只需要队列

            //4、在信道中声明队列
            /**
             * @Params1 queue:队列的名字
             * @Params1 durable: 是否支持队列持久化? 这里的持久化以及就是队列信息写入磁盘,如果rabbitmq服务器重启也会恢复队列信息,但不是信息持久化,信息会失去
             * @Params1 exclusive: 该队列是否支持独占? 就是这个队列在被一个信道占用的时候不能被其他进行访问
             * @Params1 autoDelete: 该队列是否自动删除? 就是说这个队列中的信息被接收方拿完之后要自动删除
             * @Params1 arguments: map类型,一些额外的参数,比如说过期时间设置等
             */
            channel.queueDeclare("queue",false,false,false,null);

            //5、使用该信道使用exchange、routineKey 进行发送消息 Bytes,

            // 默认的交换机有一个特点,routineKey 如果和 队列名一致的话 ,那么匹配成功

            // 生产者不是将消息放在queue队列中,而是放在默认交换机中等待符合routingkey的队列匹配,routineKey 名字和队列名一致则匹配成功
            // queue在消费者这里生成,匹配成功之后消费者在queue中取消息

            /**
             * 参数1:exchange 写交换机的名字,如果不写说明使用默认default amqp
             * 参数2:routinekey 路由器,如果没有路由器的话默认路由器和队列同名,需要写上队列的名字
             * 参数3:props 这里写发送的消息是否支持持久化
             * 参数4: bytes 类型,这是我们要传递的具体消息
             */
            for(int i=0;i<10;i++)
                channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("简单模型信息传递:"+i).getBytes());
            

         catch (Exception e)
            e.printStackTrace();
        finally 
            if(channel!=null)
                try 
                    channel.close();
                 catch (IOException e) 
                    e.printStackTrace();
                 catch (TimeoutException e) 
                    e.printStackTrace();
                
            

            if(connection!=null)
                try 
                    connection.close();
                 catch (IOException e) 
                    e.printStackTrace();
                
            
        

    





接收者


前面的几个步骤 都一样,

需要注意的一点是,接收方在接收queue中的数据的时候,声明queue必须和 发送方的保持一致,所有条件都得保持一致,否则接收不到。


(1)发送消息


从使用信道发送消息开始,使用 basicConsume()

 /**
             * 参数1 : 队列的名字
             * 参数2 : 是否自动确认,如果接收方接受了消息之后是否确认收到
             * 参数3 : 接收到消息之后的业务操作
             */

//5、使用该信道进行发送消息
            channel.basicConsume("queue", true, new DefaultConsumer(channel)
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    System.out.println("接受了发送方的消息:"+ new String(body));
                
            );


这里需要说一下,在最后的业务操作的参数是一个 Consumer,设置一个接口,我们需要写一个实现类,重写其中的 方法,对接收的message进行后续的业务操作。


接受方完整代码

package hello;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Customer 
    public static void main(String[] args) 
        //1、创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.46.143.156");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
        // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
        connectionFactory.setVirtualHost("/test");

        Connection connection = null;
        Channel channel =null;

        try 
            //2、根据连接工厂创建连接对象
            connection = connectionFactory.newConnection();
            //3、根据连接对象创建信道
            channel = connection.createChannel();
            // 这里使用的是 helloworld 简单模型,不需要交换机,不需要路由,只需要队列
            //4、在信道中声明队列
            channel.queueDeclare("queue",false,false,false,null);
            //5、使用该信道进行发送消息
            /**
             * 参数1 : 队列的名字
             * 参数2 : 是否自动确认,如果接收方接受了消息之后是否确认收到
             * 参数3 : 接收到消息之后的业务操作
             */
            channel.basicConsume("queue", true, new DefaultConsumer(channel)
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    System.out.println("接受了发送方的消息:"+ new String(body));
                
            );
         catch (Exception e)
            e.printStackTrace();
        
    



以上是关于RabbitMQ 学习---- HelloWorld 简单模型的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习--RabbitMQ集群相关学习

rabbitmq学习:rabbitmq(消息队列)的作用以及rabbitmq之直连交换机

RabbitMQ学习和使用

RabbitMQ学习系列: RabbitMQ安装与配置

RabbitMQ学习系列: RabbitMQ安装与配置

RabbitMQ学习RabbitMQ六大核心部分学习