RabbitMQ案列

Posted iLisa

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ案列相关的知识,希望对你有一定的参考价值。

一:RabbitMQ相当于一个信箱

我们只需要将收件人写下,放入到信箱中就不需要管了。自动送过去。

生产者:什么都不做,就是发消息

消息传递:消息队列:信箱的名字就叫做队列,队列接近无限大的缓冲,取决于硬盘有多大它就有多大。这个队列专门用来存储接收转发消息。

消费者:大部分在等待和接收消息的

 

 ConnectionFactory factory=new ConnectionFactory();连接工厂

factory.setHost(“localhost”);连接RabbitMQ服务器()

factory.setVirtualHost(“/shop”);

factory.setUsername(“shop”);

factory.setPassword(“shop”);

Connection connection=factory.newConnection();通过连接工厂创建连接

Channel  channel=connection.createChannel();通过连接创建信道

channel.queueDeclare(队列名,false,false,false,null);绑定队列

二:广播类型案列(广播类型的RabbitMQ)

P:发送端,X交换机,红的是队列,C是接收方

1.RabbitMQ客户端依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

 2.Send类发送端

从图中看到:
消息产生后不是直接投送到队列中,而是将消息先投送给Exchange交换机,然后消息经过Exchange
交换机投递到相关队列
多个消费者消费的不再是同一个队列,而是每个消费者消费属于自己的队列。
/**
* 发布与订阅模式队列-fanout广播模式-消息发送者
*/
public class Send {
   // 队列名称
   // 如果不声明队列,会使用默认值,RabbitMQ会创建一个排他队列,连接断开后自动删除
   //private final static String QUEUE_NAME = "ps_fanout";
   // 交换机名称
   private static final String EXCHANGE_NAME = "exchange_fanout";7.2. Receiving
这里对于消费者,消费消息时,消息通过交换机Exchange被路由到指定队列,绑定队列到指定交换机来
实现,一个消费者接到消息后用于邮件发送模拟,另一消费者收到消息,用于短信发送模拟。
Recv01.java
   public static void main(String[] args) {
       // 创建连接工厂
       ConnectionFactory factory = new ConnectionFactory();
       factory.setHost("127.0.0.1");
       factory.setPort(5672);
       factory.setUsername("shop");
       factory.setPassword("shop");
       factory.setVirtualHost("/shop");
       Connection connection = null;
       Channel channel = null;
       try {
           // 通过工厂创建连接
           connection = factory.newConnection();
           // 获取通道
           channel = connection.createChannel();
           // 声明队列
           //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
           // 绑定交换机 fanout:广播模式
           channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
           // 创建消息,模拟发送手机号码和邮件地址
           String message = "18600002222|12345@qq.com";
           // 将产生的消息发送至交换机
           channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
           System.out.println(" [x] Sent \'" + message + "\'");
      } catch (IOException e) {
           e.printStackTrace();
      } catch (TimeoutException e) {
           e.printStackTrace();
      } finally {
           try {
               // 关闭通道
               if (null != channel && channel.isOpen())
                   channel.close();
               // 关闭连接
               if (null != connection && connection.isOpen())
                   connection.close();
          } catch (TimeoutException e) {
               e.printStackTrace();
          } catch (IOException e) {
               e.printStackTrace();
          }
      }

生产者:之前绑定的是队列,现在绑定的是交换机。然后直接发送给交换机

3.Recv消费端

   这里对于消费者,消费消息时,消息通过交换机Exchange被路由到指定队列,绑定队列到指定交换机来
实现,一个消费者接到消息后用于邮件发送模拟,另一消费者收到消息,用于短信发送模拟。
/**
* 发布与订阅模式队列-fanout广播模式-消息接收者
*/
public class Recv01 {
    // 交换机名称
    private static final String EXCHANGE_NAME = "exchange_fanout";
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("shop");
        factory.setPassword("shop");
        factory.setVirtualHost("/shop");
        try {
            // 通过工厂创建连接
            final Connection connection = factory.newConnection();
            // 获取通道
            final Channel channel = connection.createChannel();
            // 绑定交换机 fanout:广播模式
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            // 获取队列名称
            String queueName = channel.queueDeclare().getQueue();
            // 绑定队列
            channel.queueBind(queueName, EXCHANGE_NAME, "");
            /*
                限制RabbitMQ只发不超过1条的消息给同一个消费者。
                当消息处理完毕后,有了反馈,才会进行第二次发送。
             */
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            // 获取消息,按|分割以后一个消费者发短信,一个消费者发邮件
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received01 \'" + message + "\'");
                // 手动回执消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
           };
            // 监听队列
            /*
                autoAck = true代表自动确认消息
                autoAck = false代表手动确认消息
             */
            boolean autoAck = false;
            channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {
           });
       } catch (IOException e) {
            e.printStackTrace();
       } catch (TimeoutException e) {
            e.printStackTrace();
       }
   }
}

 将交换机和排他队列进行了绑定,最后绑定的是队列,和发送的区别:

 总结
从结果可以看出生产者发送了一条消息,用于邮件发送和短信发送的消费者均可以收到消息进行后续
处理。
问题:生产者产生的消息所有消费者都可以消费,可不可以指定某些消费者消费呢?
 解决:采用direct路由模式。

 

以上是关于RabbitMQ案列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ核心功能介绍

rabbitmq - 不会获取队列中的所有消息

JDBC 代码简写版案列

JS监听checkbox的选择获取取消事件代码案列

怎么用js来实现页面的分页,有案列代码吗?请给个代码看看,谢谢

php知识案列1