消息队列(RabbitMQ)
Posted xue_yun_xiang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列(RabbitMQ)相关的知识,希望对你有一定的参考价值。
一、概述
消息队列作用:解耦、异步、削峰
解耦
解耦,就是将连个应用不是互相之间强依赖关系,一个挂了,两外一个依然可以使用
削峰
削锋,就是可以将高并发的网络请求 缓存到消息队中,慢慢的执行消息对对应的业务
618 秒杀购买商品
生成订单
扣减库存
扣减余额
发货,生成物流
如果以上 步骤都在一个应用的请求中去执行,在每秒 50000个并发的情况下,是否回对应服务造成很大的压力
优化流程
1.先扣减库存
2将对应商品生成订单的消息,发送到消息队列
3.让应用慢的消费消息 完成一下业务
异步
用户向服务发起请求,我们不会直接告诉用户结果,而是异步处理业务逻辑,稍后通过 通知的形式告诉用户
二、消息队列框架
常用消息队列框架:ActiveMQ,RocketMQ,Kafka,RabbitMQ。
RabbitMQ :特点 简单易用,支持多种语言,保证消息不丢失 可以配合作为分布式事务
Kafka: 特点是吞吐量非常大,适合大数据项目使用 支持多语言
RocketMQ:阿里出品,效率也很好,主要在spingcloud alibaba套件中使用,可以配合作为分布式事务
ActiveMQ:只支持java语言 很少用
三、RabbitMq
介绍
publisher (生产者) :负责生产消息,并且将消息 放到交换机
Exchange - 交换机:交互机 主要用来接收生产者 发送的消息 并且将消息发送给 路由
Routes - 路由:将接收到的消息 根据路由规则 ,发送到不同的消息队列(真正存储数据的地方)
Queue - 队列:就是存放消息的位置(存储再磁盘中)
Consumer - 消费者:负责 将队列中的消息读取出来
安装
使用docker 安装
1、创建
[root@mastera java2102]# mkdir docker-compose-rabbitmq
[root@mastera java2102]# cd docker-compose-rabbitmq/
[root@mastera docker-compose-rabbitmq]#
[root@mastera docker-compose-rabbitmq]# vim docker-compose.yaml
version: “3.1”
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
volumes:
- ./data:/var/lib/rabbitmq
2、启动
[root@mastera docker-compose-rabbitmq]# docker-compose up
3、访问
在edge或者 火狐 不要再Google 访问
15672 后台管理界面 端口 http://192.168.12.130:15672/#/
5672 是客户端通信端口
四、RabbitMQ的通讯方式
1、最简单模式:一个生产者、一个消费者、一个队列
2、多个消费者共享队列模式:一个生产者 、一个队列、多个消费者共享 一个队列
3、发布订阅模式:一个生产者 多个队列,多个消费者
保证一个消息 可以被多个消费者,同时消费
4、路由模式:一个生产者、一个交换机、多个队列、多个消费者
5、topic 模式
五、代码实现以上五种模式
1、创建一个java 工程 并引入依赖
<dependencies>
<!--
rabbitmq 相关依赖
-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<!-- java 单元测试-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
2、创建工具类
/**
* 创建rabbitmq 客户端
*/
public class RabbitMQClientFactory
/**
* 创建 rabbit 连接
* @return
*/
public static Connection getConnection()
ConnectionFactory connectionFactory = new ConnectionFactory();
// 5672 客户端连接端口
connectionFactory.setPort(5672);
connectionFactory.setHost("192.168.12.130");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// VirtualHost 是一个路径 相当于mysql中的数据库 文件系统中的文件夹 决定着消息的存储位置
connectionFactory.setVirtualHost("/");
Connection connection = null;
try
connection = connectionFactory.newConnection();
catch (IOException e)
e.printStackTrace();
catch (TimeoutException e)
e.printStackTrace();
return connection;
3、简单模式测试
import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RabbitMQTest1
private Connection connection;
@Before // 初始化获取连接
public void init()
//
connection = RabbitMQClientFactory.getConnection();
// 在rabbitmq 中都要 先启动消费者 在启动生产者 否则有可能报错
/**
* 消费者 消费消息
* @throws IOException
* @throws TimeoutException
*/
@Test
public void customer() throws IOException, TimeoutException
// 创建一个channel
Channel channel = connection.createChannel();
// 声明当前channel 绑定哪一个队列
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外
// (conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("hello-queue",true,false,false,null);
// 当前消费者 每次消费一条数据
channel.basicQos(1);
// 创建消费者 在此接受消息
Consumer consumer = new DefaultConsumer(channel)
@Override // 在当前方法 方法总接受消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
//String consumerTag 消费者标记
// Envelope envelope 信封
// byte[] body 消息中的内容
System.out.println("消费者接受到的消息:"+new String(body,"utf-8"));
;
// 向前channel 绑定一个消费者
//参数1:queue - 指定消费哪个队列 队列名称
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
// 消费者 接收到 消息之后 会自动告诉 队列,说我已经接受到消息,否则队列还会再次发送刚才发送的消息
// 确保消费者 一定确认接受到消息
//参数3:consumer - 指定消费回调
channel.basicConsume("hello-queue", true,consumer);
// 等待命令行输入 作用就是让当前程序卡住
System.in.read();
// 释放资源
channel.close();
connection.close();
/**
* 生产者 发送消息
* @throws IOException
* @throws TimeoutException
*/
@Test
public void publisherTest() throws IOException, TimeoutException
// 创建一个channel
Channel channel = connection.createChannel();
// 发送消息
// 参数1:指定exchange,使用""。 使用默认交互机
// 参数2:指定路由的规则,使用具体的队列名称。
// 取值两种 1.队列名称 2.路由规则
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
channel.basicPublish("","hello-queue" , null,"hello world".getBytes());
System.out.println("生产者发送消息");
channel.close();
connection.close();
注意
1、消费者消费时,必须有ack 机制
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
// 消费者 接收到 消息之后 会自动告诉 队列,说我已经接受到消息,否则队列还会再次发送刚才发送的消息
// 确保消费者 一定确认接受到消息
2、启动顺序的问题
// 在rabbitmq 中都要 先启动消费者 在启动生产者 否则有可能报错
测试
4、多个消费者共享队列模式(Work)测试
只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing
消费者指定Qoa和手动ack
两个消费者消费同一个消息队列
@Test
public void customerTest1() throws IOException
//1. 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//2. 创建channel
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("HelloWorld",true,false,false,null);
// 每次推送只接受1 条消息
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
try
Thread.sleep(10000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("message:"+new String(body,"utf-8"));
// 手动ack
// false 是否多条批次
channel.basicAck(envelope.getDeliveryTag(),false);
;
// 手动ack
channel.basicConsume("HelloWorld",false,defaultConsumer);
System.out.println("开始消费");
System.in.read();
@Test
public void customerTest2() throws IOException
//1. 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//2. 创建channel
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("HelloWorld",true,false,false,null);
// 每次推送只接受1 条消息
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
try
Thread.sleep(10000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("customerTest2-message:"+new String(body,"utf-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
;
// 手动ack
channel.basicConsume("HelloWorld",false,defaultConsumer);
System.out.println("开始消费");
System.in.read();
5、发布订阅模式测试
声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
public class PublishSubjectTest
@Test
public void PublishTest() throws IOException, TimeoutException
// 获取connection
Connection connection = RabbitMQUtils.getConnection();
// 创建channel
Channel channel = connection.createChannel();
//3. 创建exchange - 绑定某一个队列
//参数1: exchange的名称
//参数2: 指定exchange的类型 FANOUT - pubsub , DIRECT - Routing , TOPIC - Topics
// 分散输出
channel.exchangeDeclare("pubsub-exchange", BuiltinExchangeType.FANOUT);
channel.queueBind("pubsub-queue1","pubsub-exchange","");
channel.queueBind("pubsub-queue2","pubsub-exchange","");
for (int i=0;i<10;i++)
String message = "hello" + i;
channel.basicPublish("pubsub-exchange","",null,message.getBytes());
System.out.println("发布成功");
channel.close();
connection.close();
@Test
public void subjectTest1() throws IOException
//1. 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//2. 创建channel
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("pubsub-queue1",true,false,false,null);
// 每次推送只接受1 条消息
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
try
Thread.sleep(10000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("消费者1-message:"+new String(body,"utf-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
;
// 手动ack
channel.basicConsume("pubsub-queue1",false,defaultConsumer);
System.out.println("开始消费");
System.in.read();
@Test
public void subjectTest2() throws IOException
//1. 获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//2. 创建channel
final Channel channel = connection.createChannel();
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("pubsub-queue2",true,false,false,null);
// 每次推送只接受1 条消息
channel.basicQos(1);
DefaultConsumer defaultConsumer = new DefaultConsumer(channel)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
try
Thread.sleep(10000);
catch (InterruptedException e)
e.printStackTrace();
System.out.println("消费者2-message:"+new String(body,"utf-8"));
// 手动ack
channel.basicAck(envelope.getDeliveryTag(),false);
;
// 手动ack
channel.basicConsume("pubsub-queue2",false,defaultConsumer);
System.out.println("开始消费");
System.in.read();
消费者还是正常的监听某一个队列即可。
6、路由模式测试
生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。
//3. 创建exchange, routing-queue-error,routing-queue-info,
channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
channel.queueBind("routing-queue-error","routing-exchange","ERROR");
channel.queueBind("routing-queue-info","routing-exchange","INFO");
//4. 发布消息到exchange,同时指定路由的规则
channel.basicPublish("routing-exchange","ERROR",null,"ERROR".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO1".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO2".getBytes());
channel.basicPublish("routing-exchange","INFO",null,"INFO3".getBytes());
消费者没有变化
package com.wgz;
import com.rabbitmq.client.*;
import com.wgz.utils.RabbitMQUtils;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RoutingTest
@Test //先启动消费者,在启动生产者
public void PublishTest() throws IOException, TimeoutException
// 获取connection
Connection connection = RabbitMQUtils.getConnection();
// 创建channel
Channel channel = connection.createChannel(消息队列(RabbitMQ)