RabbitMQ基础篇

Posted light-sunset

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ基础篇相关的知识,希望对你有一定的参考价值。

  RabbitMQ是基于AMQP(Advanced Message Queue)标准协议规范的实现,由Erlang语言开发。

  RabbitMQ结构图:

    技术图片


一、名词概念

  Broker:消息队列服务器实体。

  Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

  Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

  Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

  Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

  vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

  producer:消息生产者,就是投递消息的程序。

  consumer:消息消费者,就是接受消息的程序,消费者分为持续订阅(basicConsumer)和单条订阅(basicGet)。

  channel:消息通道,建立在TCP连接之上。在客户端的每个连接里,可建立多个channel【实际上信道的创建是没有限制的】,每个channel代表一个会话任务。


二、使用流程

      技术图片

  生产者(客户端):

    (1)客户端连接到消息队列服务器,打开一个channel。

    (2)客户端声明一个exchange,并设置相关属性。

    (3)客户端声明一个queue,并设置相关属性。【注:若不声明队列,则rabbitmq会默认生成个随机队列】

    (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

    (5)客户端投递消息到exchange。   

技术图片
 1 package producer.normal;
 2 
 3 import com.rabbitmq.client.BuiltinExchangeType;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.ConnectionFactory;
 7 
 8 import java.io.IOException;
 9 import java.util.concurrent.TimeoutException;
10 
11 public class DirectProducer {
12 
13     private final static String EXCHANGE_NAME = "direct_log";
14     public static void main(String[] args) throws IOException, TimeoutException {
15         // 创建连接工厂
16         ConnectionFactory cf = new ConnectionFactory();
17         // 工厂属性配置,列举如下几个,其他属性自行点击ConnectionFactory进去查看
18         cf.setHost("127.0.0.1");// ip
19         cf.setPort(5672);// 端口
20         cf.setUsername("guest");// 用户名
21         cf.setPassword("guest");// 密码
22         // 创建连接
23         Connection con = cf.newConnection();
24         // 创建信道
25         Channel channel = con.createChannel();
26         // 声明交换器
27         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
28         String[] logLevels = {"info", "error", "warning"};
29         for (int i = 0; i < logLevels.length; i++) {
30             String logLevel = logLevels[i];
31             String message = logLevel + ":" + "hello rabbitmq";
32             channel.basicPublish(EXCHANGE_NAME, logLevel, null, message.getBytes());
33             System.out.println("send message:" + message);
34         }
35         channel.close();
36         con.close();
37     }
38 }
生产者

  消费者:

    (1)连接到消息队列服务器,打开一个channel。

    (2)声明一个exchange,并设置相关属性。

    (3)声明一个queue,并设置相关属性。【注:若不声明队列,则rabbitmq会默认生成个随机队列】

    (4)使用routing key,在exchange和queue之间建立好绑定关系。

    (5)接收exchange中的消息进行消费处理。

技术图片
 1 package consumer.normal;
 2 
 3 import com.rabbitmq.client.*;
 4 
 5 import java.io.IOException;
 6 import java.nio.charset.Charset;
 7 import java.util.concurrent.TimeoutException;
 8 
 9 public class DirectConsumerAll {
10     private final static String EXCHANGE_NAME = "direct_log";
11 
12     public static void main(String[] args) throws IOException, TimeoutException {
13         ConnectionFactory cf = new ConnectionFactory();
14         Connection con = cf.newConnection();
15         Channel channel = con.createChannel();
16         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
17         String queueName = channel.queueDeclare().getQueue();//声明随机队列
18         String[] logLevels = {"info", "error", "warning"};
19         for (String logLevel : logLevels) {
20             channel.queueBind(queueName, EXCHANGE_NAME, logLevel);
21         }
22         System.out.println("waiting message...");
23         Consumer consumer = new DefaultConsumer(channel) {
24             @Override
25             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
26                                        byte[] body) {
27                 String message = new String(body, Charset.defaultCharset());
28                 System.out.println("accept " + envelope.getRoutingKey() + ":" + message);
29             }
30         };
31         channel.basicConsume(queueName, true, consumer);
32 
33     }
34 }
消费者

三、交换器类型

  DIRECT:完全路由键匹配,例如,绑定时设置了routing key为"abc",那么客户端提交的消息,只有设置了key为"abc"的才会投递到队列。

  FANOUT:不需要路由键,采取广播模式。例如,一条消息进来时,投递到与该交换机绑定的所有队列。

  TOPIC:主题,根据路由键进行模式匹配投递消息,符号"#"匹配一个或多个词,符号"*"匹配正好一个词。例如"abc.#"匹配"abc.def.ghi","abc.*"只匹配"abc.def"。

  HEADERS:消息头,不作过多讲解,使用场景很少。

技术图片技术图片技术图片


 四、消息持久化

  RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑。但相应会带来性能的下降,大致10倍。消息队列持久化包括以下3个部分:

    (1)exchange持久化,在声明时指定durable => 1

    (2)queue持久化,在声明时指定durable => 1

    (3)消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

  如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。


五、消息确认和发送方确认

  1、消息确认:

    消费者收到的每一条消息都必须进行确认,分为自动确认和消费者自行确认。

    消费者在声明队列时,指定autoAck参数,true自动确认,false时rabbitmq会等到消费者显示的发回一个ack信号才会删除消息。autoAck=false,有足够时间让消费者处理消息,直到消费者显示调用basicAck为止。

    Rabbitmq中消息分为了两部分:1、等待投递的消息;2、已经投递,但是还没有收到ack信号的。如果消费者断连了,服务器会把消息重新入队,投递给下一个消费者。未ack的消息是没有超时时间的。

  2、发送方确认

    生产者不知道消息是否真正到达RabbitMq,也就是说发布操作不返回任何消息给生产者。

    AMQP协议层面为我们提供的事务机制解决了这个问题,但是事务机制本身也会带来问题:

       1、严重的性能问题

       2、使生产者应用程序产生同步

     RabbitMQ团队为我们拿出了更好的方案,即采用发送方确认模式,该模式比事务更轻量,性 能影响几乎可以忽略不计—— 发送方确认模式的机制。

  注意:发送方确认模式和消费者对消息的确认是不同的。

技术图片
 1 package producer.confirm;
 2 
 3 import com.rabbitmq.client.*;
 4 import java.io.IOException;
 5 import java.nio.charset.Charset;
 6 import java.util.concurrent.TimeoutException;
 7 
 8 /**
 9  * 发送方确认模式——异步模式
10  */
11 public class ProducerConfirmAsync {
12     private final static String EXCHANGE_NAME = "confirm_log";
13 
14     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
15         ConnectionFactory cf = new ConnectionFactory();
16         Connection con = cf.newConnection();
17         // 监听连接关闭事件,一般用于重连机制
18         con.addShutdownListener(new ShutdownListener() {
19             @Override
20             public void shutdownCompleted(ShutdownSignalException cause) {
21 
22             }
23         });
24         Channel channel = con.createChannel();
25         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
26         // 将信道设置为发送方确认
27         channel.confirmSelect();
28         // 监听信道事件
29         channel.addShutdownListener(new ShutdownListener() {
30             @Override
31             public void shutdownCompleted(ShutdownSignalException cause) {
32 
33             }
34         });
35         // deliveryTag:代表消息在信道中的唯一一次投递,单调递增
36         // multiple:是否批处理,默认false
37         // 监听已被投递的消息
38         channel.addConfirmListener(new ConfirmListener() {
39             @Override
40             public void handleAck(long deliveryTag, boolean multiple) {
41                 System.out.println("Ack deliveryTag=" + deliveryTag + ",multiple=" + multiple);
42             }
43 
44             @Override
45             public void handleNack(long deliveryTag, boolean multiple) {
46                 System.out.println("Ack deliveryTag=" + deliveryTag + ",multiple=" + multiple);
47             }
48         });
49 
50         // 监听未被投递到队列的消息
51         channel.addReturnListener(new ReturnListener() {
52             @Override
53             public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
54                                      AMQP.BasicProperties properties, byte[] body) {
55                 System.out.println("replyCode:" + replyCode);
56                 System.out.println("replyText:" + replyText);
57                 System.out.println("exchange:" + exchange);
58                 System.out.println("routingKey:" + routingKey);
59                 System.out.println("message:" + new String(body));
60             }
61         });
62         String[] logLevels = {"error", "info", "warning"};
63         System.out.println("waiting message sent...");
64         // mandatory参数为true,当投递消息无法找到合适的消息队列,则返回生成者;false为缺省,丢弃消息
65         for (int i = 0; i < 3; i++) {
66             String message =  " hello rabbitMq" + (i + 1);
67             channel.basicPublish(EXCHANGE_NAME, logLevels[i], true, null,
68                     message.getBytes(Charset.defaultCharset()));
69             System.out.println("---------------------------------------------------");
70             System.out.println("sent message: [" +logLevels[i] +"]" + message);
71             Thread.sleep(200);
72         }
73     }
74 }
发送方确认——异步模式

 

面试环节可能会问到几个基础问题:

  1.  如果消息达到无人订阅的队列会怎么办

   消息会一直在队列中等待,rabbitmq会默认队列是无限长度的。

  2.  多个消费者订阅到同一队列怎么办

  消息会轮询的方式发送给消费者,每个消息只会发送给一个消费者

  3.  消息路由到了不存在的队列怎么办?

  会忽略,当消息不存在,消息丢失了。

  4.  如何明确拒绝消息?

   1、消费者断连,2、消费者使用reject命令(requeue=true,重新分发消息,false移除消息),3、nack命令(批量的拒绝)

 

      

 

 

 

以上是关于RabbitMQ基础篇的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ基础篇

RabbitMQ基础篇(下载安装并基础使用,内含各种坑问题)

分布式系统消息中间件——RabbitMQ的使用思考篇

分布式系统消息中间件——RabbitMQ的使用基础篇

RabbitMq (一)理论篇部分 MQ作用是什么 MQ的优缺点 RabbitMQ的基础架构 RabbitMQ 五种常用工作模式 RabbitMQ消息确认机制

Java架构师成长之道之RabbitMQ开发与运维-基础篇(CSDN版)