RabbitMQ发布与订阅模式类型

Posted 不会压弯的小飞侠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ发布与订阅模式类型相关的知识,希望对你有一定的参考价值。

🍁博客主页:👉不会压弯的小飞侠
✨欢迎关注:👉点赞👍收藏⭐留言✒
✨系列专栏:👉Linux专栏
🔥欢迎大佬指正,一起学习!一起加油!

目录


🍁模式说明

  • 工作队列背后的假设是每个任务都是 只交付给一名工人。在这一部分中,我们将做一些事情 完全不同的 - 我们将向多个传递消息 消费者。此模式称为“发布/订阅”。

  • 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列

🍁发布与订阅模式完成消息传递

  • 编写生产者发送消息
    • 编写消息生产者 Producter
public class Producer 
    public static void main(String[] args) throws Exception 

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配
        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */
        String exchangeName = "test_fanout";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "此消息被交换机发到两个队列中!!!";
        //8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();
    


  • 测试发布者

  • 编写消费者接收消息
    • 编写消息消费者Consumer1
public class Consumer1 
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue1Name = "test_fanout_queue1";
        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("body:"+new String(body));
            
        ;
        channel.basicConsume(queue1Name,true,consumer);
    

  • 编写消费者接收消息
    • 编写消息消费者Consumer2
public class Consumer2 
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue2Name = "test_fanout_queue2";
        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("body:"+new String(body));
            
        ;
        channel.basicConsume(queue2Name,true,consumer);
    

  • 测试
  • 启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。

🍁总结

  • 交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
  • 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
  • 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
  • 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机

RabbitMQ 07 发布订阅模式

发布订阅模式

发布订阅模式结构图:

比如信用卡还款日临近了,那么就会给手机、邮箱发送消息,提示需要去还款了,但是手机短信和邮件发送并不一定是同一个业务提供的,但是现在又希望能够都去执行,就可以用到发布订阅模式,简而言之就是,发布一次,消费多个

实现这种模式需要用到另一种类型的交换机,叫做fanout(扇出)类型,这是一种广播类型,消息会被广播到所有与此交换机绑定的消息队列中。

这里使用默认的扇出交换机:

  1. 定义配置类。

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ配置类
     */
    @Configuration
    public class RabbitMqConfig 
        
        /**
         * 定义交换机,可以很多个
         * @return 交换机对象
         */
        @Bean
        public Exchange fanoutExchange()
            return ExchangeBuilder.fanoutExchange("amq.fanout").build();
        
    
        /**
         * 定义消息队列
         * @return 消息队列对象
         */
        @Bean
        public Queue fanoutQueue1()
            return new Queue("fanoutQueue1");
        
    
        /**
         * 定义绑定关系
         * @return 绑定关系
         */
        @Bean
        public Binding binding1(@Qualifier("fanoutExchange") Exchange exchange,
                                @Qualifier("fanoutQueue1") Queue queue)
            // 将定义的交换机和队列进行绑定
            return BindingBuilder
                    // 绑定队列
                    .bind(queue)
                    // 到交换机
                    .to(exchange)
                    // 使用自定义的routingKey
                    .with("")
                    // 不设置参数
                    .noargs();
        
    
        /**
         * 定义消息队列
         * @return 消息队列对象
         */
        @Bean
        public Queue fanoutQueue2()
            return new Queue("fanoutQueue2");
        
    
        /**
         * 定义绑定关系
         * @return 绑定关系
         */
        @Bean
        public Binding binding(@Qualifier("fanoutExchange") Exchange exchange,
                               @Qualifier("fanoutQueue2") Queue queue)
            // 将定义的交换机和队列进行绑定
            return BindingBuilder
                    // 绑定队列
                    .bind(queue)
                    // 到交换机
                    .to(exchange)
                    // 使用自定义的routingKey
                    .with("")
                    // 不设置参数
                    .noargs();
        
    
    
  2. 定义消费者。

    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 发布订阅监听器
     */
    @Component
    public class FanoutListener 
    
        @RabbitListener(queuesToDeclare = @Queue("fanoutQueue1"))
        public void receiver1(String message) 
            System.out.println("1号监听器:" + message);
        
    
        @RabbitListener(queuesToDeclare = @Queue("fanoutQueue2"))
        public void receiver2(String message) 
            System.out.println("2号监听器:" + message);
        
    
    
    

    为了避免监听时没有该队列而报错,可以采用queuesToDeclare = @Queue("队列名称")的形式,这样如果没有该队列会自动创建该队列。

  3. 定义生产者。

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootTest
    class RabbitMqSpringBootTests 
    
        /**
         * RabbitTemplate封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 生产者
         */
        @Test
        void producer()  
    
            rabbitTemplate.convertAndSend("amq.fanout", "", "Hello World");
        
    
    
    
    
  4. 启动生产者,发送消息。

    可以看到,发送一条消息,两个消费者都收到了消息,这就是发布订阅模式。


以上是关于RabbitMQ发布与订阅模式类型的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式

RabbitMQ03_Springboot整合RabbitMQ实现发布与订阅模式路由模式通配符模式

RabbitMQ学习第三记:发布/订阅模式(Publish/Subscribe)

RabbitMQ下的生产消费者模式与订阅发布模式

RabbitMQ下的生产消费者模式与订阅发布模式

RabbitMQ02_简单模式Publish/Subscribe发布与订阅模式Routing路由模式Topics通配符模式Work模式-轮询公平