安装RabbitMQ(Centos6)(入门使用教程)(消息丢失的解决方案)以及Spring AMQP的使用
Posted 蓝盒子itbluebox
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了安装RabbitMQ(Centos6)(入门使用教程)(消息丢失的解决方案)以及Spring AMQP的使用相关的知识,希望对你有一定的参考价值。
一、RabbitMQ的安装和配置以及用户管理
1、搜索与商品服务的问题
目前我们已经完成了商品详情和搜索系统的开发。我们思考一下,是否存在问题?
- 商品的原始数据保存在数据库中,增删改查都在数据库中完成。
- 搜索服务数据来源是索引库,如果数据库商品发生变化,索引库数据不能及时更新。
- 商品详情做了页面静态化,静态页面数据也不会随着数据库商品发生变化。
如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?
这里有两种解决方案:
-
方案1:每当后台对商品做增删改操作,同时要修改索引库数据及静态页面
-
方案2:搜索服务和商品页面服务对外提供操作接口,后台在商品增删改后,调用接口
以上两种方式都有同一个严重问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立
原则。
所以,我们会通过另外一种方式来解决这个问题:消息队列
2、消息队列(MQ)
(1)什么是消息队列
消息队列,即MQ,Message Queue。
(2)AMQP和JMS
MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。
两者间的区别和联系:
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模型;而AMQP的消息模型更加丰富
(3)常见MQ产品
-
ActiveMQ:基于
JMS
-
RabbitMQ:基于
AMQP
协议,erlang
语言开发,稳定性好 -
RocketMQ:基于
JMS
,阿里巴巴产品,目前交由Apache基金会 -
Kafka:分布式消息系统,高吞吐量
(4)RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系统
官方教程:http://www.rabbitmq.com/getstarted.html
RabbitMQ基于Erlang语言开发:
3、下载和安装
(1)下载
官网下载地址:http://www.rabbitmq.com/download.html
也可以使用我提供的软件:
https://download.csdn.net/download/qq_44757034/21713768
(2)安装Erlang
1)在/home/leyou/
下创建一个新目录
我们并没有提供Erlang安装包,直接采用yum仓库安装:在mq目录下
sudo yum install esl-erlang_17.3-1~centos~6_amd64.rpm
sudo yum install esl-erlang-compat-R14B-1.el6.noarch.rpm
(3)安装RabbitMQ
1)安装
sudo rpm -ivh rabbitmq-server-3.4.1-1.noarch.rpm --force --nodeps
2)修改配置文件
将配置文件模板复制到etc目录:
sudo cp /usr/share/doc/rabbitmq-server-3.4.1/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
通过vim命令编辑:
sudo vim /etc/rabbitmq/rabbitmq.config
修改下面内容:
注意要去掉前面的百分号和后面的逗号
(4) 设置开机启动
输入下面命令
sudo chkconfig rabbitmq-server on
(5) 启动命令
通过下面命令来控制RabbitMQ:
sudo service rabbitmq-server start
sudo service rabbitmq-server stop
sudo service rabbitmq-server restart
(6) 开启web管理界面
RabbitMQ提供了用来管理的控制界面,十分方便,不过默认是关闭的。
我们通过命令开启web管理插件:
sudo rabbitmq-plugins enable rabbitmq_management
然后重启RabbitMQ:
sudo service rabbitmq-server restart
(7)开放端口
RabbitMQ默认使用15672端口进行web访问,我们开启防火墙端口:
iptables -A INPUT -ptcp --dport 15672 -j ACCEPT
service iptables save
然后在主机中通过地址:http://192.168.206.66:15672即可访问到管理界面
4、管理界面介绍
第一次访问需要登录,默认的账号密码为:guest/guest
(1)主页
- connections:无论生产者还是消费者,都需要与RabbitMQ建立连接后才可以完成消息的生产和消费,在这里可以查看连接情况
- channels:通道,建立连接后,会形成通道,消息的投递获取依赖通道。
- Exchanges:交换机,用来实现消息的路由
- Queues:队列,即消息队列,消息存放在队列中,等待消费,消费后被移除队列。
修改密码
(2)添加用户
上面的Tags选项,其实是指定用户的角色,可选的有以下几个:
-
超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
-
监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
-
策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
-
普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
-
其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
(3)创建虚拟主机(Virtual Hosts)
为了让各个用户可以互不干扰的工作,RabbitMQ添加了虚拟主机(Virtual Hosts)的概念。其实就是一个独立的访问路径,不同用户使用不同路径,各自有自己的队列、交换机,互相不会影响。
创建好虚拟主机,我们还要给用户添加访问权限:
点击添加好的虚拟主机:
进入虚拟主机设置界面:
二、五种消息模型
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。
但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同。
1、导入demo工程
我们通过一个demo工程来了解下RabbitMQ的工作方式:
导入工程:下载代码
https://download.csdn.net/download/qq_44757034/21718711
2、基本消息模型
说明
官方文档说明:
RabbitMQ是一个消息的代理者(Message Broker):它接收消息并且传递消息。
你可以认为它是一个邮局:当你投递邮件到一个邮箱,你很肯定邮递员会终究会将邮件递交给你的收件人。与此类似,RabbitMQ 可以是一个邮箱、邮局、同时还有邮递员。
不同之处在于:RabbitMQ不是传递纸质邮件,而是二进制的数据
基本消息模型图:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
生产者
连接工具类:
修改ip地址和对应的账号密码以及访问路径
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.206.66");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/leyou");
factory.setUsername("leyou");
factory.setPassword("123321");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
生产者发送消息:
package cn.itcast.rabbitmq.simple;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
/**
* 生产者
*/
public class Send {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道,使用通道才能完成消息相关的操作
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 向指定的队列中发送消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
}
控制台运行结果
web控制台查看消息(已创建的leyou用户登录,密码是123321)
进入队列页面,可以看到新建了一个队列:simple_queue
点击队列名称simple_queue,进入详情页,可以查看消息:
在控制台查看消息并不会将消息消费,所以消息还在
消费者获取消息
package cn.itcast.rabbitmq.simple;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
/**
* 消费者
*/
public class Recv {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
控制台
这个时候,队列中的消息就没了:
消费者的消息确认机制(Acknowlage)防止消息丢失
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ怎么知道消息被接收了呢?
这就要通过消息确认机制(Acknowlege)来实现了。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:
- 自动ACK:消息一旦被接收,消费者自动发送ACK(如果代码当中出现异常,代码没有执行完毕的情况下消息就丢失了,正常情况下代码没有执行完毕是不可能删除消息的)
- 手动ACK:消息接收后,不会发送ACK,需要手动调用
大家觉得哪种更好呢?
这需要看消息的重要性:
- 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
- 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
在Recv当中测试
先运行Send
在Recv当中添加错误代码
运行Recv
运行报错
我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:
package cn.itcast.rabbitmq.simple;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import cn.itcast.rabbitmq.util.ConnectionUtil;
/**
* 消费者,手动进行ACK
*/
public class Recv2 {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 创建通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息体
String msg = new String(body);
System.out.println(" [x] received : " + msg + "!");
// 手动进行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列,第二个参数false,手动进行ACK
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
注意到最后一行代码:
channel.basicConsume(QUEUE_NAME, false, consumer);
如果第二个参数为true,则会自动进行ACK;如果为false,则需要手动ACK。
运行测试
先通过Send类
浏览器当中查看
在Recv2当中写一个错误代码
运行Recv2
报错
消息依旧在
3、work消息模型
说明
在刚才的基本模型中,一个生产者,一个消费者,生产的消息直接被消费者消费。比较简单。
Work queues,也被称为(Task queues),任务模型。
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
角色:
- P:生产者:任务的发布者
- C1:消费者,领取任务并且完成任务,假设完成速度较慢
- C2:消费者2:领取任务并完成任务,假设完成速度快
案例一、
消费者1
消费者2
与消费者1基本类似,就是没有设置消费耗时时间。
这里是模拟有些消费者快,有些比较慢。
接下来,两个消费者一同启动,然后发送50条消息:
可以发现,两个消费者各自消费了25条消息,而且各不相同,这就实现了任务的分发。
案例二、能者多劳
在两个消费放当中都添加
// 设置每个消费者同时只能处理一条消息
channel.basicQos(1);
刚才的实现有问题吗?
- 消费者1比消费者2的效率要低,一次任务的耗时较长
- 然而两人最终消费的消息数量是一样的
- 消费者2大量时间处于空闲状态,消费者1一直忙碌
现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?
我们可以修改设置,让消费者同一时间只接收一条消息,这样处理完成之前,就不会接收更多消息,就可以让处理快的人,接收更多消息 :
生产者
生产者与案例1中的几乎一样:
package cn.itcast.rabbitmq.work;
import cn.itcast.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// 生产者
public class Send {
private final static String QUEUE_NAME = "test_work_queue";
public static void main(String[] argv) throws Exception {
// 获取到连接
Connection connection = ConnectionUtil.getConnection();
// 获取通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 循环发布任务
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "task .. " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 2);
}
// 关闭通道和连接
channel.close();
connection.close();
}
}
不过这里我们是循环发送50条消息。
消费者1
package cn.itcast.rabbitmq.work;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import 以上是关于安装RabbitMQ(Centos6)(入门使用教程)(消息丢失的解决方案)以及Spring AMQP的使用的主要内容,如果未能解决你的问题,请参考以下文章
Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)
Java网络商城项目 SpringBoot+SpringCloud+Vue 网络商城(SSM前后端分离项目)十七(安装RabbitMQ(Centos6)(入门使用教程))以及Spring AMQP的)
Linux从入门到放弃零基础入门Linux(第三篇):在虚拟机vmware中安装linux超详细手把手教你安装centos6分步图解