笔记15:RabbitMQ工作模式案例及可靠性队列

Posted Lossdate

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了笔记15:RabbitMQ工作模式案例及可靠性队列相关的知识,希望对你有一定的参考价值。

一、写在开头

  1. RabbitMQ交换器有:direct 、 topic 、 headers 和 fanout 四种类型

  2. POM:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.9.0</version>
        </dependency>
    </dependencies>
    

二、使用默认交换器

使用默认交换器,可以不绑定交换器

  1. Producer
    public class Producer 
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.200.136");
            factory.setVirtualHost("/");
            factory.setPort(5672);
            factory.setUsername("root");
            factory.setPassword("123456");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare(
                    //队列名
                    "queue.default.ex",
                    //是否持久化
                    false,
                    //是否只能自己的连接才能使用
                    false,
                    //是否自动删除
                    false,
                    //有没有属性
                    null);
    
            //在发送消息的时候没有指定交换器的名字,此时使用的是默认的交换器,默认交换器就没有名字
            //路由键就是目的地消息队列的名字
            channel.basicPublish(
                    //exchange名字
                    "",
                    //路由键
                    "queue.default.ex",
                    null,
                    ("hello default").getBytes(StandardCharsets.UTF_8));
            
            channel.close();
            connection.close();
        
    
    
  2. Consumer
    public class Consumer 
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            GetResponse getResponse = channel.basicGet("queue.default.ex", true);
            System.out.println("收到的消息:" + new String(getResponse.getBody()));
    
            channel.close();
            connection.close();
        
    
    
  3. 验证
    启动producer,生产消息,之后启动consumer

三、Work Queue模式

生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果
交换器:direct

  1. Producer
    public class Producer 
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.queueDeclare("queue.wq", true, false, false, null);
            
            //声明DIRECT类型的交换器
            channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
    
            channel.queueBind("queue.wq", "ex.wq", "key.wq");
    
            for (int i = 0; i < 15; i++) 
                channel.basicPublish("ex.wq", "key.wq", null, ("工作队列: " + i).getBytes("UTF-8"));
            
            
            channel.close();
            connection.close();
        
    
    
  2. Consumer
    public class Consumer 
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            
            //确保MQ中有该队列,如果没有则创建
            channel.queueDeclare("queue.wq", true, false, false, null);
    
            channel.basicConsume("queue.wq", new DeliverCallback() 
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException 
                    System.out.println("推送来的消息:" + new String(delivery.getBody(), "UTF-8"));
                
            , new CancelCallback() 
                @Override
                public void handle(String consumerTag) throws IOException 
                    System.out.println("Cancel: " + consumerTag);
                
            );
        
    
    
  3. 测试
    启动三个Consumer,接着启动Producer,可以看到三个Consumer平均的收到总共15个消息

四、发布订阅模式

使用fanout类型交换器,routingKey忽略
每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息
消息广播给所有订阅该消息的消费者
交换器:fanout

  1. Producer
    public class Producer 
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            //声明fanout类型的交换器
            channel.exchangeDeclare("ex.fan", BuiltinExchangeType.FANOUT, true, false, null);
    
            for (int i = 0; i < 20; i++) 
                channel.basicPublish("ex.fan",
                        //fanout类型的交换器不需要指定路由键
                        "",
                        null,
                        ("hello fan: " + i).getBytes(StandardCharsets.UTF_8));
            
            
            channel.close();
            connection.close();
        
    
    
  2. Consumer
    创建三个Consumer(OneConsumer, TwoConsumer, ThreeConsumer)
    public class OneConsumer 
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            //生成临时队列,队列名字由RabbitMQ自动生成
            final String queueName = channel.queueDeclare().getQueue();
            System.out.println("OneConsumer生成的临时队列名字:" + queueName);
            //声明一个fanout交换器
            channel.exchangeDeclare("ex.myfan", BuiltinExchangeType.FANOUT, true, false, null);
            //绑定,fanout类型的交换器绑定不需要routingKey
            channel.queueBind(queueName, "ex.fan", "");
    
            channel.basicConsume(queueName, (consumerTag, delivery) ->
                    System.out.println("OneConsumer:" + new String(delivery.getBody(), StandardCharsets.UTF_8)
                    ), (consumerTag) ->
                    System.out.println("OneConsumer Cancel: " + consumerTag));
        
    
    
    TwoConsumer, ThreeConsumer和OneConsumer内容一样,修改下System.out.println内容
  3. 测试
    分别启动OneConsumer、TwoConsumer、ThreeConsumer,然后启动Producer,
    可以看到三个Consumer都收到了20个消息

五、路由模式

让接收者只接收部分消息
例:通过直接模式的交换器将关键的错误信息记录到不同的log文件,同时在控制台正常打印所有的日志信息,这里模拟ERROR,FALTAL和WARN
交换器:direct

  1. Producer
    public class Producer 
    
        private static final String[] LOG_LEVEL = 
            "ERROR", "FATAL","WARN"
        ;
    
        private static Random RANDOM= new Random();
    
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            //direct交换器
            channel.exchangeDeclare("ex.routing", "direct", false, false, null);
    
            for (int i = 0; i < 100; i++) 
                String level = LOG_LEVEL[RANDOM.nextInt(100) % LOG_LEVEL.length];
                channel.basicPublish("ex.routing", level, null, ("来自["+level+"]的消息").getBytes());
            
            
            channel.close();
            connection.close();
        
    
    
  2. Consumer
    三个consumer(ErrorConsumer、FatalConsumer、WarnConsumer)
    public class ErrorConsumer 
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.exchangeDeclare("ex.routing", "direct", false, false, null);
            //临时消息队列
            channel.queueDeclare("queue.error", false, false, false, null);
            //绑定消息队列和交换器
            channel.queueBind("queue.error", "ex.routing", "ERROR");
    
            channel.basicConsume("queue.error", ((consumerTag, delivery) ->
                    System.out.println("ErrorConsumer收到的消息:" + new String(delivery.getBody(), StandardCharsets.UTF_8))
                    ),(consumerTag) -> );
    
        
    
    
    其它两个consumer分别修改对应的队列名(queue.fatal, queue.warn)和System.out.println的输出
  3. 测试
    分启动三个Consumer,之后启动Procucer, 可以看到不同的消息分别路由到了特定的Consumer

六、主题模式

使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符
交换器将消息路由转发到具体队列时会根据消息routingKey 模糊匹配,比较灵活

要想 topic 类型的交换器, routingKey就不能随便写了,它必须得是点分单词
单词可以随便写,生产中一般使用消息的特征。如:“system.group.key”,该点分单词字符串最长255字节

bindingKey 也必须是这种形式
topic 类型的交换器背后原理跟 direct 类型的类似:只要队列 的 bindingKey 的值与消息的 routingKey 匹配,队列就可以收到该消息
有两个不同:
" * "星号表示匹配一个单词
" # "井号表示匹配0到多个单词

如果在 topic 类型的交换器中 bindingKey 使用 # ,则就是 fanout 类型交换器的行为
如果在 topic类型的交换器中 bindingKey 中不使用 * 和 # ,则就是 direct 类型交换器的行为

交换器:topic

  1. Producer
    routingKey: level.area.work
    public class Producer 
    
        private static final String[] LOG_LEVEL = "INFO", "ERROR", "WARN";
        private static final String[] LOG_AREA = "Shanghai", "Beijing", "HongKong";
        private static final String[] LOG_WORK = "programmer", "engineer", "artist";
    
        private static final Random RANDOM = new Random();
    
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    
            //direct交换器
            channel.exchangeDeclare("ex.topic", "topic", true, false, null);
    
            String level, area, work;
            String routingKey, msg;
            for (int i = 0; i < 100; i++) 
                level = LOG_LEVEL[RANDOM.nextInt(100) % LOG_LEVEL.length];
                area = LOG_AREA[RANDOM.nextInt(100) % LOG_AREA.length];
                work = LOG_WORK[RANDOM.nextInt(100) % LOG_WORK.length];
    
                //用‘.’分隔维度
                routingKey = level + "." + area + "." + work;
                msg = "LOG"+i+" ["+level+"] -> AREA:"+area+" -> WORK:"+work;
    
                channel.basicPublish("ex.topic", routingKey, null, msg.getBytes());
            
            
            channel.close();
            connection.close();
        
    
    
  2. Consumer
    分别创建四个consumer
    1)ShanghaiConsumer :只接收来自上海的消息
    2)BeijingErrorConsumer :只收来自北京的ERROR的消息
    3)HongKongProgrammerConsumer:只收来自香港的程序员的消息
    4)ErrorConsumer:只收ERROR的消息
    public class ShanghaiConsumer 
        public static void main(String[] args) throws Exception 
            ConnectionFactory factory = new ConnectionFactory();
            factory.setUri("amqp://root:123456@192.168.200.136:5672/%2f");
    
            final RabbitMQ:第三章:Springboot集成RabbitMQ(直连模式,工作队列模式,发布订阅模式,路由模式,通配符模式)

    RabbitMQ:第二章:Spring整合RabbitMQ(简单模式,广播模式,路由模式,通配符模式,消息可靠性投递,防止消息丢失,TTL,死信队列,延迟队列,消息积压,消息幂等性)(代码

    RabbitMQ 学习---- 工作队列模式

    RabbitMQ 学习---- 工作队列模式

    RabbitMQ 学习---- 工作队列模式

    (十四)笔记.net学习之RabbitMQ工作模式