rabbitmq
Posted code home
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq相关的知识,希望对你有一定的参考价值。
1,集群消费模式,针对每条消息,同一个Consumer ID内的只会有1个实例拉取并处理消息,3比如发送3条保单消息message1、message2、message3,以其中一个下游保单系统为例,该系统有3个应用实例,在初始化基础数据时,MQS开发人员会给该系统分配一个Consumer ID并订阅该topic,
集群消费模式,针对每条消息,同一个Consumer ID内的只会有1个实例拉取并处理消息。
解决方案:各保单系统采用集群消费模式接入MQS订阅产险出单系统的保单topic, 每个保单系统分别申请独立的ConsumerID订阅该topic,可实现每条消息只会被每个保单系统集群实例中其中1个实例拉取并处理。
2,广播消费模式消费消息,则这两条消息会被投递到该微服务的每一台实例
某个配置文件有2条变更消息Message1、Message2,其中1个微服务需要获取该配置文件的变更信息,该微服务有3个实例,MQS会给这个微服务分配独立的Consumer ID:Consumer ID2,
采用广播消费模式消费消息,则这两条消息会被投递到该微服务的每一台实例。
解决方案:业务系统接入MQS,采用广播消费的模式订阅配置文件topic,一旦某个微服务的配置文件有更新时,业务系统会给该topic发送通知消息,因为是广播消费订阅了此topic,所以该微服务的每个主机都会接收到这条消息。
3,消息过滤
解决方案:MQS支持消息过滤,PNBS在发送每条消息时,都给消息设置标签TAG,所有下游的保单系统只需设置相应的TAG并订阅这个topic,就能只消费自己想要的数据,MQS支持消费端设置多个TAG。
rabbitMQ 集群
单机多实例
RABBITMQ_NODE_PORT=5673
RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener
[{port,15673}] -rabbitmq_stomp tcp_listeners [61614] -rabbitmq_mqtt
tcp_listeners [1884]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server
-detached
rabbitmqctl -n rabbit2 stop_app 关闭应用(不同于结束)
rabbitmqctl -n rabbit2 reset
rabbitmqctl -n rabbit2 cluster 或 rabbitmqctl -n rabbit2 join_cluster rabbit@localhost
rabbitmqctl -n rabbit2 start_app
rabbitmqctl -n rabbit2 cluster_status
rabbitmq-plugins -n rabbit2 enable rabbitmq_management 开启rabbit2的监控
rabbitmq-plugins -n rabbit2 disable rabbitmq_management 关闭rabbit2的监控
rabbitmq-plugins enable rabbitmq_management 开启rabbit的监控
rabbitmq-plugins disable rabbitmq_management 关闭rabbit的监控
rabbit@localhost
rabbitmqctl -n rabbit2 change_cluster_node_type ram 把rabbit2设置为ram内存模式
rabbitmqctl -n rabbit2 stop 结束命令
配置RabbitMQ cluster,(截图全部为单机多实例实验)
1、先保证你的各个rabbitmq节点都是可以访问的,且打开rabbitmq_management plugin,这样可以当出现某个节点挂掉之后可以切换到其他管理界面查看情况或者管理,保证两个节点都是可以正常工作的。下面我们就将这两个节点连接起来形成高可用的cluster,这样我们就可以让我们的exchange、queue在这两个节点之间复制,形成高可用的queue
2、cd 到你的home目录下,我是在root下,里面有一个隐藏的.erlang.cookie文件,/var/lib/rabbitmq,这就是我在前面介绍erlang时候提到的,这个文件是erlang用来发现和互连的基础。我们需要做的很简单,将两个节点中的.erlang.cookie设置成一样的。这是erlang的约定,一样的cookie hash key他认为是合法和正确的连接,.erlang.cookie默认是只读的,你需要修改下写入权限,然后复制粘贴下cookie 字符串即可。chmod u+w .erlang.cookie
3、配置好了之后接下来配置hosts文件,erlang会使用hosts文件里的配置去发现节点。
vim /etc/hosts
192.168.0.107 rabbitmq_node2
192.168.0.105 rabbitmq_node1
保证同样的配置在所有的节点上都是相同的。验证你配置的正确不正确你只需要在你的机器上ping rabbitmq_node1,试下请求的ip是不是你配置的即可。按照DNS的请求原理,hosts是最高优先权,除非浏览器有缓存,你直接用ping就不会有问题的
4、选择一个节点stop,然后连接到另外节点。
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq_node2
Clustering node rabbit@rabbitmq_node1 with rabbit@rabbitmq_node2 ...
rabbitmqctl start_app
节点已经连接成功,如图
5、默认情况下节点占用的memory是总内存的40%,可以根据自己的用途仔细研究rabbitmq的配置项。为了提高性能,不需要两个节点都是disc的节点,所以我们需要启动一个节点为RAM模式
rabbitmqctl change_cluster_node_type ram
rabbitmqctl -n rabbit2 change_cluster_node_type ram(单机集群) 改变rabbitmq_node1为内存节点模式,如图
6、Mirror queue policy 设置
节点是准备好了,接下来我们需要设置exchange、queue 高可用策略,这样才能真的做到高可用。现在是物理上的机器或者说虚拟机节点是高可用的,但是里面的对象需要我们进行配置策略。
RabbitMQ支持很好的策略模式,需要管理员才能操作。
首先我们需要创建一个属于自己业务范围内的vhost,标示一个逻辑上的独立空间,所有的账号、策略、队列都是强制在某个虚拟机里的。我创建了一个common vhost。(使用系统默认虚拟机也行,新建虚拟机如果没用户后续操作不行)
在虚拟机里添加用户
开始添加 policy ,如图
all 所有的节点都将被同步
exactly 指定个数的节点被同步
nodes 指定的名称的节点被同步
最主要是Apply to ,可以作用在exchange或者queues上,当然也可以包含这两个。策略选择还是比较丰富的,最常用的是HAmode,还有MessageTTL(消息的过期时间)。这些策略按照几个维度分组了,有跟高可用相关的,有Federation(集群之间同步消息)相关的 ,有Queue相关的,还有Exchange相关的。可以根据的业务场景进行调整。我们定义了策略的匹配模式.order.,这样可以避免将所有的exchange、queue都镜像了。
7、我们新建了一个ex.order.topic exchange,它的features中应用了exchange_queue_ha策略。(相同的策略是无法叠加使用的。)其他的exchange并没有应用这个策略,是因为我们的pattern限定了只匹配.order.的exchange。如图
8、创建一个qu.order.crm queue,注意看它的node属性里有一个”Synchronised mirrors:rabbit@rabbitmq_node2“镜像复制。features里也应用了exchange_queue_ha策略。这个时候,队列其实在两个节点里都是有的,虽然我们创建的时候是在rabbit@rabbitmq_node1里的,但是它会复制到集群里的其他节点。在创建HAmode的时候可以提供HA params参数,来限定复制节点的个数,这通常用来提高性能和HA之间的平衡。
添加队列时有一个node选项,在rabbit2添加的会同步到rabbit上 synchronised mirrors :rabbit@localhost
rabbit@localhost为建立集群时的主节点
9、默认的,也就是什么也不配置,直接在某个节点中添加一个queue,那么它仅仅是属于这个节点的。其它节点有的只是它的影子。所以像断线重连、操作恢复是无法做到的,实验证明确实是这样的。声明queue的节点关闭那么是无法再进行发布消息与消费的。这自然失去了集群的意义,如xmh.orde.ssx
如果用镜像复制,就会在从节点复制该queue,当主节点下线后从节点仍可使用,主节点再次上线后,从节点会把queue同步到主节点,此时xmh.order.ssx的node为rabbit2@localhost +0 +1
注意:
RabbitMQ集群中节点包括内存节点、磁盘节点。内存节点就是将所有数据放在内存,磁盘节点将数据放在磁盘上。如果在投递消息时,打开了消息的持久化,那么即使是内存节点,数据还是安全的放在磁盘。
一个rabbitmq集群中可以共享 user,vhost,exchange等,所有的数据和状态都是必须在所有节点上复制的,对于queue根据集群模式不同,应该有不同的表现
普通模式:默认的集群模式。
默认的集群模式,queue创建之后,如果没有其它policy,则queue就会按照普通模式集群。对于Queue来说,消息实体只存在于其中一个节点,A、B两个节点仅有相同的元数据,即队列结构,但队列的元数据仅保存有一份,即创建该队列的rabbitmq节点(A节点),当A节点宕机,你可以去其B节点查看,./rabbitmqctl list_queues 发现该队列已经丢失,但声明的exchange还存在。
当消息进入A节点的Queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并经过B发送给consumer,所以consumer应平均连接每一个节点,从中取消息。
该模式存在一个问题就是当A节点故障后,B节点无法取到A节点中还未消费的消息实体。
如果做了队列持久化或消息持久化,那么得等A节点恢复,然后才可被消费,并且在A节点恢复之前其它节点不能再创建A节点已经创建过的持久队列;如果没有持久化的话,消息就会失丢。
这种模式更适合非持久化队列,只有该队列是非持久的,客户端才能重新连接到集群里的其他节点,并重新创建队列。假如该队列是持久化的,那么唯一办法是将故障节点恢复起来。
为什么RabbitMQ不将队列复制到集群里每个节点呢?这与它的集群的设计本意相冲突,集群的设计目的就是增加更多节点时,能线性的增加性能(CPU、内存)和容量(内存、磁盘),当然RabbitMQ新版本集群也支持队列复制(有个选项可以配置)。比如在有五个节点的集群里,可以指定某个队列的内容在2个节点上进行存储,从而在性能与高可用性之间取得一个平衡(应该就是指镜像模式)。
镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案。
该模式解决了上述问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在consumer取数据时临时拉取。
该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。
所以在对可靠性要求较高的场合中适用,一个队列想做成镜像队列,需要先设置policy,然后客户端创建队列的时候,rabbitmq集群根据“队列名称”自动设置是普通集群模式或镜像队列。具体如下:
队列通过策略来使能镜像。策略能在任何时刻改变,rabbitmq队列也近可能的将队列随着策略变化而变化;非镜像队列和镜像队列之间是有区别的,前者缺乏额外的镜像基础设施,没有任何slave,因此会运行得更快。
为了使队列称为镜像队列,你将会创建一个策略来匹配队列,设置策略有两个键“ha-mode和 ha-params(可选)”。ha-params根据ha-mode设置不同的值,下面表格说明这些key的选项
如果使用ram方式,RabbitMQ能够承载的访问量则取决于可用的内存数了。RabbitMQ使用两个参数来限制使用系统的内存,避免系统被自己独占。
Java代码
[{rabbit, [{vm_memory_high_watermark_paging_ratio, 0.75}, {vm_memory_high_watermark, 0.4}]}].
vm_memory_high_watermark:表示RabbitMQ使用内存的上限为系统内存的40%。也可以通过absolute参数制定具体可用的内存数。当RabbitMQ使用内存超过这个限制时,RabbitMQ
将对消息的发布者进行限流,直到内存占用回到正常值以内。
Vm_memory_high_watermark_paging_ratio:表示当RabbitMQ达到0.4*0.75=30%,系统将对queue中的内容启用paging机制,将message等内容换页到disk 中。
RabbitMQ的内存使用情况可以通过“rabbitmqctl status”或者管理插件中的Web UI查询。
各个内存条目的含义请参照:https://www.rabbitmq.com/memory-use.html
当消息发送的速率超过了RabbitMQ的处理能力时该怎么办?
RabbitMQ会自动减慢这个连接的速率,让client端以为网络带宽变小了,发送消息的速率会受限,从而达到流控的目的。
使用”rabbitmqctl list_connections”查看连接,如果状态为“flow”,则说明这个连接处于flow-control
状态。
集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐能力的目的。
1 集群配置方式
RabbitMQ可以通过三种方法来部署分布式集群系统,分别是:cluster,federation,shovel
cluster:
不支持跨网段,用于同一个网段内的局域网
可以随意的动态增加或者减少
节点之间需要运行相同版本的RabbitMQ和Erlang
federation:应用于广域网,允许单台服务器上的交换机或队列接收发布到另一台服务器上交换机或队列的消息,可以是单独机器或集群。federation队列类似于单向点对点连接,消息会在联盟队列之间转发任意次,直到被消费者接受。通常使用federation来连接internet上的中间服务器,用作订阅分发消息或工作队列。
shovel:连接方式与federation的连接方式类似,但它工作在更低层次。可以应用于广域网。
以上是关于rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章
RabbitMQRabbitMQ和Erlang下载与安装步骤—2023超详细最新版