RabbitMQ支持的消息模型

Posted 李子怡

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ支持的消息模型相关的知识,希望对你有一定的参考价值。

目录

1 RabbitMQ

AMQP协议

2 RabbitMQ支持的消息模型

2.1官网

2.2引入依赖

2.3第一种模型(直连)

1.开发生产者

2.开发消费者

2.4RabbitMQ中连接工具类封装

2.5细节

2.6第二种模型(work quene)

1.定义:

2.角色:

3.生产者:

4.消费者 创建2个: // 区别是是否带下面这个线程休眠代码

5.总结:

6.缺点:

2.7.第三种模型(fanout| 广播)

1.在广播模式下,消息发送流程是这样的:

2.生产者

3.消费者

2.8第四种模型(Routing) 路由

1. Routing之订阅模型-Direct(直连)

2.生产者

3.消费者

2.9.第五种模型(Topics) 动态路由

1.定义:

2.生产者 当前是user.save

3.消费者


1 RabbitMQ

基于AMQP协议,erlang语言开发, 是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

官网:https://www.rabbitmq.com/

官方教程:https://www.rabbitmq.com/getstarted.html

AMQP协议

AMQP (advanced message queuing protocol,翻译是:高级消息队列协议)、 在2003年时被提出, 最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是- 种协议,更准确的说是一种binary wirelevel protocol (链接协议)。 这是其和JMS的本质差别,AMQP不从API层进行限定, 而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

 生产者把消息,发送给交换机。交换机和队列是一一绑定就是点对点,交换机路由到其他队列,可以做路由。

2 RabbitMQ支持的消息模型

2.1官网

https://www.rabbitmq.com/getstarted.html

2.2引入依赖

    <!--引入rabbitmq的相关依赖-->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.7.3</version>
    </dependency>

为什么写测试代码需要把下面这行删掉?因为这是作用范围,

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
<!--      <scope>test</scope>-->
    </dependency>

2.3第一种模型(直连)

在上图的模型中,有以下概念:

P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会-直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

1.开发生产者

public class Provider {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
        // 创建到服务器的连接
        ConnectionFactory factory = new ConnectionFactory();
        // 连接rabbitmq主机
        factory.setHost("192.168.231.141");
        // 设置端口号
        factory.setPort(5672);
        // 设置连接那个虚拟主机
        factory.setVirtualHost("/ems");
        // 设置访问虚拟主机的用户名和密码
        factory.setUsername("ems");
        factory.setPassword("123456");
        // 获取连接对象
        try (Connection connection = factory.newConnection();
            // 获取连接中的通道对象
            Channel channel = connection.createChannel()) {
            // 通道绑定对应消息队列
                // 参数1.队列名 如果队列不存在,自动创建
                // 2. durable 用来定义队列的特性是否要持久化 true 持久化
                // 3. 是否独占队列 true独占
                // 4. 是否在消费完成后,自动删除队列, true 自动删除
                // 5. 额外参数
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            // 发布消息
                // 1.交换机名称,因为没有交换机,所以为空
                // 2.队列名
                // 3.发布消息时的属性|传递消息的额外设置
                // 4.发布消息的具体内容,要求是字节类型的数组,
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

这里没有明显的写关闭代码是因为官网说:

2.开发消费者

 

消费了消息

这里不需要关闭的代码是因为,希望他可以一直监听,等到有消息的时候,直接拿到消息:

2.4RabbitMQ中连接工具类封装

public abstract class RabbitMQUtils {
    // 重量级资源,不希望使用一次创建一次。而是希望在类加载的时候就创建出来。
    private static ConnectionFactory factory = new ConnectionFactory();
    // 静态代码块,类加载的时候只执行一次
    static {
        // 连接rabbitmq主机
        factory.setHost("192.168.231.141");
        // 设置端口号
        factory.setPort(5672);
        // 设置连接那个虚拟主机
        factory.setVirtualHost("/ems");
        // 设置访问虚拟主机的用户名和密码
        factory.setUsername("ems");
        factory.setPassword("123456");
    }

    public static Connection getConnection() {
        try {
            // 获取连接对象
            Connection connection = factory.newConnection();
            return connection;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

2.5细节

队列绑定通道

        // 通道绑定对应消息队列
        // 参数1.队列名 如果队列不存在,自动创建
        // 2. durable 用来定义队列的特性是否要持久化 true 持久化  持久化是指在磁盘中进行队列的保存,当重启rabbitmq服务时,队列以及队列中的消息,都会丢失
        // 3. 是否独占队列 true独占
        // 4. 是否在消费完成后,自动删除队列, true 自动删除
        // 5. 额外参数
        channel.queueDeclare("A", false, false, false, null);

但是,不是绑定了,就一定在这里发送消息。是由下面的代码决定的:

String message = "Hello World!";
        // 发布消息
        // 1.交换机名称,因为没有交换机,所以为空
        // 2.队列名
        // 3.发布消息时的属性|传递消息的额外设置
        // 4.发布消息的具体内容,要求是字节类型的数组,
        channel.basicPublish("", "B", null, message.getBytes());

虽然队列A和通道channel绑定了,但是实际是发送消息的是队列B。队列A和队列B如果在页面上没有,执行代码后会新建。

queueDeclare第二个参数:

下面出现D表示设置为true,现在是队列持久化。但是重启后,消息还是会丢失。

怎么既保证队列持久化,又保证消息持久化呢?

对消息做持久化。

basicPublish第三个参数:设置为:MessageProperties.PERSISTENT_TEXT_PLAIN

queueDeclare第三个参数:

一般是false,希望多个通道共用一个队列

queueDeclare第四个参数:

当设置为true时,会出现下面的标志。且关闭消费者与队列的连接(关闭消费者的程序),队列就会消失。

2.6第二种模型(work quene)

1.定义:

Work queues, 也被称为( Task queues), 任务模型。 当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

2.角色:

●P:生产者:任务的发布者

●C1:消费者,领取任务并且完成任务,假设完成速度较慢

●C2:消费者2:领取任务并完成任务,假设完成速度快

3.生产者:

public class Provider {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取连接中的通道对象
        Channel channel = connection.createChannel();
        // 通过通道声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        // 发送10条消息
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", TASK_QUEUE_NAME, null, (i+"你好啊").getBytes());
        }
    }
}public class Provider {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取连接中的通道对象
        Channel channel = connection.createChannel();
        // 通过通道声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        // 发送10条消息
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", TASK_QUEUE_NAME, null, (i+"你好啊").getBytes());
        }
    }
}

4.消费者 创建2个: // 区别是是否带下面这个线程休眠代码

public class Customer2 {
    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取连接中的通道对象
        Channel channel = connection.createChannel();
        // 每次只能消费一个消息
        channel.basicQos(1);
        // 通道绑定消息队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        // 获取消息  参数2:消息自动确认,消费者自动向rabbitmq确认消息消费(只要消息队列有消息,就分配给消费者)
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1:" + new String(body));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

5.总结:

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

6.缺点:

这样情况下,当其中一个消费比较慢,比如消费者3已经消费完。但是消费者1还是一个个的接收。这样会造成消息的积累。

我们希望处理快的可以多处理一点。怎么实现?

1.消费者:要关闭自动确认消息

      // 获取消息  参数2:消息自动确认,消费者自动向rabbitmq确认消息消费(只要消息队列有消息,就分配给消费者,不管下面的代码是否执行完)
        channel.basicConsume(TASK_QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者-1:" + new String(body));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

所以要把上面的参数设为false,这样

2.还需要告诉当前通道每一次只能消费一个消息。

        // 每次只能消费一个消息|一次只接收一条未确认的消息
        channel.basicQos(1);

3.消息确认:手动确认消息

出现下面情况,是因为少消息确认的代码

         // 参数1:确认队列中那个具体消息 参数2:是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);

消息确认后,再从队列中删除

2.7.第三种模型(fanout| 广播)

fanout 扇出 也称为广播

适合于注册业务,如既要发积分,有要短信认证。?“??

比如购物车结算的时候,是订单系统,库存系统 ??

1.在广播模式下,消息发送流程是这样的:

●可以有多个消费者

●每个消费者有自己的queue (队列)

●每个队列都要绑定到Exchange (交换机)

生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

●交换机把消息发送给绑定过的所有队列

●队列的消费者都能拿到消息。实现一 条消息被多个消费者消费

2.生产者

public class Provider {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        // 获取连接中的通道对象
        Channel channel = connection.createChannel();

        // 将通道声明指定交换机 参数1:交换机名称 参数2:交换机类型,fanout是广播类型
        channel.exchangeDeclare("logs","fanout");
        // 发送消息
        channel.basicPublish("logs","",null,"fanout type message".getBytes());
    }
}

3.消费者

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queue,"logs","");
        // 消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1"+new String(body));
            }
        });
    }
}
public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("logs","fanout");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queue,"logs","");
        // 消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2"+new String(body));
            }
        });
    }
}

广播模型:生产者发一条消息,消费者1和2会都拿到

2.8第四种模型(Routing) 路由

1. Routing之订阅模型-Direct(直连)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

●队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey (路 由key)

●消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey。

●Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息

举个场景,出现error时,既需要在控制台打印,又需要在日志中持久化

图解:

●P:生产者,向Exchange发送消息, 发送消息时,会指定一 个routing key

●x: Exchange (交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列

●C1:消费者,其所在队列指定了需要routing key为error的消息

●C2:消费者,其所在队列指定了需要routing key为info、 error、 warning的消息

2.生产者

public class Provider {
    public static void main(String[] args) throws IOException {
        // 创建connection
        Connection connection = RabbitMQUtils.getConnection();
        // 创建channel
        Channel channel = connection.createChannel();
        String exchangeName = "logs_direct";
        // 通道声明指定交换机 参数1:交换机名称 参数2:交换机类型,direct是路由模式
        channel.exchangeDeclare(exchangeName,"direct");
        // 发送消息
        channel.basicPublish(exchangeName,"error",null,("这是direct模型发布的基于route" +
                " kye: ["+"info"+"]发送的消息").getBytes());
    }
}

error时,消费者都会收到消息:

info时,消费者2会收到消息

3.消费者

public class Customer1 {
    public static void main(String[] args) throws IOException {
        // 1.建立连接
        Connection connection = RabbitMQUtils.getConnection();
        // 建立channel
        Channel channel = connection.createChannel();
        String exchangeName = "logs_direct";
        // 绑定交换机和通道
        channel.exchangeDeclare(exchangeName,"direct");
        // 创建队列
        String queue = channel.queueDeclare().getQueue();
        // 基于路由key ,绑定队列和交换机
        channel.queueBind(queue,exchangeName,"error");
        // 消费
        channel.basicConsume(queue,true,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:"+ new String(body));
            }
        });
    }
}
public class Customer2 {
    public static void main(String[] args) throws IOException {
        // 1.建立连接
        Connection connection = RabbitMQUtils.getConnection();
        // 建立channel
        Channel channel = connection.createChannel();
        String exchangeName = "logs_direct";
        // 绑定交换机和通道
        channel.exchangeDeclare(exchangeName,"direct");
        // 创建队列
        String queue = channel.queueDeclare().getQueue();
        // 基于路由key ,绑定队列和交换机
        channel.queueBind(queue,exchangeName,"info");
        channel.queueBind(queue,exchangeName,"error");
        channel.queueBind(queue,exchangeName,"warning");

        // 消费
        channel.basicConsume(queue,true,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:"+ new String(body));
            }
        });
    }
}

2.9.第五种模型(Topics) 动态路由

因为第四种消费者这样不灵活:就是一个写一行代码

1.定义:

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符。这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以"."分割,例如: item . insert

# 通配符
	* (star) can substitute for exactly one word.    匹配不多不少恰好1个词
	# (hash) can substitute for zero or more words.  匹配0个,1个或多个词
#如:
	audit.#    匹配audit . irs . corporate或者audit.irs 等
	audit.*    只能匹配audit.irs

2.生产者 当前是user.save

public class Provider {
    public static void main(String[] args) throws IOException {
        // 建立连接工厂
        Connection connection = RabbitMQUtils.getConnection();
        // 建立channel
        Channel channel = connection.createChannel();
        String exchangeName = "topics";
        // 交换机
        channel.exchangeDeclare(exchangeName,"topic");
        // 发消息
        String routekey = "user.save";
        channel.basicPublish(exchangeName,routekey,null,("这是topic动态路由模型发布的基于route" +
                " key :["+routekey+"]").getBytes());
    }
}

3.消费者

public class Customer1 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("topics","topic");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queue,"topics","user.*");
        // 消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1"+new String(body));
            }
        });
    }
}
public class Customer2 {
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();

        Channel channel = connection.createChannel();
        //通道绑定交换机
        channel.exchangeDeclare("topics","topic");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 绑定交换机和队列
        channel.queueBind(queue,"topics","user.#");
        // 消费消息
        channel.basicConsume(queue,true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2"+new String(body));
            }
        });
    }
}

以上是关于RabbitMQ支持的消息模型的主要内容,如果未能解决你的问题,请参考以下文章

消息总线之模型重构

rabbitmq - 不会获取队列中的所有消息

Python实现RabbitMQ中6种消息模型

一锅端,RabbitMQ五种消息传输模型

RabbitMQ-AMQP模型详解

RabbitMQ存储模型