rabbitMQ学习

Posted baoguochun

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitMQ学习相关的知识,希望对你有一定的参考价值。

 

1,消息队列解决什么问题

  异步处理

  应用解耦

  流量消峰(高并发、  秒杀)

  日志处理......

2,virtual host   

  相当于mysql数据库的db,一般以/开头

  授权

3,使用java开发rabbitMQ

  (1) 简单队列

  技术图片

  p:消息生产者

  红色:消息队列

  c:消息消费者

  3个对象,生产者、消息队列、消费者

  使用步骤

  生产端:

    1,获取链接

    2,获取通道

    3,创建队列声明

    4,发布消息

  消费端:

    1,获取链接

    2,创建通道

    3,定义队列消费者

    4,监听队列

  简单队列的不足:耦合性高,一一对应消费者,如果有多个消费者就无法使用;

  work queues 工作队列

4,工作队列(work queues)

  分为两种 模式 轮询和公平

  协议(amqp)

  技术图片

为什么会出现工作队列,simple queue(round )是一一对应,实际开发中,生产者发送消息是好不费力,而消费者一般是需要跟业务相结合,消费者接收到消息就需要处理,可能需要花费时间,

这时候队列就会积压很多消息;

 

轮训分发的机制,不管谁忙谁闲都是你一个完成一个进行分发;

 

公平分发机制-fair dipatch:生产端,basicQos(perfetchCount=1);   消费成功后告诉MQ,才再发一条消息;使用公平分发必须关闭自动应答ack ;能者多劳!

 

5,消息应答与消息持久化

  消息应答:

  boolean aotoAck=true;
       channel.basicConsume("mySimpleQueue", aotoAck, consumer);

 

  boolean aotoAck=true(自动确认模式)一旦 rabbitMQ将消息分发给消费者就会从内存中删除该消息,

  这种情况下如果消费端服务出现异常,未能正确的消费消息,该消息会丢失;

  boolean aotoAck=false(手动模式),如果有消费者异常,就会将该消息交付给其他消费者rabbitMQ支持消费应答,消费者发送一个消息应答,告诉RabbitMQ已已消费完成,

  然后rabbitMQ可以删除该消息,消息应答默认是打开的-false

 

  消息持久化:

  当RabbitMQ服务宕机,存储在内存中的消息会丢失,这是就需要消息持久化;

  

  channel.queueDeclare("mySimpleQueue", durable, false, false, null);

  durable:持久化

 

6,订阅模式publish/subscribe

以上的消息只能由一个消费者消费,不能给多个消费者消费!

订阅模式:

技术图片

  1,一个生产者多个消费者,

  2,每个消费者都有自己的队列,

  3,生产者没有将消息直接发送给队列,而是发送给交换机或者叫转发器exchange

  4,每个队列都要绑定到交换机

  5,生产者发送的消息经过交换机,然后到达队列,就能实现一个消息被多个消费者消费

  代码:

  生产者:

   Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //发送消息
        String exchange="exchageMSG";
        channel.basicPublish(EXCHANGE_NAME, "", null, exchange.getBytes());
        channel.close();
        connection.close();

  发送消息后图形化界面有新的交换机生成,消息并没有存储,因为rabbitMQ中只有队列能存储消息!

 

 

  消费者1-代码

   
    public static final String  EXCHANGE_NAME="my_exchange";
    public static final String QUEUENAME="test_send_email_quene";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        //绑定队列到交换机exchange
        channel.queueBind(QUEUENAME, EXCHANGE_NAME, "");
        channel.basicQos(1);//保证一次只分发一个
        //定义一个消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
                
            };
        };
        
        boolean autoAck=false;
        channel.basicConsume(QUEUENAME, autoAck, consumer);
        
    }

消费者2代码

 
    public static final String  EXCHANGE_NAME="my_exchange";
    public static final String QUEUENAME="test_send_message_quene";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        //绑定队列到交换机exchange
        channel.queueBind(QUEUENAME, EXCHANGE_NAME, "");
        channel.basicQos(1);//保证一次只分发一个
        //定义一个消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body);
                System.out.println(msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
                
            };
        };
        
        boolean autoAck=false;
        channel.basicConsume(QUEUENAME, autoAck, consumer);
        
    }

  可视化界面截图
技术图片

7,交换机-转发器(Exchange)

  

  

  一方面是接受生产者的消息,另一方面是向队列推送消息

  匿名转发 声明路由时不指定routingkey,exchangeName  "";

  fanout(不处理路由键)

  技术图片

  Direct(处理路由键)

  技术图片

8,路由模式

rabbit使用步骤

服务端发送消息:1,获取链接;2,通过链接获取通道;3;通过通道声明交换机;4,发布消息(包括交换机和路由);  5,关闭资源  

消费端消费消息: 1,获取链接;2,获取通道;3,通过通道声明队列;4,绑定交换机和路由;5,创建consumer,重写handledelivery方法,获取消息 ;6,监听consumer ;

技术图片

 

 生产者代码


    public static final String EXCHANGE_NAME="rounting_exchange";
    public static final String ROUTINGKEY="info";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        String message="my rounting exchange";
        channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY, null, message.getBytes());
        System.out.println("send...");
        channel.close();
        connection.close();
    }

消费者1:  

 
    public static final String QUEUENAME="test_queue_direct_1";
    public static final String EXCHANGE_NAME="rounting_exchange";
    public static final String ROUTINGKEY="error";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        channel.basicQos(1);//保证一次只发一个
        channel.queueBind(QUEUENAME, EXCHANGE_NAME, ROUTINGKEY);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg = new String(body);
                channel.basicAck(envelope.getDeliveryTag(), false); //告诉MQ已成功收到消息,手动确认
                System.out.println(msg);
                System.out.println("test_queue_direct_1");
            }
        };
        boolean autoAck=false;//取消自动应答,默认为关闭状态
        channel.basicConsume(QUEUENAME, autoAck, consumer);
        
    }

消费者2


    public static final String QUENENAME="test_queue_direct_2";
    public static final String EXCHANGENAME="rounting_exchange";
    public static final String ROUTINGKEYNAME1="error";
    public static final String ROUTINGKEYNAME2="info";
    public static final String ROUTINGKEYNAME3="warn";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUENENAME, false, false, false, null);
        channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME1);
        channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME2);
        channel.queueBind(QUENENAME, EXCHANGENAME, ROUTINGKEYNAME3);
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                    throws IOException {
                String msg=new String(body);
                channel.basicAck(envelope.getDeliveryTag(), false);
                System.out.println(msg);
                System.out.println(QUENENAME);
            }
        };
        boolean autoAck=false;//关闭自动确认
        channel.basicConsume(QUENENAME, autoAck, consumer);
    }

 9,路由主题模式

  将路由和谋模式(主题)匹配

  #匹配一个货多个

  .匹配一个

 技术图片

官方模型:

技术图片

10,rabbitMQ消息确认机制(事物+confirm)

在rabbitMQ中可以通过持久化数据,解决RabbtMQ服务器异常的数据丢失问题。

问题:生产者将消息发送出去之后消息到底有没有到达rabbitMQ服务器?

  默认情况下是无法知晓的,

  两种方式解决问题:

    AMQP 协议实现了事务机制

    confirm模式

  事务机制:

    txSelect    txCommit     txRollBack

    txSelect用于将当前channel设置成transaction模式

    txCommit用于提交事务

    txRollback用于回滚

    以上操作都是生产者操作

    缺点:降级消息的吞吐量;

  confirm 模式:

    生产者confirm模式的实现原理

    成产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有的匹配队列之后,

    broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确的到达目的队列了,如果消息和队列是可持久化的,那么确认消息后会将消息写

    进磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic。ack的multiple域,表示这个序列号之前的所有消息

    已经得到处理。

    confirm模式最大的好处是它是异步处理;

    Nack

 

  开启confirm模式

  channel.confirmSelect();

  编程模式

    1,普通 发一条 waitForConfirms();

    2,批量 发一批waitForConfirms();

    3,异步confirm 模式,提供一个回调方法

 

     单条代码:

      
      public static final String QUEUENAME="test_confirm_queue1";
      public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
          Connection connection = ConnectionUtils.getConnection();
          Channel channel = connection.createChannel();
          channel.queueDeclare(QUEUENAME, false, false, false, null);
          //生产者调用confirmSelect 将channel 设置为confirm模式
          channel.confirmSelect();
          String msg="provider confirm msg";
          channel.basicPublish("", QUEUENAME, null, msg.getBytes());
          if(!channel.waitForConfirms()) {
              System.out.println("发送失败");
          }else {
              System.out.println("发送成功");
          }
        
          channel.close();
          connection.close();
      }

   批量发送

  
        private static final String QUEUENAME="test_confirm_queue1";
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUENAME, false, false, false, null);
        String msg =  "test_confirm_queue2 msg2   ";
        channel.confirmSelect();
        for(int i=0;i<10;i++) {
            channel.basicPublish("", QUEUENAME, null, (msg+i).getBytes());
        }
        if(channel.waitForConfirms()) {
            System.out.println("批量发送成功");
        }else {
            System.out.println("批量发送失败");
        }
        channel.close();
        connection.close();
        
    }

   异步模式

    概述

    channel对象提供一个ConfirmLisnenner(),回调方法只包含deliveryTag(当前chanel发出的消息序号)。我们需要自己为每一个Chanel维护一个unconfirm的消息序号集合,

    每publish一条数据,集合中元素加1,每回调一次handleAck方法,unConfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上上看,

    这个集合最好采用有序集合sortedSet 存储结构;

  

 

 

 

 

 

 

 

 

 

 

  

 

以上是关于rabbitMQ学习的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq学习:rabbitmq(消息队列)的作用以及rabbitmq之直连交换机

RabbitMQ学习和使用

RabbitMQ学习系列: RabbitMQ安装与配置

RabbitMQ学习系列: RabbitMQ安装与配置

RabbitMQ学习RabbitMQ六大核心部分学习

RabbitMQ简单学习笔记