RabbitMQ入门前篇
Posted 嘟嘟的程序员铲屎官
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ入门前篇相关的知识,希望对你有一定的参考价值。
本篇博文目录:
一.RabbitMQ
1.消息队列
Message Queue中文意思消息队列,是一种进程的通信机制,用于上下游传递消息。其中在二个进程之间MQ起到消息中间件的作用,实现在进程之间的解耦操作,原来是进程与进程之间直接通信,这样的话耦合性大,并且如果通信失败无法确定那一面出现问题,而通过在中间多一个信息传递就可以实现解耦,并且当有一端出现问题时,也能够知道是那一端出现了问题,保证了数据传输的可靠性。
2.RabbitMQ
RabbitMQ是众多消息代理服务器中使用的较广,较多的一款,RabbitMQ支持几乎所有的操作系统与编程语言,并且Rabbit提供高并发,高可用的成熟方案,支持多种消息协议,易于部署与使用。
下图列出了RabbitMQ与其他MQ的对比图(注意时效性,视频大概是2018出的)
RabbitMQ的应用场景如下:
3.安装RabbitMQ
- 安装教程
Winddos环境下安装教程: https://www.cnblogs.com/chenwolong/p/rabbitmq.html
Linux环境下安装教程:https://blog.csdn.net/qq_45173404/article/details/116429302
- Widnos的安装包下载(官网下载太慢)
erlang25.0.1版本:otp_win64_25.0.1.exe
https://www.aliyundrive.com/s/NJwrinH1VNy 提取码: m92c
rabbitmq3.11.7版本: rabbitmq-server-3.11.7.exe
https://www.aliyundrive.com/s/wkfkd7ewzNa 提取码: d43w
- 安装完毕,启动RabbitMQ管理模块的插件后,访问http://localhost:15672/ 输入账号guest,密码guest,进行登入,会进入如下界面,说明安装成功:
备注:如果无法访问,你可以看看这篇博文:https://itguye.blog.csdn.net/article/details/128770009
4.RabbitMQ常用命令
可以通过下面的命令来操作RabbitMQ,当然也可以在网站上通过图形化的方式进行操作。
rabbitmq-server 前台启动服务
rabbitmq-server -detached 后台启动服务
rabbitmqctl stop 停止服务
rabbitmqctl start_app 启动应用
rabbitmqctl stop_app 终止应用
rabbitmqctl add_user username password – 创建新用户
rabbitmqctl delete_user username – 删除用户
rabbitmqctl change_password username newpassword– 重置密码
rabbitmqctl set_user_tags username tag – 授予用户角色(Tag)
rabbitmqctl set_permissions -p / user_admin'.*' '.*''.*'– 设置用户允许访问的vhost
上面rabbitmqctl set_user_tags username tag命令中的tag,如下:
二.使用RabbitMQ进行编程
1.AMQP
AMQP是一个协议规范,二个不同应用程序只要遵循该协议就可以实现通信,其中RabbitMQ就是AMQP的一种实现方式。
AMQP中的一些知识概念,如生产者,消费者,消息,队列和虚拟主机等,解释如下:
2.第一次MQ通信
- 添加一个名为/test的虚拟主机
- 创建一个maven项目,并导入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
备注:最新版本为5.16版本: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.16.0
- 创建utiles工具包,并创建RabbitmqUtils和RabbitConstant类,如下:
RabbitmqUtils类,Rabbit工具类,该工具类就一个方法,就是获取RabbitMq的连接对象Connection :
package utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitmqUtils
private static ConnectionFactory connectionFactory = new ConnectionFactory();
// 静态代码块进行初始化
static
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/test");
// 获取mq连接对象
public static Connection getConnection()
try
Connection connection = connectionFactory.newConnection();
return connection;
catch (Exception e)
throw new RuntimeException();
RabbitConstant类,该类存放RabbitMq的配置常量信息:
package utils;
public class RabbitConstant
public static final String QUEUE_HELLOWORLD = "helloworld";
public static final String QUEUE_SMS = "sms";
public static final String EXCHANGE_WEATHER = "weather";
public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";
public static final String QUEUE_BAIDU = "baidu";
public static final String QUEUE_SINA = "sina";
public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
- 创建一个helloworld的包,在该文件下创建生产者类和消费者类,来实现生产者发送一个helloworld的字符串,然后消费者接受该字符串的操作。
Producer类:首先通过Rabbitmq的工具类获取连接对象,然后通过连接对象创建通道(虚拟连接),接着通过虚拟连接创建一个名为helloworld的队列,并往队列中传递数据hellowrld:
package helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Producer
public static void main(String[] args) throws IOException, TimeoutException
// 获取连接对象
Connection connection = RabbitmqUtils.getConnection();
// 创建通道,虚拟连接
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
String message = "hello world";
channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送数据成功");
// 关闭虚拟通道
channel.close();
// 关闭连接
connection.close();
ConSummer类:通过工具类获取连接,然后创建通道,通过通道使用helloworld队列,并创建一个消费者对象,传入匿名内部类DefaultConsumer,并传入channel对象,获取helloworld消息队列的数据:
package helloworld;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
/**
* 消费者
*/
public class ConSummer
public static void main(String[] args) throws IOException
// 获取mq连接
Connection connection = RabbitmqUtils.getConnection();
// 创建通道号
Channel channel = connection.createChannel();
// 获取队列
channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
// 接收
channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("接收到的数据:"+new String(body));
//签收消息,确认消息
//envelope.getDeliveryTag() 获取这个消息的TagId
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),false);
);
- 运行效果
- 消息的状态
三.RabbitMQ六中工作模式
1.RabbitMQ
RabbitMQ六种工作模式中,2~5使用较多,下面会给出实例代码,对于这几种方式存在一定的一致性,都是在方式1的基础上进行添加,功能更加丰富(上文中的案例代码就是Hello World方式)。
2.Work queues
本实例模拟短信通知服务,就是用户购买订单成功后,通过RabbitMQ发送短信给用户进行订单确认,实例代码,首先在helloworld项目中创建一个名为workqueue的包,并创建SMS,Producer,ConSummerOne,ConSummerTwo,ConSummerThree这五个类,详细代码如下:
- SMS类:实体类
package workqueue;
public class SMS
private String name;
private String mobile;
private String content;
public SMS(String name, String mobile, String content)
this.name = name;
this.mobile = mobile;
this.content = content;
public String getName()
return name;
public void setName(String name)
this.name = name;
public String getMobile()
return mobile;
public void setMobile(String mobile)
this.mobile = mobile;
public String getContent()
return content;
public void setContent(String content)
this.content = content;
- Producer:生产者,用来模拟100个用户同时订票
package workqueue;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Producer
public static void main(String[] args) throws IOException, TimeoutException
// 获取连接对象
Connection connection = RabbitmqUtils.getConnection();
// 创建通道,虚拟连接
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
// 给100给订票的用户发送订票成功的短信信息
for (int i = 1; i <= 100; i++)
SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
// 将sms转换为jSON对象
String message = new Gson().toJson(sms);
channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送成功");
// 关闭虚拟通道
channel.close();
// 关闭连接
connection.close();
- ConSummerOne类:发送短信处理1
package workqueue;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
/**
* 消费者
*/
public class ConSummerOne
public static void main(String[] args) throws IOException
// 获取mq连接
Connection connection = RabbitmqUtils.getConnection();
// 创建通道号
Channel channel = connection.createChannel();
// 获取队列
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个
// 接收
channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("消费者1,进行发送:"+new String(body));
try
Thread.sleep(100);// 延时0.1s
catch (InterruptedException e)
e.printStackTrace();
//签收消息,确认消息
//envelope.getDeliveryTag() 获取这个消息的TagId
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),false);
);
- ConSummerTwo类:发送短信2
package workqueue;
import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;
import java.io.IOException;
/**
* 消费者
*/
public class ConSummerTwo
public static void main(String[] args) throws IOException
// 获取mq连接
Connection connection = RabbitmqUtils.getConnection();
// 创建通道号
Channel channel = connection.createChannel();
// 获取队列
channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个
// 接收
channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("消费者2,进行发送:"+new String(body));
try
Thread.sleep(100);// 延时0.1s
catch (InterruptedException e)
e.printStackTrace();
//签收消息,确认消息
//envelope.getDeliveryTag() 获取这个消息的TagId
//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
channel.basicAck(envelope.getDeliveryTag(),falseRabbitMQ高级进阶(2021.05.29)
SpringBoot整合RabbitMQ(2021.05.29)