RabbitMQ自学入门

Posted 走路带_风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ自学入门相关的知识,希望对你有一定的参考价值。

// 连接RabbitMQ 简单工具类

public static Connection getConnection() throws Exception
        //1. 定义一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 获取服务地址
        factory.setHost("127.0.0.1");
        //3. AMQP 5672
        factory.setPort(5672);
        //4. vhost
        factory.setVirtualHost("/test");
        //5. 用户名
        factory.setUsername("test");
        //6. 密码
        factory.setPassword("test");
        return factory.newConnection();
   

 

// 获取连接,生成通道

Connection connection = ConnectionUtils.getConnection();

Channel channel = connection.createChannel();

1)simple - 简单队列

涉及3个对象: 生产者  队列 RabbitMQ  消费者

生产者:

    // 申明队列

    channel.queueDeclare(QUEUE_NAME,false,false,false,null);

    // 推送信息

    channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());

消费者:

    // 申明队列

    channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 
     DefaultConsumer consumer = new DefaultConsumer(channel)
            //获取到达的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
                String msg = new String(body, "utf-8");

                // 消费消息
                System.out.println("new api recv: " + msg);
           
        ;
 
        //4. 监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);

缺点:耦合性高,生产者一 一对应消费者。

 

2)work queues - 工作队列

channel.basicQos(1); //保证每次只分发一个

boolean autoAck = true;

channel.basicConsume(QUEUE_NAME,autoAck,consumer);

autoAck true 代表自动应答,

false代表需要如下手动应答:

channel.basicAck(envelope.getDeliveryTag(),false);

轮询分发:自动应答,任务消息总是均匀分配

公平分发:手动应答,能者多劳

 

3)消息持久化

Boolean durable = true;// 持久化,注意在已存在的队列上修改是不行的,可以删除原有队列或重新生成队列。

channel.queueDeclare(QUEUE_NAME,durable ,false,false,null);

 

4)订阅模式 publish/subscribe

  • 一个生产者,多个消费者
  • 每一个消费者都有自己的队列
  • 生产者没有直接把消息发送到队列 而是发送到了交换机 转发器(exchange)
  • 每个队列都要绑定到交换机上,
  • 生产者发送的消息 经过交换机 达到队列 就能实现一个消息被多个消费者消费

生产者:

channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); // 声明交换机,"fanout"为分发

消费者:

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");//绑定队列到交换机转发器

 

5)Exchange(交换机 转发器)

一方面是接收生产者的消息,另一方面是向队列推送消息。

匿名转发 ""

fanout(不处理路由键)

direct (处理路由器)

 

6)路由模式

生产者:

channel.exchangeDeclare(EXCHANGE_NAME,"direct"); // 声明交换机

String routingKey = "info"; // error、warn..  绑定路由

channel.basicPublish(EXCHANGE_NAME,routingKey ,null,msg.getBytes());

消费者:

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");// 消费者绑定到路由

 

7)事务机制

txSelect  txCommit  txRollback

txSelect:用户将当前 channel 设置成 transaction 模式

txCommit  :用于提交事务;

txRollback:回滚事务。

这种模式很耗时,采用这种方式,降低了 RabbitMQ 的消息吞吐量
 

8)Conform 模式

生产者端 conform 模式的实现原理:

生产者将信道设置成 conform 模式,一旦信道进入 conform 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经到达目的队列了,如果消息和队列是持久化的,那么确认消息会将消息写入磁盘之后发出,broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

Conform 模式最大的好处就是他是异步。

开启 conform 模式    Channel.conformSelect()

编程模式:

①普通 发一条 waitForConfirms()

②批量的 发一批 channel.waitForConfirmsOrDie(); //直到所有信息都发布,只要有一个未确认就会IOException

③异步 conform 模式:提供一个 回调方法
 

异步模式:

channel 对象提供的 ConfirmListener() 回调方法只包含 deliveryTag (当前 channel 发出的消息序列),我们需要自己为每一个 Channel 维护一个 unconfirm 的消息序号集合,每 publish 一条数据,集合中元素加 1 ,每回调一次 handleAck 方法,unconfirm 集合删掉相应的一条(multiple = false)或多条(multiple = true)记录。从程序运行效率上看,这个 unconfirm 集合最好采用有序集合 SortedSed 存储结构。

例如:

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener()
       public void handleAck(long deliveryTag, boolean multiple) throws IOException
            if (multiple)
                  confirmSet.headSet(deliveryTag + 1L).clear();
              else
                    confirmSet.remove(deliveryTag);
             
       
       public void handleNack(long deliveryTag, boolean multiple) throws IOException
             if (multiple)
                 confirmSet.headSet(deliveryTag + 1L).clear();
              else
                  confirmSet.remove(deliveryTag);
             
       
);

以上是关于RabbitMQ自学入门的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ官方入门案例

小白自学JAVA怎么入门?

程序员入门:如何自学编程

自学python记录_新手入门

MFC入门书籍

SpringBoot入门十三,添加RabbitMq