RabbitMQ

Posted 邱秋Elena

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ相关的知识,希望对你有一定的参考价值。

文章目录


问题现象:

1、 海量数据

2、 高并发

解决方案:其中最为代表解决方案流量削峰-》 使用MQ


提示:以下是本篇文章正文内容,下面案例可供参考

1、 MQ概念

1.1 MQ 介绍

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

1.2 MQ应用场景

MQ的优势

1.2.1异步解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合 调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在 这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流 系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

1.2.2 削峰填谷

举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正 常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限 制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分 散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体 验要好。

1.2.3 消息分发

在实际开发中一个系统的数据有的时候需要分发个不同的系统中, 拿电商举例在双11的时候有很多会场,每一个会场可能都需要用到一个商品的数据,那么我们需要把数据分发到不同的会场中,假设有加了一个会场我们还需要把数据分发给新的会场

MQ的劣势

系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

系统复杂度提高

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

一致性问题

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

1.3 常见的消息中间件

目前市场上主流的消息中间件主要有ActivitiMQ、RabbitMQ、RocketMQ、kafka

ActivitiMQ:ActiveMQ是Apache出品,比较老的一个开源的消息中间件, 是一个完全支持JMS规范的消息中间件.API丰富,以前在中小企业应用广泛

MQ衡量的指标:服务性能,数据存储,集群架构

RabbitMQ: RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

对数据的一致性,稳定性和可靠性要求比较高的场景

RocketMQ: RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。

**Kafka: **Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

常见消息中间件对比图

1.4 RabbitMQ简介

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议 的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中 间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。 Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

1.5 RabbitMQ中的核心概念

Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker

Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网 络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等

Connection:publisher/consumer 和 broker 之间的 TCP 连接

Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线 程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销

Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)

Queue:消息最终被送到这里等待 consumer 取走

Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存 到 exchange 中的查询表中,用于 message 的分发依据

RabbitMQ架构图

2、RabbitMQ的安装与配置

步骤1 :下载和运行

https://www.rabbitmq.com/

安装文档参考:https://www.rabbitmq.com/download.html

对于rabbitmq的安装方式有很多种, 我们这里使用通过yum命令的方式进行安装, 因为rabbitmq是使用Erlang语言进行编写的, 所以我们在安装的时候需要先搭建Erlang的一个运行环境

步骤2 :安装Erlang

我们安装的rabbitmq的版本是3.7的,要求使用的Erlang版本必须是20.3.x的版本,我们可以去github下载对应的Erlang的对应版本https://github.com/rabbitmq/erlang-rpm

vi /etc/yum.repos.d/rabbitmq-erlang.repo

Erlang 20.3.x

# In /etc/yum.repos.d/rabbitmq-erlang.repo

[rabbitmq-erlang]
name=rabbitmq-erlang
baseurl=https://dl.bintray.com/rabbitmq-erlang/rpm/erlang/20/el/7
gpgcheck=1
gpgkey=https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
repo_gpgcheck=0
enabled=1

安装命令

yum install erlang -y

步骤3 : 安装rabbitmq-server

导入对应的验证签名:

rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc
创建一个rabbitmq的仓库文件:vi /etc/yum.repo.d/rabbitmq.repo
[bintray-rabbitmq-server]
name=bintray-rabbitmq-rpm
baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/
gpgcheck=0
repo_gpgcheck=0
enabled=1
安装命令

yum install rabbitmq-server -y

步骤四:开启服务

添加rabbitmq-server开机启动
chkconfig rabbitmq-server on
启动服务:
service rabbitmq-server start
停止服务
service rabbitmq-server stop
开启管理页面的插件
rabbitmq-plugins enable rabbitmq_management
rabbitmq-plugins 查看页面信息
rabbitmq-plugins list

步骤五:开启guest用户的远程访问

 vi /etc/rabbitmq/rabbitmq.config
	[rabbit,[loopback_users,[]]]

3、RabbitMQ的快速入门

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

官网: https://www.rabbitmq.com/getstarted.html

入门需求: 利用生产者发送消息到MQ ,消费者消费消息

操作步骤:

  • 创建生产者工程和消费者工程
  • 添加依赖
<dependencies>
        <dependency>
           <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
</dependencies>
  • 编写生产者
package cn.wolfcode.java.rabbitmq._01helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Created by wolfcode-fanjialong
 */
public class Send 
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.142.129");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) 
           /**
             * 声明队列
             * 第一个参数queue:队列名称
             * 第二个参数durable:是否持久化
             * 第三个参数Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次
声明它的连接可见,并在连接断开时自动删除。
             *     这里需要注意三点:
             *         1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个
连接创建的排他队列的。
             *         2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建
立同名的排他队列的,这个与普通队列不同。
             *         3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会
被自动删除的。
             *         这种队列适用于只限于一个客户端发送读取消息的应用场景。
             * 第四个参数Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列
会被自动删除。
             *                         这种队列适用于临时队列。
             */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        
    

  • 编写消费者
package cn.wolfcode.java.rabbitmq._01helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
 * Created by wolfcode-fanjialong
 */
public class Recv 
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.142.129");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        ;
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag ->  );

    


4、RabbitMQ的常见工作模式

4.1 Work queues 工作队列模式

4.1.1 模式说明

通过Helloworld工程我们已经能够构建一个简单的消息队列的基本项目,项目中存在几个角色:生产 者、消费者、队列,而对于我们真实的开发中 ,对于消息的消费者通过是有多个的。

比如在实现用户注册功能时,用户注册成功,会给响对应用户发送邮件,同时给用户发送手机短信,告诉用户已成功注册网站或者app 应用,这种功能在大部分项目开发中都比较常见 ,而对于helloworld 的应用中虽然能够对 消息进行消费,但是有很大问题: 消息消费者只有一个,当消息量非常大时,单个消费者处理消息就会变得很慢,同时给节点页带来很大压力,导致消息堆积越来越多。对于这种情况,RabbitMQ 提供了工作 队列模式,通过工作队列提供做个消费者,对MQ产生的消息进行消费,提高MQ消息的吞吐率,降低*消息的处理时间。处理模型图如下。

4.1.2 实现步骤

生产者

package cn.wolfcode.java.rabbitmq._02worker;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

/**
 * Created by wolfcode-fanjialong
 */

public class NewTask 
    private static final String TASK_QUEUE_NAME = "task_queue";
    public static void main(String[] argv) throws Exception 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.142.129");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) 
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            String message = "tasequeue";
            for(int i=0;i<20;i++)
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,

                        (message+i).getBytes("UTF-8"));

            
            System.out.println(" [x] Sent '" + message + "'");
        
    

消费者

package cn.wolfcode.java.rabbitmq._02worker;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.concurrent.TimeUnit;

/**

 * Created by wolfcode-fanjialong

 */

public class Worker 

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.142.129");
        final Connection connection = factory.newConnection;
        final Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> 
            String message = new String(delivery.getBody(), "UTF-8");
            try 
                TimeUnit.SECONDS.sleep(1);
             catch (InterruptedException e) 
                e.printStackTrace();
            finally 
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            

            System.out.println(" [x] Received '" + message + "'");
        ;

        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag ->  );
    


4.1.3 简单问题说明

从结果可以看出消息被平均分配到两个消费方,来对消息进行处理,提高了消息处理效率,创建多个消费者来对消息进行处理。这里RabitMQ采用轮询来对消息进行分发时保证了消息被平均分配到每个消费方 。

但是引入新的问题:真正的生产环境下,对于消息的处理基本不会像我们现在看到的这样,每个消 费方处理的消息数量是平均分配的,比如因为网络原因,机器cpu ,内存等硬问题,消费方处理消息时 同类消息不同机器进行处理时消耗时间也是不一样的,比如1号消费者消费1条消息时1秒,2号消费者消费1条消息是5秒,对于1号消费者比2号消费者处理消息快,那么在分配消息时就应该让1号消费者多收 到消息进行处理,也即是我们通常所说的”能者多劳”,同样Rabbitmq对于这种消息分配模式提供了支持。

问题: 任务量很大,消息虽然得到了及时的消费,单位时间内消息处理速度加快,提高了吞吐量,可 是不同消费者处理消息的时间不同,导致部分消费者的资源被浪费。

解决:采用消息公平分发。

总结:工作队列消息轮询分发消费者收到的消息数量平均分配,单位时间内消息处理速度加快,提高了吞吐量。

4.4 工作模式队列-消息公平分发(fair dispatch)

在案例01中对于消息分发采用的是默认轮询分发,消息应答采用的自动应答模式,这是因为当消息进 入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。

为了解决这个问题,我们使用 basicQos(prefetchCount = 1) 方法,来限RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。执行模型图如下:

4.2 Pub/Sub 订阅模式

4.2.1. 模式说明

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

C:消费者,消息的接收者,会一直等待消息到来

Queue:消息队列,接收消息、缓存消息

Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、

递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列

Direct:定向,把消息交给符合指定routing key 的队列

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

4.2.2 实现步骤

生产者

package cn.wolfcode.java.rabbitmq._03pubsub;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Created by wolfcode-fanjialong
 */
public class EmitLog 
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] argv) throws Exception 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) 
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = "info: Hello World!";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        
    

消费者

package cn.wolfcode.java.rabbitmq._03pubsub;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * Created by wolfcode-fanjialong
 */

public class ReceiveLogs 
    private static final String EXCHANGE_NAME = "logs";
    public static void main(String[] argv) throws Exception 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String queueName = channel.queueDeclare().以上是关于RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

inconsistent_cluster,“Node ‘rabbit@node-1‘ thinks it‘s clustered with node ‘rabbit@node-3‘, but ‘rab

inconsistent_cluster,“Node ‘rabbit@node-1‘ thinks it‘s clustered with node ‘rabbit@node-3‘, but ‘rab

5G核心网技术基础自学系列 | MR-DC:签约QoS流E-RAB和MR-DC承载

类型模式名称:R a b = Q (a -> (R a b,b))

使用数字电位器AD5254调节电源

LTE基础知识承载相关概念介绍