SpringBoot整合RabbitMQ
Posted 勤奋的木比白
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot整合RabbitMQ相关的知识,希望对你有一定的参考价值。
MQ
- 消息可靠性
- 消息幂等性
- MQ的高可用
基本概念
MQ,Message Queue消息队列,是消息传输过程中保存消息的容器,多用于分布式系统之间进行通信。
- MQ,消息队列,存储消息的中间件
- 分布式系统通信方式:直接远程调用 、借助第三方完成间接通信
- 发送方为生产者、接收方为消费者
优劣
优势
- 应用解耦:提高系统容错性与可维护性
- 异步提速:提高用户体验与系统吞吐量
- 消费填谷:提高系统稳定性
劣势
- 系统复杂度提高
- 引入rabbitmq产生的新问题,如:网络通信、数据一致性、幂等性等
使用条件
- 生产者无需从消费者处得到反馈
- 允许数据短暂的不一致性
- 引入MQ效益明显高于不引入时的效益
常见产品
- RabbitMQ:延迟最低
- AcitveMQ:老牌MQ,性能最低
- RocketMQ:高吞吐量,高并发、分布式MQ、金融
- Kafka:高吞吐量,大数据方面
AMQP协议
RabbitMQ
-
Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
-
Virtual host:处于多租户和安全因素设计,把AMQP的基本组件分到一个虚拟的分组中,类似与网络中的namespace概念(VCP,专有网络),当多个不用用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange、queue等
-
Connection:publisher/consumer和broker之间的TCP连接,每个Connection中有多个channel(类似连接池,避免重复创建channel损耗性能)
-
Channel:Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,QMQP method包含了channel id帮助客户端和message broker识别channel,所有channel之间是完全隔离的。Channel作为轻量级Connection极大减少了操作系统建立TCP connection的开销
-
Exchange:交换机,message到达broker的第一站,根据分发规则,匹配表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe),fanout(multicast)
-
Queue:消息最终被送到这里等待consumer取走
-
Binding:exchange和queue之间的虚拟连接,binding中可以用包含routing key。Binding信息被保存到exchange的查询表中,用于message的分发依据
JMS
- JMS,Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API.
- JMS是JavaEE规范中的一种,类比JDBC
- 很多消息中间件都实现了JMS规范,例如:ActiveMQ. RabbitMQ官方没有提供JMS的实现包,但是开源社区有提供.
工作模式(Java客户端版)
简单模式
生产者
<dependencies>
<!--rabbitmq java客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
/**
* 发送消息
*/
public class MessageProducer
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("43.139.51.247"); // ip 默认值:localhost
factory.setPort(5672); // 端口 默认值:5672
factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
factory.setUsername("czk");
factory.setPassword("czk");
// 3.创建连接Connection
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
* durable:是否持久化
* exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
* autoDelete:是否自动删除,没有Consumer时自动删除
* arguments:参数
*/
channel.queueDeclare("hello_world",true,false,false,null);
// 6.发送消息
/**
* public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* exchange:交换机名称。简单模式下交换机默认为 ""
* routingKey:路由名称。注意:使用默认交换机,则routingKey要与队列名一致才能正常路由
* props:配置信息
* body:消息数据
*/
String body = "hello rabbitmq~~~";
channel.basicPublish("","hello_world",null,body.getBytes());
// 7.释放资源
Thread.sleep(30000);
System.out.println("end...");
channel.close();
connection.close();
消费者
<dependencies>
<!--rabbitmq java客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
/**
* 接收/消费消息
*/
public class MessageConsumer
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("43.139.51.247"); // ip 默认值:localhost
factory.setPort(5672); // 端口 默认值:5672
factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
factory.setUsername("czk");
factory.setPassword("czk");
// 3.创建连接Connection
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
* durable:是否持久化
* exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
* autoDelete:是否自动删除,没有Consumer时自动删除
* arguments:参数
*/
channel.queueDeclare("hello_world",true,false,false,null);
// 6.接收消息
/**
* public String basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:收到消息是否自动确认,消息丢失相关
* callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel)
/**
* 回调方法
* consumerTag:标识
* envelope:获取一些消息,交换机、路由key
* properties:配置消息
* body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("------------------------------------");
;
channel.basicConsume("hello_world",true,consumer);
// 监听程序,无需关闭资源
工作队列模式
生产者
<dependencies>
<!--rabbitmq java客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-scala-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
/**
* 发送消息
*/
public class MessageProducer_WorkQueue
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("43.139.51.247"); // ip 默认值:localhost
factory.setPort(5672); // 端口 默认值:5672
factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
factory.setUsername("czk");
factory.setPassword("czk");
// 3.创建连接Connection
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
* durable:是否持久化
* exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
* autoDelete:是否自动删除,没有Consumer时自动删除
* arguments:参数
*/
channel.queueDeclare("work_queue",true,false,false,null);
// 6.发送消息
/**
* public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* exchange:交换机名称。简单模式下交换机默认为 ""
* routingKey:路由名称。注意:使用默认交换机,则routingKey要与队列名一致才能正常路由
* props:配置信息
* body:消息数据
*/
for (int i = 0;i<10;i++)
String body = "hello rabbitmq : "+i;
channel.basicPublish("","work_queue",null,body.getBytes());
// 7.释放资源
Thread.sleep(30000);
System.out.println("end...");
channel.close();
connection.close();
消费者
<dependencies>
<!--rabbitmq java客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-scala-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
/**
* 接收/消费消息
*/
public class MessageConsumer_WorkQueue1
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("43.139.51.247"); // ip 默认值:localhost
factory.setPort(5672); // 端口 默认值:5672
factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
factory.setUsername("czk");
factory.setPassword("czk");
// 3.创建连接Connection
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
* durable:是否持久化
* exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
* autoDelete:是否自动删除,没有Consumer时自动删除
* arguments:参数
*/
channel.queueDeclare("work_queue",true,false,false,null);
// 6.接收消息
/**
* public String basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:收到消息是否自动确认,消息丢失相关
* callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel)
/**
* 回调方法
* consumerTag:标识
* envelope:获取一些消息,交换机、路由key
* properties:配置消息
* body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("------------------------------------");
;
channel.basicConsume("work_queue",true,consumer);
// 监听程序,无需关闭资源
/**
* 接收/消费消息
*/
public class MessageConsumer_WorkQueue2
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.设置参数
factory.setHost("43.139.51.247"); // ip 默认值:localhost
factory.setPort(5672); // 端口 默认值:5672
factory.setVirtualHost("/zhd"); // 虚拟机 默认值:/
factory.setUsername("czk");
factory.setPassword("czk");
// 3.创建连接Connection
Connection connection = factory.newConnection();
// 4.创建Channel
Channel channel = connection.createChannel();
// 5.创建队列Queue
/**
* public DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称,队列名称存在则使用,不存在则创建一个该名称队列
* durable:是否持久化
* exclusive:是否独占,只能有一个消费者监听队列,当Connection关闭时是否删除队列
* autoDelete:是否自动删除,没有Consumer时自动删除
* arguments:参数
*/
channel.queueDeclare("work_queue",true,false,false,null);
// 6.接收消息
/**
* public String basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:收到消息是否自动确认,消息丢失相关
* callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel)
/**
* 回调方法
* consumerTag:标识
* envelope:获取一些消息,交换机、路由key
* properties:配置消息
* body:数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
System.out.println("------------------------------------");
;
channel.basicConsume("work_queue",true,consumer);
// 监听程序,无需关闭资源
Pub/Sub订阅模式
生产者
<dependencies>
<!--rabbitmq java客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
/**
* 发送消息
*/
public class MessageProducer_PubSub
public static void main(以上是关于SpringBoot整合RabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章