RabbitMQ工作模式

Posted 不断前进的皮卡丘

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ工作模式相关的知识,希望对你有一定的参考价值。

✨ RabbitMQ工作模式


📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

Work queues工作队列模式

基本介绍

  • 多个消费者共同消费同一个队列中的消息,可以实现快速消费,避免消息积压,但是多个消费者消费队列的消息的时候,是互斥的,同一个消息只能被一个消费者消费,不可以被多个消费者消费。
  • 应用:对于一些任务比较多的情况,使用工作队列可以提高任务处理的速度

编写代码

抽取公共部分,写一个工具类

  • 我们知道像连接等操作,其实基本上代码都是一样的,每一次写重复的代码其实没有什么意义,我们可以写一个工具类来封装这些操作。
public class ConnectionUtil 
    public static Connection getConnection() throws Exception 
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("192.168.137.118");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("123456");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    

生产者

public class Producer 
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        for (int i = 1; i <= 10; i++) 
            String body = i+"hello rabbitmq~~~";
            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());
        
        channel.close();
        connection.close();
    

消费者1

public class Consumer1 
    static final String QUEUE_NAME = "work_queue";
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("body:"+new String(body));
            
        ;
        channel.basicConsume(QUEUE_NAME,true,consumer);
    

消费者2

代码和上面相同

测试

我们先启动两个消费者,然后再启动生产者发送消息,到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。

发布订阅模式

订阅模式类型

  • P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
  • C:消费者,消息的接受者,会一直等待消息到来。
  • Queue:消息队列,接收消息、缓存消息。
  • Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特定队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型
    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key 的队列
    • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
  • 发布订阅模式
    • 每个消费者都监听自己的队列
    • 生产者把消息发送给broker,然后交换机把消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
  • 交换机只负责转发消息,并没有存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

代码编写

生产者

public class Producer 
    public static void main(String[] args) throws Exception 

        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        /*
       exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
       参数:
        1. exchange:交换机名称
        2. type:交换机类型
            DIRECT("direct"),:定向
            FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。
            TOPIC("topic"),通配符的方式
            HEADERS("headers");参数匹配
        3. durable:是否持久化
        4. autoDelete:自动删除
        5. internal:内部使用。 一般false
        6. arguments:参数
        */
        String exchangeName = "test_fanout";
        //5. 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
        //6. 创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        //7. 绑定队列和交换机
        /*
        queueBind(String queue, String exchange, String routingKey)
        参数:
            1. queue:队列名称
            2. exchange:交换机名称
            3. routingKey:路由键,绑定规则
                如果交换机的类型为fanout ,routingKey设置为""
         */
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //8. 发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        //9. 释放资源
        channel.close();
        connection.close();
    

消费者1

public class Consumer1 
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue1Name = "test_fanout_queue1";
        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台.....");
            
        ;
        channel.basicConsume(queue1Name,true,consumer);
    

消费者2

public class Consumer2 
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue2Name = "test_fanout_queue2";
        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台.....");
            
        ;
        channel.basicConsume(queue2Name,true,consumer);
    

测试

启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;达到广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges选项卡,点击 fanout_exchange 的交换机,可以查看到如下的绑定:

发布订阅模式和工作队列模式的区别

  • 工作队列模式不需要定义交换机,而发布订阅模式需要定义交换机
  • 发布订阅模式需要设置队列和交换机的绑定,而工作队列模式不需要设置,事实上是因为工作队列模式会把队列绑定到默认的交换机
  • 发布订阅模式中,生产者向交换机发送消息;工作队列模式则是生产者向队列发送消息(底层使用默认交换机)

Routing 路由模式

基本介绍

  • P:生产者,向交换机发送消息的时候,会指定一个routing key
  • X:Exchange(交换机),接收生产者的消息,然后把消息传递给和routing key完全匹配的队列
  • C1:消费者,它所在队列指定了需要routing key为error的信息
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

路由模式的特点

  • 队列和交换机的绑定是需要指定routing key的,不可以随意绑定
  • 消息的发送方向交换机发送消息的时候,也需要指定消息的routing key
  • 交换机不再把消息交给每一个绑定的队列,而是根据消息的routing key来进行判断,只有队列的routing key和消息的routing key完全一样才会接收到消息。

编写代码

生产者

public class Producer 
    public static void main(String[] args) throws Exception 
       Connection connection = ConnectionUtil.getConnection();
       Channel channel = connection.createChannel();
       String exchangeName = "test_direct";
       // 创建交换机
       channel.exchangeDeclare(exchangeName,BuiltinExchangeType.DIRECT,true,false,false,null);
       // 创建队列
       String queue1Name = "test_direct_queue1";
       String queue2Name = "test_direct_queue2";
        // 声明(创建)队列
       channel.queueDeclare(queue1Name,true,false,false,null);
       channel.queueDeclare(queue2Name,true,false,false,null);
       // 队列绑定交换机
        // 队列1绑定error
        channel.queueBind(queue1Name,exchangeName,"error");
        // 队列2绑定info error warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");

        String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
        // 发送消息
        channel.basicPublish(exchangeName,"warning",null,message.getBytes());
        System.out.println(message);

        channel.close();
        connection.close();
    

运行

消费者1

public class Consumer1 
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue1Name = "test_direct_queue1";
        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息打印到控制台.....");
            
        ;
        channel.basicConsume(queue1Name,true,consumer);
    

消费者2

public class Consumer2 
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue2Name = "test_direct_queue2";
        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("body:"+new String(body));
                System.out.println("将日志信息存储到数据库.....");
            
        ;
        channel.basicConsume(queue2Name,true,consumer);
    

Topics 通配符模式

基本介绍

  • Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符
  • Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
  • 通配符规则:
    • #:匹配0个或者多个词
    • *:刚好可以匹配一个词


代码

生产者

public class Producer 
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);
        // 绑定队列和交换机
        /**
         *  参数:
             1. queue:队列名称
             2. exchange:交换机名称
             3. routingKey:路由键,绑定规则
                 如果交换机的类型为fanout ,routingKey设置为""
         */
        // routing key  系统的名称.日志的级别。
        //需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");
        String body = "日志信息:张三调用了findAll方法...日志级别:info...";
        //发送消息goods.info,goods.error
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
        channel.close();
        connection.close();
    

消费者1

public class Consumer1 
    public static void main(String[] args) throws Exception 
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        String queue1Name = "test_topic_queue1";
        Consumer consumer = new DefaultConsumer(channel)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException 
                System.out.println("body:"+new String(body));
            
        ;
        channel.basicConsume(queue1Name,true,consumer);
    

消费者2

public class Consumer2 
    public static 以上是关于RabbitMQ工作模式的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ入门教程:扇形交换机发布/订阅(Publish/Subscribe)

RabbitMQ 一二事 - 工作队列使用

WebAPi的可视化输出模式(RabbitMQ消息补偿相关)——所有webapi似乎都缺失的一个功能

WebAPi的可视化输出模式(RabbitMQ消息补偿相关)所有webapi似乎都缺失的一个功能

WebAPi的可视化输出模式(RabbitMQ消息补偿相关)——所有webapi似乎都缺失的一个功能

RabbitMQ广播模式