RabbitMQ交换机

Posted marklogzhu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ交换机相关的知识,希望对你有一定的参考价值。

一、交换机

1.1 作用

Exchange(交换机) 的作用就是接收消息并根据路由键转发消息到绑定的队列。

技术图片

1.2 交换机常用属性

属性 含义
Name 交换机名称
Type 交换机类型,direct、topic、fanout、headers等,它们本质都一样,只是消息转发的逻辑不同
Durability 是否持久化,true 为持久化
Auto Delete 当最后一个绑定到 Exchange 上的队列删除后,自动删除该 Exchange
Internal 当前 Exchange 是否用于 RabbitMQ 内部使用,默认为 false
Arguments 扩展参数,用于扩展 AMQP 协议自制定化使用

二、不同类型的交换机

2.1 Direct Exchange

**Direct Exchange (直连型交换机) ** 是根据消息携带的路由键(routing key)将消息投递给对应队列的,步骤如下:

1) 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key);
2) 当一个携带着路由值为 R 的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为 R 的队列。    

Direct 模式可以使用 RabbitMQ 自带的 Exchange 即 default Exchange ,所以不需要将 Exchange 进行任何绑定(binding) 操作,消息传递时,RouteKey 必须完全匹配才会被队列接受,否则消息会被丢弃。

我们之前的实例使用的就是 default Exchange

技术图片

实例:

消费者:

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        String exchangeType = "direct";

        // 4.声明交换机
        String exchangeName = "directExchange";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 5.申明队列
        String queueName = "directQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 6.将交换机和队列进行绑定关系
        // 注意:我们只绑定一个路由KEY,说明另一个路由不会被消费掉
        String routingKey = "directA";
        channel.queueBind(queueName, exchangeName, routingKey);


        // 7.创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        // 8.循环消费
        System.err.println("消费端启动");
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("消费端消费: " + msg);
        }
    }

}

生产者:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        // 4.循环发送消息
        // 声明交换机
        String exchangeName = "directExchange";
        
        // 发送消息
        String routingKey1 = "directA";
        String msg1 = "directA 消息";
        channel.basicPublish(exchangeName, routingKey1, null, msg1.getBytes());

        String routingKey2 = "directB";
        String msg2 = "directB 消息";
        channel.basicPublish(exchangeName, routingKey2, null, msg2.getBytes());

        // 5.关闭资源
        channel.close();
        connection.close();
        connectionFactory.clone();
    }
}

先启动消费者,然后启动生产者,查看控制台输出:

消费端启动
消费端消费: directA 消息

可以看到未被绑定的 routingKey 消息未被消费掉。

2.2 Fanout Exchange

Fanout Exchange 是扇型交换机,它不处理路由键,只需要将队列绑定到交换机上。任何发送到 Fanout Exchange 的消息都会被转发到与该 Exchange 绑定(Binding)的所有 Queue上。它的转发消息是最快的。

技术图片

实例:

消费者

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        String exchangeType = "fanout";

        // 4.声明交换机
        String exchangeName = "fanoutExchange";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 5.申明队列
        String queueName1 = "fanoutQueue1";
        channel.queueDeclare(queueName1, false, false, false, null);

        String queueName2 = "fanoutQueue2";
        channel.queueDeclare(queueName2, false, false, false, null);


        // 6.将交换机和队列进行绑定关系,不需要设置 routingKey
        channel.queueBind(queueName1, exchangeName, "");
        channel.queueBind(queueName2, exchangeName, "");


        // 7.创建消费者并消费
        new Thread(() -> {
            try {
                handleConsumer(channel,queueName1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                handleConsumer(channel,queueName2);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }


    private static void handleConsumer(Channel channel, String queueName) throws Exception {
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        // 7.循环消费
        System.err.println(queueName + " 消费端启动");
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println(queueName + "消费端消费: " + msg);
        }
    }
}

生产者:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        
        // 4.声明交换机
        String exchangeName = "fanoutExchange";
        // 5.循环发送消息
        for (int i = 0; i < 5; i++) {
            String msg = "消息"+i;
            channel.basicPublish(exchangeName, "", null, msg.getBytes());
        }
        // 6.关闭资源
        channel.close();
        connection.close();
        connectionFactory.clone();

    }
}

先启动消费者,然后启动生产者,查看控制台输出:

fanoutQueue1 消费端启动
fanoutQueue2 消费端启动
fanoutQueue1消费端消费: 消息0
fanoutQueue1消费端消费: 消息1
fanoutQueue2消费端消费: 消息0
fanoutQueue1消费端消费: 消息2
fanoutQueue2消费端消费: 消息1
fanoutQueue2消费端消费: 消息2
fanoutQueue1消费端消费: 消息3
fanoutQueue1消费端消费: 消息4
fanoutQueue2消费端消费: 消息3
fanoutQueue2消费端消费: 消息4

2.3 Topic Exchange

Topic Exchange(主题交换机) 会将所有发送到 Topic Exchange 的消息转发到所有关心 RouteKey 中指定的TopicQueue 上。

ExchangeRouteKey 和某 Topic 进行模糊匹配。使用这种类型,队列需要绑定一个 Topic。

注:可以使用通配符进行模糊匹配

  • # 匹配一个或多个词
  • * 匹配一个词

技术图片

实例:

消费者:

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        String exchangeType = "topic";

        // 4.声明交换机
        String exchangeName = "topicExchange";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 5.申明队列
        String queueName1 = "topicQueue1";
        channel.queueDeclare(queueName1, false, false, false, null);

        String queueName2 = "topicQueue2";
        channel.queueDeclare(queueName2, false, false, false, null);

        // 6.将交换机和队列进行绑定关系
        // 匹配一个或多个词
        String routingKey1 = "user.#";
        channel.queueBind(queueName1, exchangeName, routingKey1);
        // 匹配一个词
        String routingKey2 = "order.*";
        channel.queueBind(queueName2, exchangeName, routingKey2);

        // 7.创建消费者并消费
        new Thread(() -> {
            try {
                handleConsumer(channel,queueName1);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            try {
                handleConsumer(channel,queueName2);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }


    private static void handleConsumer(Channel channel, String queueName) throws Exception {
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);
        // 7.循环消费
        System.err.println(queueName + " 消费端启动");
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println(queueName + "消费端消费: " + msg);
        }
    }
    
}

生产者:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();


        // 4.声明交换机
        String exchangeName = "topicExchange";
        // 5.发送消息
        // 模糊匹配多个词
        channel.basicPublish(exchangeName, "user.a", null, "user.a".getBytes());
        channel.basicPublish(exchangeName, "user.a.b", null, "user.a.b".getBytes());

        // 模糊匹配一个词
        channel.basicPublish(exchangeName, "order.a", null, "order.a".getBytes());
        channel.basicPublish(exchangeName, "order.a.b", null, "order.a.b".getBytes());

        // 6.关闭资源
        channel.close();
        connection.close();
        connectionFactory.clone();

    }
}

先启动消费者,然后启动生产者,查看控制台输出:

topicQueue2 消费端启动
topicQueue1 消费端启动
topicQueue1消费端消费: user.a
topicQueue1消费端消费: user.a.b
topicQueue2消费端消费: order.a

2.4 Headers Exchange

Headers Exchange(头交换机) 不处理路由键,而是根据发送的消息内容中的 headers 属性进行匹配。
在绑定 QueueExchange 时指定一组键值对;当消息发送到 RabbitMQ 时会取到该消息的 headersExchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。

不经常使用,了解即可。

三、Dead Letter Exchange(死信交换机)

3.1 死信模式

死信模式指的是,当消费者不能处理接收到的消息时,将这个消息重新发布到另外一个队列中,等待重试或者人工干预,这个过程中的 exchangequeue 就是所谓的 Dead Letter ExchangeQueue

3.2 死信消息生成原因

消息变成死信有以下几种情况:

  • 消费者对消息使用了 **basicReject ** 或 者 basicNack 回复,并且 requeue 参数设置为 false,即不再将该消息重新在消费者间进行投递.
  • 消息由于消息有效期(per-message TTL)过期
  • 消息由于队列超过其长度限制而被丢弃

3.3 实例

死信消费者:

public class DeadLetterConsumer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        // 4.声明死信交换机
        String exchangeName = "deadLetterExchange";
        String exchangeType = "topic";
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);

        // 5.申明队列
        String queueName = "deadLetterQueue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 6.将交换机和队列进行绑定关系
        String routingKey = "#";
        channel.queueBind(queueName, exchangeName, routingKey);

        // 7.创建消费者消费
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        System.err.println("死信消费端启动");
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("死信消费端消费: " + msg);
        }

    }

}

业务消费者:

public class BusinessConsumer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        String exchangeType = "topic";

        // 4.声明交换机
        String exchangeName = "businessExchange";
        channel.exchangeDeclare(exchangeName, exchangeType, false, false, false, null);

        // 5.申明队列
        String queueName = "businessQueue";
        Map<String, Object> arguments = new HashMap();
        // 指定死信交换机名
        arguments.put("x-dead-letter-exchange", "deadLetterExchange");   
        // 如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。
        // 如果没有设置,则保留该消息原有的路由key
//        arguments.put("x-dead-letter-routing-key", "business.#");
        channel.queueDeclare(queueName, true, false, false, arguments);

        // 5.将交换机和队列进行绑定关系
        String routingKey = "business.#";
        channel.queueBind(queueName, exchangeName, routingKey);

        // 6.创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        // autoAck = false 设置手动确认消费
        channel.basicConsume(queueName, false, queueingConsumer);

        // 7.循环消费
        System.err.println("业务消费端启动");
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.err.println("业务消费端接收消息: " + msg);
            // 设置消费失败
            channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
        }
    }

}

生产者:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1.创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置主机
        connectionFactory.setHost("111.231.83.100");
        // 设置端口
        connectionFactory.setPort(5672);
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");

        // 2.获取一个连接对象
        final Connection connection = connectionFactory.newConnection();

        // 3.创建 Channel
        final Channel channel = connection.createChannel();

        // 4.循环发送消息
        // 声明交换机
        String exchangeName = "businessExchange";
        // 声明路由
        String routingKey = "business.order";
        for (int i = 0; i < 5; i++) {
            String msg = "business 消息" + i;
            channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        }


        // 5.关闭资源
        channel.close();
        connection.close();
        connectionFactory.clone();

    }
}
  1. 启动死信消费者
  2. 启动业务消费者
    3)启动生产者
  3. 观察控制台输出

业务消费者控制台输出:

业务消费端启动
业务消费端接收消息: business 消息0
业务消费端接收消息: business 消息1
业务消费端接收消息: business 消息2
业务消费端接收消息: business 消息3
业务消费端接收消息: business 消息4

死信消费者控制台输出:

死信消费端启动
死信消费端消费: business 消息0
死信消费端消费: business 消息1
死信消费端消费: business 消息2
死信消费端消费: business 消息3
死信消费端消费: business 消息4

可以看到处理失败的消息已经传递到死信交换机中,并被死信消费者消费。

从上面的代码,我们可以知道死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型 Direct、Fanout、Topic。一般来说,会为每个业务队列分配一个独有的 路由key ,并对应的配置一个死信队列进行监听。

3.4 死信的处理方式

死信的产生是不可避免,我们需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

  1. 丢弃,如果不是很重要,可以选择丢弃
  2. 记录死信入库,然后做后续的业务分析或处理
  3. 通过死信队列,由负责监听死信的应用程序进行处理


以上是关于RabbitMQ交换机的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMq一文彻底弄懂RabbitMq的四种交换机原理及springboot实战应用

使用代码创建rabbitmq交换机和队列绑定

微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)

rabbitmq学习:rabbitmq(消息队列)的作用以及rabbitmq之直连交换机

RabbitMQ 消息队列学习

RabbitMQ----消费端限流TTL和使用代码生成交换机队列