初闻 RabbitMQ
Posted double_lifly
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了初闻 RabbitMQ相关的知识,希望对你有一定的参考价值。
RabbitMQ
MQ全称为Message Queue,即消息队列,RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通讯方式,消息队列在分布式系统应用非常广泛。
应用场景
- 任务异步处理
将不需要同步处理的并且消耗时长的操作由消息队列接收进行异步处理,可以提高程序的响应时间- 应用程序解耦合
MQ相当于一个中介,生产方式通过MQ与消费方交互,它将应用程序进行解耦合
常用的消息队列
ActiveMQ RabbitMQ ZeroMQ Kafka MetaMQ RocketMQ Redis
为什么使用RabbitMQ呢
- 使用简单,功能强大
- 基于AMQP协议
- 社区活跃,文档完善
- 高并发性能好,基于erlang语言
- SpringBoot默认集成RabbitMQ
AMQP介绍
AMQP即Advanced Message Queuing Protocol一个统一消息服务的应用层标准高级消息队列协议,即应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
JMS是什么?
是java消息服务(Java Message Service)应用程序接口是一个java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与平台无关的API,绝大多数MOM提供商都对JMS提供支持。
总结
JMS是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似于java的jdbc,只是遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么不同,jms是java语言专属的消息服务标准,它是在api层定义标准,并且智能用于java应用,而AMQP是在协议层定义的标准,是跨语言的。
RabbitMQ的工作原理
Broker:消息队列服务进程,该进程包括两部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某和队列,对消息进行过滤
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息
消息发布接收流程
发送消息
- 生产者和Broker建立TCP连接
- 生产者和Broker建立通道
- 生产者通过通道消息发送给Broker,由Exchange将消息进行转发
- Exchange将消息转发到指定的Queue
接收消息- 消费者和Broker建立TCP连接
- 消费者和Broker建立通道
- 消费者监听指定的Queue
- 当有消息到达Queue时Broker默认将消息推送给消费者
- 消费者接收到消息
下载与安装
本来想把自己的opt与rabbitMQ上传上来的,该文件已存在,安装的话就百度下吧,安装好后,打开服务,界面如下
简单入门案例编写
- 创建maven工程
- 导入依赖
- 编写生产者代码
- 编写消费者代码
- 结果截图
maven工程
我是用springboot创建的maven工程,其parent使用的是2.0.0,里面使用了lambok插件
依赖如下
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.8.13</version>
<scope>test</scope>
</dependency>
</dependencies>
创建生产者
public class Producer01 {
//队列名称
private static final String QUEUE = "helloworld";
@SneakyThrows
public static void main(String[] args) {
//创建连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//链接信息
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务 器
factory.setVirtualHost("/");
//创建于rabbitMQ服务得tcp连接
Connection connection = null;
Channel channel = null;
try {
connection= factory.newConnection();
//创建与exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
channel = connection.createChannel();
/**
* 声明队列,如果rabbit中没有次队列将自动创建
* param1:队列名称
* param2:是否持久化,若持久化,mq重启后该队列仍然存在
* param3:队列是否独占此连接,队列只允许在该连接中访问,如果连接时关闭,则队列自动删除
* param4:队列不再使用时是够自动删除次队列
* param5:扩展参数
*/
channel.queueDeclare(QUEUE,true,false,false,null);
String message = "hello world"+System.currentTimeMillis();
/**
* 消息发布方法
* param1:exchange的名称,如果没有指定,则使用default exchange
* param2:routinKey,消息的路由key,是用于Exchange(交换机)将消息转发到指定的消息
* param3:消息包含的属性
* param4:消息体
*/
channel.basicPublish("",QUEUE,null,message.getBytes());
System.out.println("Send Message is:"+message+"'");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
if (channel!= null){
channel.close();
}
if (connection != null){
connection.close();
}
}
}
}
创建消费者
public class Consumer01 {
private static final String QUEUE = "helloworld";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
//设置rabbitMQ所在服务器的ip和端口
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE,true,false,false,null);
//定义消费方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 消费者接收消息调用此方法
* @param consumerTag 消费者的标签,在channel.baseicConsumer()去指定
* @param envelope 消息包的内容,可从中获取消息id,消息routingkey,叫花鸡,消息和重传标志
* (收到消息失败后是否需要冲洗发送)
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息内容
String msg = new String(body, "utf-8");
System.out.println("receive message..."+msg);
}
};
/**
* 监听队列String queue,boolean autoAck,Consumer callback
* String queue:队列名称
* boolean autoAck:设置为true表示消息接收自动向mq回复收到了,mq接收到回复会删除消息,设置为false则需要手动回复
* Consumer callback:消费消息的方法,消费者接收到消息后调用此方法
*/
channel.basicConsume(QUEUE,true,consumer);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
结果截图
以上是关于初闻 RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段