rabbitMQ的进阶使用

Posted 零

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitMQ的进阶使用相关的知识,希望对你有一定的参考价值。

1.1 消息队列持久化

创建一个队列的时候,可以是非持久化的,也可以是持久化的

  • 非持久化:rabbitmq如果重启,该队列就会被删除
  • 持久化:重启不影响
  • 消息持久化必须要消息队列持久化
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

1.2 消息的持久化

消息持久化,可以一定程度上去预防消息丢失,需要设置MessageProperties.PERSISTENT_TEXT_PLAIN

注意:配置了消息持久化,并不能够完全保证消息不丢失,只能保证消息到了队列中不消失

消息丢失的方式:

  • 发送方发送到消息队列的时候,丢了
  • 交换机到队列中,丢了
  • 队列到消费者,丢了,目前所学的知识点,队列中还有消息,再次发送

后续的解决方案,参考确认Confirm,Return

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

1.3 ACK

默认情况下,消费者接收到消息的时候(但是通常接到消息就处理了),就会进行自动应答(ACK)

如果一个消费者处理消息时间长,或者异常了,所以需要手动Ack

  • 自动Ack
DeliverCallback deliverCallback = (consumerTag, delivery) -> 
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
;
// true 表示自动ack:autoAck
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag ->  );
  • 手动Ack
// 1.channel.basicConsume 将true改为false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag ->  );

// 2.在finally 中手动Ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

1.4 消费不均匀

工作模式中,默认轮询,会出现一些问题,比如:两个消费者,亮跟大哥,实现功能(10个),

亮:1,3,5,7,9

大哥:2,4,6,8,10

但是大哥实现的快,出现一个问题,大哥3个小时完成了,摸鱼5小时,亮做了8小时

所以我们可以设置一个预抓取值,官网给的是一次抓取一个任务(每次抓取一个消息,当上一个消息没有ack的时候,不抓取新的消息)

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意:这里的预抓取值给的是1,这个值太小(消费者能力比较强,就会等待),但是具体还是看服务器性能跟消息复杂度,通常情况下100-300

2 Publish/SubscribeFanout

多出来的X,就是交换机,交换机接收发送方的消息,将消息发送到队列中,交换机还有不同的类型

Exchange的类型

  • Fanout:交换机会将消息发送到所有和它进行绑定的队列上,广播,群发
  • Direct:直接交换机通过消息上的路由键直接对消息进行分发,相当于精确匹配,一对一
  • Topic:这个交换机会将路由键和绑定上的模式进行通配符匹配,相当于模糊匹配,一对多
  • Headers:消息头交换机使用消息头的属性进行消息路由,相当于模糊匹配(like header%),一对多

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bmlRWZxU-1649855291807)(消息队列.assets/image-20220413104215839.png)]

2.1 操作

  • 发送方
public class EmitLog 

    // 交换机名称
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception 
        try (Channel channel = RabbitMqUtils.getChannel()) 
            // 创建交换机,并设置交换机类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            Scanner scanner = new Scanner(System.in);
            while (true) 
                System.out.print("请输入要发送的消息:");
                String msg = scanner.next();
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes("UTF-8"));
                System.out.println("消息已发送:" + msg);
            
        

    

  • 消费方 01- 控制台打印
// 第一个消费者
public class ReceiveLogs01 

    // 交换机名称
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 创建一个交换机,并设置类型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 创建一个非持久化队列
        String queueName = channel.queueDeclare().getQueue();
        // 将交换机和队列绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" 准备接收消息。。。");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
            String message = new String(delivery.getBody(), "UTF-8");
            // 第一个消费者在控制台进行答应
            System.out.println("控制台打印的消息:" + message);
        ;
        channel.basicConsume(queueName, true, deliverCallback, consumerTag ->  );
    

  • 消费者02-保存到本地硬盘
public class ReceiveLogs02 

    // 交换机名称
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 创建一个交换机,并设置类型
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // 创建一个非持久化队列
        String queueName = channel.queueDeclare().getQueue();
        // 将交换机和队列绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" 准备接收消息。。。");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
            String message = new String(delivery.getBody(), "UTF-8");
            // 第二个消费者将消息保存到本地
            File file = new File("C:\\\\rabbitmq_log.txt");
            FileUtils.writeStringToFile(file, message, "UTF-8");
            System.out.println("日志写入成功");
        ;
        channel.basicConsume(queueName, true, deliverCallback, consumerTag ->  );
    

3 Routing(direct

在上一个发布订阅中,实现了一对多,但是每一个消息,都会执行我们指定的两个操作,粒度更加细致

对于error级别的信息,需要在控制台打印,并且保存到本地,对于其他信息,仅仅是控制台打印

3.1 操作

  • 发送方
public class EmitLogDirect 

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws Exception 
        final Channel channel = RabbitMqUtils.getChannel();
        // 1.创建交换机,更换了交换机的类型
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 准备路由的key
        Map<String, String> map = new HashMap<>();
        map.put("info", "普通的info消息");
        map.put("warning", "warning警告消息");
        map.put("error", "error错误消息");
        map.put("debug", "debug调式消息");

        // 循环map,key routkey, value,是发送的消息
        for (Map.Entry<String, String> entry : map.entrySet()) 
            String key = entry.getKey();
            String value = entry.getValue();
            // 发送
            channel.basicPublish(EXCHANGE_NAME, key, null, value.getBytes("UTF-8"));
            System.out.println("消息发送成功");
        
    

4 Topics (Topic)

发送到主题交换的消息不能有任意的 routing_key - 它必须是单词列表,由点分隔。

这些词可以是任何东西,但通常它们指定与消息相关的一些特征。一些有效的路由键示例:“ stock.usd.nyse ”、“ nyse.vmw ”、“ quick.orange.rabbit ”。路由键中可以有任意多的单词,最多为 255 个字节

  • *(星号)可以只替换一个单词。

  • # (hash) 可以代替零个或多个单词

  • 绑定的时候出现情况

    • 当一个队列绑定的是#,那么这个队列是不是接收了所有的消息,这个时候跟fanout有点像
    • 当一个队列绑定的既没有#,也没有*,那就是一个direct
  • quick.orange.rabbit - Q1,Q2

  • lazy.orange.ele - Q1,Q2

  • quick.orange.fix - Q1

  • lazy.brown.fox - Q2

  • lazy.pink.rabbit - Q2 满足了两次,消息一条

  • quick.brown.fox - 不满足

  • quick.orange.male.rabbit - 不满足

  • lazy.orange.male.rabbit - Q2

4.1 操作

  • 发送方
public class EmitLogTopic 

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception 
        final Channel channel = RabbitMqUtils.getChannel();
        // 1.创建交换机,更换了交换机的类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 准备路由的key
        Map<String, String> map = new HashMap<>();
        map.put("quick.orange.rabbit", "Q1,Q2接收");
        map.put("lazy.orange.ele", "Q1,Q2接收");
        map.put("quick.orange.fix", "Q1接收");
        map.put("lazy.pink.rabbit", "Q2接收");
        map.put("quick.brown.fox", "不满足");
        map.put("quick.orange.male.rabbit", "不满足");
        map.put("lazy.orange.male.rabbit", "Q2接收");

        // 循环map,key routkey, value,是发送的消息
        for (Map.Entry<String, String> entry : map.entrySet()) 
            String key = entry.getKey();
            String value = entry.getValue();
            // 发送
            channel.basicPublish(EXCHANGE_NAME, key, null, value.getBytes("UTF-8"));
            System.out.println("消息发送成功");
        
    

  • 消费方
public class ReceiveLogsTopict01 
    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 创建一个交换机,并设置类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 创建一个非持久化队列
        String queueName = channel.queueDeclare().getQueue();
        // 将交换机和队列绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println(" 准备接收消息。。。");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
            String message = new String(delivery.getBody(), "UTF-8");
            message = "绑定的key:" + delivery.getEnvelope().getRoutingKey() + "-绑定的值:" + message + "-规则(*.orange.*)";
            // 第一个消费者在控制台进行答应
            System.out.println("控制台打印的消息:" + message);
        ;
        channel.basicConsume(queueName, true, deliverCallback, consumerTag ->  );
    

public class ReceiveLogsTopict02 
    // 交换机名称
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception 
        Channel channel = RabbitMqUtils.getChannel();
        // 创建一个交换机,并设置类型
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        // 创建一个非持久化队列
        String queueName = channel.queueDeclare().getQueue();
        // 将交换机和队列绑定
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println(" 准备接收消息。。。");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
            String message = new String(delivery.getBody(), "UTF-8");
            message = "绑定的key:" + delivery.getEnvelope().getRoutingKey() + "-绑定的值:" + message + "-规则:(*.*.rabbit,lazy.#)";
            // 第一个消费者在控制台进行答应
            System.out.println("控制台打印的消息:" + message);
        ;
        channel.basicConsume(queueName, true, deliverCallback, consumerTag ->  );
    

5 Publisher Confirms

不仅仅是防止发送方到mq的消息丢失,还可以通过异步的方式来提高效率

1.为了消息不丢失,持久化,持久化是同步的,可以通过confirm的异步确认提高效率

// 1.启用发布取人
channel.confirmSelect();

5.1 三种方式

1.单个确认(同步)

2.批量确认(同步)

3.异步确认

以上是关于rabbitMQ的进阶使用的主要内容,如果未能解决你的问题,请参考以下文章

python使用消息队列RabbitMq(进阶)

rabbitMQ的进阶使用

RABBITMQ 总结,从基础到进阶

RabbitMQ 进阶 -- 阿里云服务器部署RabbitMQ集群

.Net RabbitMQ实战指南——进阶

RabbitMQ-进阶