RabbitMQ消息中间件9.通配符模式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ消息中间件9.通配符模式相关的知识,希望对你有一定的参考价值。


之前我们讲解了简单队列、work模式、订阅模式、路由模式,本篇我们讲解RabbitMQ的最后一种模式,叫“通配符模式”。



通配符交换机的通信机制如下所示:


【RabbitMQ消息中间件】9.通配符模式_queueBind


记得上一篇介绍路由模式的时候,发送者可能会发送一定key的消息,而消费者的队列绑定交换机时,会指定需要接受的信息类型的key,然后当消费者接收消息时只会在队列中获取到指定key类型的消息。



而“通配符交换机”与之前的路由模式相比,它将信息的传输类型的key更加细化,以“key1.key2.keyN....”的模式来指定信息传输的key的大类型和大类型下面的小类型,让消费者可以更加精细的确认自己想要获取的信息类型。而在消费者一段,不用精确的指定具体到哪一个大类型下的小类型的key,而是可以使用类似正则表达式(但与正则表达式规则完全不同)的通配符在指定一定范围或符合某一个字符串匹配规则的key,来获取想要的信息。



“通配符交换机”(Topic Exchange)将路由键和某模式进行匹配。此时队列需要绑定在一个模式上。符号“#”匹配一个或多个词,符号“*”仅匹配一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。(这里与我们一般的正则表达式的“*”和“#”刚好相反,这里我们需要注意一下。)



下面是一个解释通配符模式交换机工作的一个样例


【RabbitMQ消息中间件】9.通配符模式_System_02


上面的交换机制类似于一个国际新闻讯息网站的机制。


可以看到,队列绑定交换机时指定的key为“usa.#”的时候,可以匹配到以“uas.”开头的所有key的消息,所以其匹配到了key为“usa.news”和“usa.weather”,即获取美国的新闻和天气信息。



第二个队列绑定交换机时指定的key为“#.news”的时候,可以匹配到以“.news”结尾的所有key的消息,所以其匹配到了key为“usa.news”和“europe.news”,即获取美国的新闻和欧洲的新闻。



第三个队列绑定交换机时指定的key为“#.weather”的时候,可以匹配到以“.weather”结尾的所有key的消息,所以其匹配到了key为“usa.weather”和“europe.weather”,即获取美国的天气和欧洲的天气。



第四个队列绑定交换机时指定的key为“europe.#”的时候,可以匹配到以“europe。#”开头的所有key的消息,所以其匹配到了key为“europe.news”和“europe.weather”,即获取欧洲的新闻和欧洲的天气。



了解了“通配符交换机”的工作机制,下面我们来编写样例体会一下。


首先我们编写发送信息的生产者Send:


【RabbitMQ消息中间件】9.通配符模式_queueBind_03

package cn.jack.rabbitmq.topic;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send

private final static String EXCHANGE_NAME="test_exchange_topic";

public static void main(String[] args) throws Exception
//获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
//从连接中创建通道
Channel channel = connection.createChannel();

//声明Exchange,指定交换机的类型为topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic");

//消息内容 美国新闻
String message = "美国新闻:川普执行新的医疗政策";
channel.basicPublish(EXCHANGE_NAME, "usa.news",null, message.getBytes());
System.out.println("[product] Send "+ message +"");

//消息内容 美国天气
message = "美国天气:芝加哥今日天气阴转多云";
channel.basicPublish(EXCHANGE_NAME, "usa.weather",null, message.getBytes());
System.out.println("[product] Send "+ message +"");

//消息内容 中国新闻
message = "中国新闻:苹果发布最新产品iPhoneXXX";
channel.basicPublish(EXCHANGE_NAME, "china.news",null, message.getBytes());
System.out.println("[product] Send "+ message +"");

//消息内容 中国天气
message = "中国天气:上海今日天气晴转阵雨";
channel.basicPublish(EXCHANGE_NAME, "china.weather",null, message.getBytes());
System.out.println("[product] Send "+ message +"");

//关闭通道和连接
channel.close();
connection.close();

在该消费者中,消息类型分为中国和美国两个大类key,而每个大类下面又有一些小类别(新闻和天气)。



然后是消费者1(Recv1):


【RabbitMQ消息中间件】9.通配符模式_通配符模式_04


package cn.jack.rabbitmq.topic;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv1

private final static String QUEUE_NAME = "test_queue_topic_1";//队列名称

private final static String EXCHANGE_NAME="test_exchange_topic";//交换机名称

public static void main(String[] argv) throws Exception
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定队列到交换机,并指定了一个通配符模式的key,来获取中国大类别下面的所有小类别消息。
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "china.#");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [consumer1] Received " + message + "");
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);


在消费者1中,队列绑定交换机时,指定了获取消息的通配符key,用以获取所有符合“china.”开头的信息。



然后是消费者2(Recv2):


【RabbitMQ消息中间件】9.通配符模式_queueBind_05


package cn.jack.rabbitmq.topic;
import cn.jack.rabbitmq.connection.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2

private final static String QUEUE_NAME = "test_queue_topic_2";//队列名称

private final static String EXCHANGE_NAME="test_exchange_topic";//交换机名称

public static void main(String[] argv) throws Exception
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定队列到交换机,并指定了一个通配符模式的key,来获取美国大类别下面的所有小类别消息。
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "usa.#");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);

// 获取消息
while (true)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [consumer2] Received " + message + "");
// 返回确认状态
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);


在消费者2中,队列绑定交换机时,指定了获取消息的通配符key,用以获取所有符合“uas.”开头的信息。



首先运行生产者创建交换机,然后运行两个消费者后,再次运行生产者发送消息。最终在控制台中得到的结果如下:


【RabbitMQ消息中间件】9.通配符模式_RabbitMQ_06


可以看到,不同的消费者定义了不同的通配符后,获取到的即是符合通配符的key的消息。



最后,通配符模式相对于路由模式的好处是,在路由模式中,如果我们要获取某几种类型key的信息,我们要一个一个的指定,而在通配符模式中,我们可以使用通配符,来更加简洁的指定想获取信息的key类型。

RabbitMQ:Topics主题/通配符模式

✨ RabbitMQ:Topics主题/通配符模式


📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

1.基本介绍

  • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:
    • #:匹配0个或者多个词
    • *:刚好可以匹配一个词

2.生产者

public class Producer 
    public static String TOPIC_EXCHANGE = "topic_exchange";
    public static String TOPIC_QUEUE_1 = "topic_queue_1";
    public static String TOPIC_QUEUE_2 = "topic_queue_2";

    public static void main(String[] args) 
        try 
            Channel channel = ConnectUtil.getChannel();
            //声明交换机(交换机名称,交换机类型)
            channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);
            //声明队列
            channel.queueDeclare(TOPIC_QUEUE_1,true,false,false,null);
            channel.queueDeclare(TOPIC_QUEUE_2,true,false,false,null);
            //把交换机和队列1进行绑定
            channel.queueBind(TOPIC_QUEUE_1,TOPIC_EXCHANGE,"#.error");
            //把交换机和队列2进行绑定
            channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHANGE,"order.*");
            channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHANGE,"*.orange.*");
            channel.queueBind(TOPIC_QUEUE_2,TOPIC_EXCHANGE,"*.*");
            //发送消息
            String msg="日志信息:调用了xxx方法,日志级别是error";
             channel.basicPublish(TOPIC_EXCHANGE,"error",null,msg.getBytes());
            System.out.println("消息发送成功");
         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        


    





3.消费者

消费者1

public class Consumer1 
    public static void main(String[] args) 

        try 
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //消费消息
            DefaultConsumer consumer=new DefaultConsumer(channel)
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    System.out.println("消费者1接收到消息:"+new String(body,"UTF-8"));
                    System.out.println("消费者1把日志信息保存到数据库");
                
            ;
            channel.basicConsume(Producer.TOPIC_QUEUE_1,true,consumer);



         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        
    


消费者2

public class Consumer2 
    public static void main(String[] args) 

        try 
            //获取信道对象
            Channel channel = ConnectUtil.getChannel();
            //消费消息
            DefaultConsumer consumer=new DefaultConsumer(channel)
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                    System.out.println("消费者2接收到消息:"+new String(body,"UTF-8"));
                    System.out.println("消费者2把日志信息保存到数据库");
                
            ;
            channel.basicConsume(Producer.TOPIC_QUEUE_2,true,consumer);



         catch (IOException e) 
            e.printStackTrace();
         catch (TimeoutException e) 
            e.printStackTrace();
        
    


4.测试



以上是关于RabbitMQ消息中间件9.通配符模式的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ:Topics主题/通配符模式

RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式)

万字长文图文详解Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,TTL)

RabbitMQ消息中间件

近九万字图文详解RabbitMQ