RabbitMQ入门教程:主题交换机Topics

Posted lonelyxmas

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门教程:主题交换机Topics相关的知识,希望对你有一定的参考价值。

原文:RabbitMQ入门教程(七):主题交换机Topics

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/vbirdbest/article/details/78631035
分享一个朋友的人工智能教程。比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看。

简介

本节主要演示交换机的另一种类型:主题类型topic,直连接类型direct必须是生产者发布消息指定的routingKey和消费者在队列绑定时指定的routingKey完全相等时才能匹配到队列上,与direct不同,topic可以进行模糊匹配,可以使用星号*和井号#这两个通配符来进行模糊匹配,其中星号可以代替一个单词;主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:“quick.orange.rabbit”,你可以定义任何数量的标识符,上限为255个字节。 #井号可以替代零个或更多的单词,只要能模糊匹配上就能将消息映射到队列中。当一个队列的绑定键为#的时候,这个队列将会无视消息的路由键,接收所有的消息

技术图片

生产者

public class Producer {
    @Test
    public void testBasicPublish() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Routing 的路由规则使用直连接
        String EXCHANGE_NAME = "exchange.topic.x";
        String[] routingKeys = {"quick.orange.rabbit", "lazy.orange.elephant", "mq.erlang.rabbit", "lazy.brown.fox", "lazy."};
        for (String routingKey : routingKeys){
            String message = "Hello RabbitMQ - " + routingKey;

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        }

        // 关闭资源
        channel.close();
        connection.close();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

消费者1

public class Consumer1 {
    @Test
    public void testBasicConsumer1() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "exchange.topic.x";
        String QUEUE_NAME = "queue.topic.q1";
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String[] routingKeys = {"*.orange.*"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
        }

        System.out.println("Consumer Wating Receive Message");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received ‘" + message + "‘, 处理业务中...");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);

        Thread.sleep(1000000);
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

消费者2

public class Consumer2 {
    @Test
    public void testBasicConsumer2() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(AMQP.PROTOCOL.PORT);
        factory.setUsername("mengday");
        factory.setPassword("mengday");

        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        String EXCHANGE_NAME = "exchange.topic.x";
        String QUEUE_NAME = "queue.topic.q2";
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String[] routingKeys = {"*.*.rabbit", "lazy.#"};
        for (int i = 0; i < routingKeys.length; i++) {
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKeys[i]);
        }

        System.out.println("Consumer Wating Receive Message");
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [C] Received ‘" + message + "‘, 处理业务中...");
            }
        };

        channel.basicConsume(QUEUE_NAME, true, consumer);

        Thread.sleep(1000000);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

运行效果

先启动消费者,再启动生产者。

技术图片

技术图片

技术图片

技术图片

有的消息可以满足多个模糊的路由键,具体会路由到哪个队列,关于同时满足条件以后再研究。

下面是一张topic的图,描述的比较准确
技术图片


预定义交换器

系统预定义了一个日志交换机,名称为“amq.rabbitmq.log”,类型为topic, 可以通过声明一个匿名队列,然后通过路由键(error warning info)等分别与队列进行绑定,从而来消费消息。 可以针对不同级别的消息进行处理,如error级别的可以发邮件给相关负责人。 消费者需要自己实现,往队列中发消息是RabbitMQ自己发送的,当有日志消息时RabbitMQ会自动发到amq.rabbitmq.log交换机中。


分享一个朋友的人工智能教程。比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看。

我的微信公众号:

技术图片


以上是关于RabbitMQ入门教程:主题交换机Topics的主要内容,如果未能解决你的问题,请参考以下文章

springBoot集成rabbitmq 之主题模式(topics)

RabbitMQ实战

RabbitMQ:Topics主题/通配符模式

RabbitMQ入门案例

RabbitMQ入门-理论

RabbitMQ生产方式和解决消息可靠性投递及其他问题