总结rabbitmq
Posted muacheng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了总结rabbitmq相关的知识,希望对你有一定的参考价值。
一.rabbitmq基础
1.简介
RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的。Erlang语言在数据交互方面性能优秀,有着和原生Socket一样的延迟,这也是RabbitMQ高性能的原因所在
2.典型应用场景
(1)异步处理。把消息放入消息中间件中,等到需要的时候再去处理。
(2)流量削峰。例如秒杀活动,在短时间内访问量急剧增加,使用消息队列,当消息队列满了就拒绝响应,跳转到错误页面,这样就可以使得系统不会因为超负载而崩溃。
(3)日志处理
(4)应用解耦。假设某个服务A需要给许多个服务(B、C、D)发送消息,当某个服务(例如B)不需要发送消息了,服务A需要改代码再次部署;当新加入一个服务(服务E)需要服务A的消息的时候,也需要改代码重新部署;另外服务A也要考虑其他服务挂掉,没有收到消息怎么办?要不要重新发送呢?是不是很麻烦,使用MQ发布订阅模式,服务A只生产消息发送到MQ,B、C、D从MQ中读取消息,需要A的消息就订阅,不需要了就取消订阅,服务A不再操心其他的事情,使用这种方式可以降低服务或者系统之间的耦合。
3.AMQP中几个重要概念
(1)Server:接收客户端的连接,实现AMQP实体服务。
(2)Connection:连接,应用程序与Server的网络连接,TCP连接。
(3)Channel:信道,消息读写等操作在信道中进行。客户端可以建立多个信道,每个信道代表一个会话任务。
(4)Message:消息,应用程序和服务器之间传送的数据,消息可以非常简单,也可以很复杂。有Properties和Body组成。Properties为外包装,可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body就是消息体内容。
(5)Virtual Host:虚拟主机,用于逻辑隔离。一个虚拟主机里面可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名称的Exchange或Queue。
(6)Exchange:交换器,接收消息,按照路由规则将消息路由到一个或者多个队列。如果路由不到,或者返回给生产者,或者直接丢弃。RabbitMQ常用的交换器常用类型有direct、topic、fanout、headers四种,后面详细介绍。
(7)Binding:绑定,交换器和消息队列之间的虚拟连接,绑定中可以包含一个或者多个RoutingKey。
(8)RoutingKey:路由键,生产者将消息发送给交换器的时候,会发送一个RoutingKey,用来指定路由规则,这样交换器就知道把消息发送到哪个队列。路由键通常为一个“.”分割的字符串,例如“com.rabbitmq”。
(9)Queue:消息队列,用来保存消息,供消费者消费
4.AMQP协议和RabbitMQ
AMQP协议模型有三部分组成:生产者、消费者和服务端。
生产者是投递消息的一方,首先连接到Server,建立一个连接,开启一个信道;然后生产者声明交换器和队列,设置相关属性,并通过路由键将交换器和队列进行绑定。同理,消费者也需要进行建立连接,开启信道等操作,便于接收消息。
接着生产者就可以发送消息,发送到服务端中的虚拟主机,虚拟主机中的交换器根据路由键选择路由规则,然后发送到不同的消息队列中,这样订阅了消息队列的消费者就可以获取到消息,进行消费。
最后还要关闭信道和连接。
RabbitMQ是基于AMQP协议实现的,其结构如下图所示,和AMQP协议简直就是一模一样
1.消息基于什么传输
由于TCP连接的创建和销毁开销较大,且并发数受系统资源限制,会造成性能瓶颈。RabbitMQ使用信道的方式来传输数据。信道是建立在真实的TCP连接内的虚拟连接,且每条TCP连接上的信道数量没有限制
2.如何保证幂等?
唯一ID+指纹码机制,利用数据库主键去重
将ID + 指纹码拼接好的值作为数据库主键,就可以进行去重了。即在消费消息前呢,先去数据库查询这条消息的指纹码标识是否存在,没有就执行insert操作,如果有就代表已经被消费了,就不需要管了
5.常用交换机(Exchange)
RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种
(1)Direct Exchange
该类型的交换器将所有发送到该交换器的消息被转发到RoutingKey指定的队列中,也就是说路由到BindingKey和RoutingKey完全匹配的队列中(如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog)
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "direct"); //direct fanout topic
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey");
byte[] messageBodyBytes = "hello world".getBytes();
//需要绑定路由键
channel.basicPublish("exchangeName", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
(2)Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
(3)Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.” 只会匹配到“audit.irs”
Channel channel = connection.createChannel();
channel.exchangeDeclare("exchangeName", "topic"); //direct fanout topic
channel.queueDeclare("queueName");
channel.queueBind("queueName", "exchangeName", "routingKey.*");
byte[] messageBodyBytes = "hello world".getBytes();
channel.basicPublish("exchangeName", "routingKey.one", MessageProperties.PERSISTENT_TEXT_PLAIN, messageBodyBytes);
二.高级特性
1.过期时间(TTL)
Time To Live,也就是生存时间,是一条消息在队列中的最大存活时间,单位是毫秒。了解Redis的朋友应该一看就明白,二者很像。
RabbitMQ可以对消息和队列设置TTL。
RabbitMQ支持设置消息的过期时间,在消息发送的时候可以进行指定,每条消息的过期时间可以不同。
RabbitMQ支持设置队列的过期时间,从消息入队列开始计算,直到超过了队列的超时时间配置,那么消息会变成死信,自动清除。
如果两种方式一起使用,则过期时间以两者中较小的那个数值为准。
当然也可以不设置TTL,不设置表示消息不会过期;如果设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息将被立即丢弃
2.消息确认
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消费者订阅队列的时候,可以指定autoAck参数,当autoAck为true的时候,RabbitMQ采用自动确认模式,RabbitMQ自动把发送出去的消息设置为确认,然后从内存或者硬盘中删除,而不管消费者是否真正消费到了这些消息。当autoAck为false的时候,RabbitMQ会等待消费者回复的确认信号,收到确认信号之后才从内存或者磁盘中删除消息。
消息确认机制是RabbitMQ消息可靠性投递的基础,只要设置autoAck参数为false,消费者就有足够的时间处理消息,不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。
3.持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。RabbitMQ的持久化分为三个部分:交换器持久化、队列持久化和消息的持久化。
(1)exchange的持久化对消息的可靠性来说没有什么影响,但是同样如果exchange不设置持久化,那么当broker服务重启之后,exchange将不复存在,那么既而发送方rabbitmq producer就无法正常发送消息。
(2)queue的持久化标识durable设置为true,则代表是一个持久的队列,那么在服务重启之后,也会存在,因为服务会把持久化的queue存放在硬盘上,当服务重启的时候,会重新读取之前被持久化的queue。队列是可以被持久化,但是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启之前那个queue里面还没有发出去的消息的话,重启之后那队列里面是不是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。
(3)将交换器、队列和消息都设置持久化之后就能保证数据不会被丢失吗?
①消费者端: 消费者订阅队列将autoAck设置为true,虽然消费者接收到了消息,但是没有来得及处理就宕机了,那数据也会丢失,解决方案就是以为手动确认接收消息,待处理完消息之后,手动删除消息
②在rabbitmq服务端,如果消息正确被发送,但是rabbitmq未来得及持久化,没有将数据写入磁盘,服务异常而导致数据丢失,解决方案,可以通过rabbitmq集群的方式实现消息中间件的高可用性
4.死信队列
(1)死信
消息被拒绝签收(Nack),并且不允许重回队列。
TTL设定的消息有效时间过期。
实际消息数大于队列最大限制数,那么超出最大限制的消息都将会是死信。
(2)死信队列
“死信队列”,顾名思义,就是存放死信的队列。其实它和普通的队列并没有太大差别,唯一的区别就是他的routingkey是"#"。也就是说:只要你路由到我这个死信队列,我都接收
5.延迟队列
一般的队列,消息一旦进入队列就会被消费者立即消费。延迟队列就是进入该队列的消息会被消费者延迟消费,延迟队列中存储的对象是的延迟消息,“延迟消息”是指当消息被发送以后,等待特定的时间后,消费者才能拿到这个消息进行消费。
延迟队列用于需要延迟工作的场景。最常见的使用场景:淘宝或者天猫我们都使用过,用户在下单之后通常有30分钟的时间进行支付,如果这30分钟之内没有支付成功,那么订单就会自动取消。
以上是关于总结rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章