RabbitMQ学习(中)——交换机死信队列和延迟队列

Posted AC_Jobim

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习(中)——交换机死信队列和延迟队列相关的知识,希望对你有一定的参考价值。

一、交换机

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

发布消息方法:

第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话


临时队列:

每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。

  • 创建临时队列的方式如下:
    String queueName = channel.queueDeclare().getQueue();
    

创建出来之后长成这样:


绑定 binding:

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。

1.1 Fanout exchange(发布/订阅模式)

Fanout exchange又叫发布订阅模式。扇出交换机将消息路由到与其绑定的所有队列,并且路由键将被忽略。如果将N个队列绑定到扇出交换,则将新消息发布到该交换时,会将消息的副本传递到所有N个队列。扇出交换机非常适合消息的广播路由
注意:fanout类型的exchange会把消息推到所有的queue中,所以不需要指定routingkey,指定了也没用

系统中默认有fanout类型的exchange

实现效果:EmitLog(生产者)发送消息给两个消费者接收并打印接收到的信息

代码示例:

EmitLog 发送消息给两个消费者接收:

public class EmitLog {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        //声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生成这发出消息:" + message);
        }
    }
}

ReceiveLogs02将接收到的消息打印

public class ReceiveLogs01 {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 生成一个临时队列,队列的名称是随机的
         * 当消费者断开与队列的连接的时候,队列自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列,其中routingkey(也称之为 binding key)为空字符串,广播模式下路由键将被忽略
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("ReceiveLogs01等待接收消息,把接收到的消息打印在屏幕上.....");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs02控制台打印接收到的消息:" + new String(message.getBody()));
        };

        channel.basicConsume(queueName,true,deliverCallback,(consumerTag) -> {});
    }
}

ReceiveLogs02将接收到的消息打印在控制台

public class ReceiveLogs02 {

    //交换机的名称
    public static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        /**
         * 生成一个临时队列,队列的名称是随机的
         * 当消费者断开与队列的连接的时候,队列自动删除
         */
        String queueName = channel.queueDeclare().getQueue();
        /**
         * 绑定交换机与队列,其中routingkey(也称之为 binding key)为空字符串,广播模式下路由键将被忽略
         */
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("ReceiveLogs02等待接收消息,把接收到的消息打印在屏幕上.....");

        //接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("ReceiveLogs02控制台打印接收到的消息:" + new String(message.getBody()));
        };

        channel.basicConsume(queueName,true,deliverCallback,(consumerTag) -> {});
    }
}

效果展示:

1.2 Direct exchange(路由模式)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routingkey完全一致,才会接收到消息


在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。


多重绑定:


当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。


实战实现效果:


c2:绑定disk,routingKey为error
c1:绑定console,routingKey为info、warning

  1. 生产者:

    public class DirectLogs {
        //交换机的名称
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //创建多个 bindingKey
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("info", "普通 info 信息");
            bindingKeyMap.put("warning", "警告 warning 信息");
            bindingKeyMap.put("error", "错误 error 信息");
            //debug 没有消费这接收这个消息 所有就丢失了
            bindingKeyMap.put("debug", "调试 debug 信息");
    
            for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
                //获取 key value
                String bindingKey = bindingKeyEntry.getKey();
                String message = bindingKeyEntry.getValue();
    
                channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes("UTF-8"));
                System.out.println("生产者发出消息:" + message);
            }
        }
    }
    
  2. 消费者C1:

    public class ReceiveLogsDirect01 {
    
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare("console",false,false,false,null);
    
            //把该临时队列绑定名为EXCHANGE_NAME的交换机, 其中 routingkey(也称之为 binding key)为info
            channel.queueBind("console",EXCHANGE_NAME,"info");
            channel.queueBind("console",EXCHANGE_NAME,"warning");
            System.out.println("ReceiveLogsDirect01等待接收消息,把接收到的消息打印在屏幕上.....");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
                System.out.println("info和warning 消息已经接收:\\n" + message);
            };
    
            channel.basicConsume("console",true,deliverCallback,(consumerTag) -> {});
    
        }
    }
    
  3. 消费者C2:

    public class ReceiveLogsDirect02 {
    
        public static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明一个交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            channel.queueDeclare("disk",false,false,false,null);
    
            //绑定交换机与队列
            channel.queueBind("disk",EXCHANGE_NAME,"error");
            System.out.println("ReceiveLogsDirect02等待接收消息,把接收到的消息打印在屏幕上.....");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message;
                System.out.println("error 消息已经接收:\\n" + message);
            };
    
            channel.basicConsume("disk",true,deliverCallback,(consumerTag) -> {});
    
        }
    }
    
  4. 执行结果:

1.3 Topics 模式

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert


Topic的要求:

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。

在这个规则列表中,其中有两个替换符是大家需要注意的:

  • *(星号)可以代替一个单词
  • #(井号)可以替代零个或多个单词

Topic匹配案例:

下图绑定关系如下:

  • Q1–>绑定的是
    • 中间带 orange 带 3 个单词的字符串 (*.orange.*)
  • Q2–>绑定的是
    • 最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
    • 第一个单词是 lazy 的多个单词 (lazy.#)

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

例子说明
quick.orange.rabbit被队列 Q1Q2 接收到
azy.orange.elephant被队列 Q1Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit是四个单词但匹配 Q2

注意:

  • 当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout
  • 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct

实战实现效果:实现上面的匹配案例

  1. 生产者

    public class EmitLogTopic {
        public static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            /**
             * Q1-->绑定的是
             * 中间带 orange 带 3 个单词的字符串(*.orange.*)
             * Q2-->绑定的是
             * 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
             * 第一个单词是 lazy 的多个单词(lazy.#)
             */
            Map<String, String> bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
            bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
            bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
            bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
            bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
            bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
            bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
    
            for (Map.Entry<String, String> stringEntry : bindingKeyMap.entrySet()) {
                String routingKey = stringEntry.getKey();
                String message = stringEntry.getValue();
                channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
                System.out.println("生产者发送消息:" + message);
            }
        }
    }
    
  2. 消费者C1

    /**
     * 消费者C1,接收中间带 orange 带 3 个单词的字符串 (*.orange.*)
     */
    public class ReceiveLogsTopic01 {
        //交换机的名称
        public static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明队列
            String queueName = "Q1";
            channel.queueDeclare(queueName,false,false,false,null);
            //绑定交换机和队列
            channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
            System.out.println("Q1等待接收消息,把接收到的消息打印在屏幕上.....");
    
            //接收消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println("Q1控制台打印接收到的消息:" + new String(message.getBody()));
                System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
    
            };
            channel.basicConsume(queueName,true,deliverCallback,(consumerTag) ->{});
        }
    }
    
  3. 消费者C2

    /**
     * 消费者C2,接收最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)和第一个单词是 lazy 的多个单词 (lazy.#)
     */
    public class ReceiveLogsTopic02 {
        //交换机的名称
        public static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明队列
            String queueName = "Q2";
            channel.queueDeclare(queueName浅谈RabbitMQ——死信队列与延迟队列

    浅谈RabbitMQ——死信队列与延迟队列

    RabbitMQ--死信队列/延迟队列--使用/原理

    RabbitMQ的动态创建交换机、队列、绑定、死信队列,延迟队列代码实现

    RabbitMQ-消息可靠性&延迟消息

    RabbitMQ之消息可靠性死信交换机惰性队列及集群