RabbitMQ交换机RabbitMQ整合springCloud

Posted Me*源

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ交换机RabbitMQ整合springCloud相关的知识,希望对你有一定的参考价值。

目标

1、交换机

2、RabbitMQ整合springCloud

 

交换机

蓝色区域===生产者

红色区域===Server:又称Broker,接受客户端的连接,实现AMQP实体服务

 

 

绿色区域===消费者

黄色区域===就是我们的交换机以及队列

 

由生产者投递信息到RabbitMQ Server里面某一个交换机对应的队列中,消费者则是从对应的队列中获取信息

 

 

交换机属性:

Name:交换机名称

Type:交换机类型 directtopicfanoutheaders

Durability:是否需要持久化,true为持久化

Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该Exchange

Internal:当前Exchange是否用于RabbitMQ内部使用,默认为False

Arguments:扩展参数,用于扩展AMQP协议,定制化使用

 

直流交换机

 

直连交换机Direct Exchange(完全匹配路由key

 

所有发送到Direct Exchange的消息会被转发到RouteKey中指定的Queue

 

注意:Direct模式可以使用RabbitMQ自带的Exchangedefault Exchange

所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃;

 

 

消费端代码

 

 1 package com.yuan.rabbitmqapi.exchange.direct;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 import com.rabbitmq.client.QueueingConsumer;
 7 
 8 
 9 public class Consumer4DirectExchange {
10     public static void main(String[] args) throws Exception {
11 
12 
13         ConnectionFactory connectionFactory = new ConnectionFactory() ;
14 
15         connectionFactory.setHost("192.168.238.129");
16         connectionFactory.setPort(5672);
17         connectionFactory.setVirtualHost("/");
18 
19         connectionFactory.setAutomaticRecoveryEnabled(true);
20         connectionFactory.setNetworkRecoveryInterval(3000);
21         Connection connection = connectionFactory.newConnection();
22 
23         Channel channel = connection.createChannel();
24         //4 声明
25         //交换机名称
26         String exchangeName = "test_direct_exchange";
27         //交换机类型
28         String exchangeType = "direct";
29         //队列名
30         String queueName = "test_direct_queue";
31         //访问规则
32         String routingKey = "test.direct";
33 
34         //表示声明了一个交换机
35         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
36         //表示声明了一个队列
37         channel.queueDeclare(queueName, false, false, false, null);
38         //建立一个绑定关系:
39         channel.queueBind(queueName, exchangeName, routingKey);
40 
41         //durable 是否持久化消息
42         QueueingConsumer consumer = new QueueingConsumer(channel);
43         //参数:队列名称、是否自动ACK、Consumer
44         channel.basicConsume(queueName, true, consumer);
45         //循环获取消息
46         while(true){
47             //获取消息,如果没有消息,这一步将会一直阻塞
48             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
49             String msg = new String(delivery.getBody());
50             System.out.println("收到消息:" + msg);
51         }
52     }
53 }

 

生产端代码

 

 1 package com.yuan.rabbitmqapi.exchange.direct;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 
 8 public class Producer4DirectExchange {
 9     public static void main(String[] args) throws Exception {
10 
11         //1 创建ConnectionFactory
12         ConnectionFactory connectionFactory = new ConnectionFactory();
13         connectionFactory.setHost("192.168.238.129");
14         connectionFactory.setPort(5672);
15         connectionFactory.setVirtualHost("/");
16 
17         //2 创建Connection
18         Connection connection = connectionFactory.newConnection();
19         //3 创建Channel
20         Channel channel = connection.createChannel();
21         //4 声明
22         String exchangeName = "test_direct_exchange";
23         String routingKey = "test.direct";
24 //        String routingKey = "test.direct111"; //收不到
25         //5 发送
26 
27         String msg = "Hello World RabbitMQ 4  Direct Exchange Message 111 ... ";
28         channel.basicPublish(exchangeName, routingKey , null , msg.getBytes());
29 
30     }
31 }

 

启动消费端

 

创建队列

 

 

交换机

 

 

进入交换机,里面也绑定了对应的队列

 

 

 

完了之后停掉消费端,先启动生产端  将信息投递到队列中,如果生产端和消费端的队列名不一致,消费端则拿不到信息

 

 

 

 

主题交换机

 

主题交换机Topic Exchange(匹配路由规则的交换机)

 

所有发送到Topic Exchange的消息被转发到所有关系RouteKey中指定TopicQueue上;

 

Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic

 

 

 

注意:可以使用通配符进行模糊匹配

 

符号:“#” 匹配一个或者多个词

 

符号:“*” 匹配不多不少一个词

 

列如:

 

log.#” 能够匹配到 “log.info.oa

 

log.*” 能够匹配到 “log.err

 

 

消费端代码

 

 1 package com.yuan.rabbitmqapi.exchange.topic;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 import com.rabbitmq.client.QueueingConsumer;
 7 
 8 
 9 public class Consumer4TopicExchange {
10     public static void main(String[] args) throws Exception {
11 
12 
13         ConnectionFactory connectionFactory = new ConnectionFactory() ;
14 
15         connectionFactory.setHost("192.168.238.129");
16         connectionFactory.setPort(5672);
17         connectionFactory.setVirtualHost("/");
18 
19         connectionFactory.setAutomaticRecoveryEnabled(true);
20         connectionFactory.setNetworkRecoveryInterval(3000);
21         Connection connection = connectionFactory.newConnection();
22 
23         Channel channel = connection.createChannel();
24         //4 声明
25         String exchangeName = "test_topic_exchange";
26         String exchangeType = "topic";
27         String queueName = "test_topic_queue";
28         String routingKey = "user.#";
29 //        String routingKey = "user.*";
30         // 1 声明交换机
31         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
32         // 2 声明队列
33         channel.queueDeclare(queueName, false, false, false, null);
34         // 3 建立交换机和队列的绑定关系:
35         channel.queueBind(queueName, exchangeName, routingKey);
36 
37         //durable 是否持久化消息
38         QueueingConsumer consumer = new QueueingConsumer(channel);
39         //参数:队列名称、是否自动ACK、Consumer
40         channel.basicConsume(queueName, true, consumer);
41         //循环获取消息
42         while(true){
43             //获取消息,如果没有消息,这一步将会一直阻塞
44             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
45             String msg = new String(delivery.getBody());
46             System.out.println("收到消息:" + msg);
47         }
48     }
49 }

 

生产端代码

 

 1 package com.yuan.rabbitmqapi.exchange.topic;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 
 8 public class Producer4TopicExchange {
 9     public static void main(String[] args) throws Exception {
10 
11         //1 创建ConnectionFactory
12         ConnectionFactory connectionFactory = new ConnectionFactory();
13         connectionFactory.setHost("192.168.238.129");
14         connectionFactory.setPort(5672);
15         connectionFactory.setVirtualHost("/");
16 
17         //2 创建Connection
18         Connection connection = connectionFactory.newConnection();
19         //3 创建Channel
20         Channel channel = connection.createChannel();
21         //4 声明
22         String exchangeName = "test_topic_exchange";
23         String routingKey1 = "user.save";
24         String routingKey2 = "user.update";
25         String routingKey3 = "user.delete.abc";
26         //5 发送
27 
28         String msg = "Hello World RabbitMQ 4 Topic Exchange Message ...";
29         channel.basicPublish(exchangeName, routingKey1 , null , msg.getBytes());
30         channel.basicPublish(exchangeName, routingKey2 , null , msg.getBytes());
31         channel.basicPublish(exchangeName, routingKey3 , null , msg.getBytes());
32         channel.close();
33         connection.close();
34     }
35 }

 

启动消费端,查看队列及交换机

 

 

 这里我们还可以点击交换机进去看它的一个绑定规则

 测试“log.#” 能够匹配到 “log.info.oa”

消费端代码

 

先启动生产端,再启动消费端

 

log.*” 能够匹配到 “log.err”

 

 

 

 它的绑定规则改变了

后台只会收到了两条信息

 

 

 

输出交换机

 

输出交换机Fanout Exchange(不做路由)

 

不处理路由键,只需要简单的将队列绑定到交换机上;

发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;

Fanout交换机转发消息是最快的

 

 

 消费端代码

 1 package com.yuan.rabbitmqapi.exchange.fanout;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 import com.rabbitmq.client.QueueingConsumer;
 7 
 8 
 9 public class Consumer4FanoutExchange {
10     public static void main(String[] args) throws Exception {
11 
12         ConnectionFactory connectionFactory = new ConnectionFactory() ;
13 
14         connectionFactory.setHost("192.168.238.129");
15         connectionFactory.setPort(5672);
16         connectionFactory.setVirtualHost("/");
17 
18         connectionFactory.setAutomaticRecoveryEnabled(true);
19         connectionFactory.setNetworkRecoveryInterval(3000);
20         Connection connection = connectionFactory.newConnection();
21 
22         Channel channel = connection.createChannel();
23         //4 声明
24         String exchangeName = "test_fanout_exchange";
25         String exchangeType = "fanout";
26         String queueName = "test_fanout_queue";
27         String routingKey = "";    //不设置路由键
28         channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
29         channel.queueDeclare(queueName, false, false, false, null);
30         channel.queueBind(queueName, exchangeName, routingKey);
31 
32         //durable 是否持久化消息
33         QueueingConsumer consumer = new QueueingConsumer(channel);
34         //参数:队列名称、是否自动ACK、Consumer
35         channel.basicConsume(queueName, true, consumer);
36         //循环获取消息
37         while(true){
38             //获取消息,如果没有消息,这一步将会一直阻塞
39             QueueingConsumer.Delivery delivery = consumer.nextDelivery();
40             String msg = new String(delivery.getBody());
41             System.out.println("收到消息:" + msg);
42         }
43     }
44 }

 

生产端

 1 package com.yuan.rabbitmqapi.exchange.fanout;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 
 8 public class Producer4FanoutExchange {
 9     public static void main(String[] args) throws Exception {
10 
11         //1 创建ConnectionFactory
12         ConnectionFactory connectionFactory = new ConnectionFactory();
13         connectionFactory.setHost("192.168.238.129");
14         connectionFactory.setPort(5672);
15         connectionFactory.setVirtualHost("/");
16 
17         //2 创建Connection
18         Connection connection = connectionFactory.newConnection();
19         //3 创建Channel
20         Channel channel = connection.createChannel();
21         //4 声明
22         String exchangeName = "test_fanout_exchange";
23         //5 发送
24         for(int i = 0; i < 10; i ++) {
25             String msg = "Hello World RabbitMQ 4 FANOUT Exchange Message ...";
26             channel.basicPublish(exchangeName, "", null , msg.getBytes());
27         }
28         channel.close();
29         connection.close();
30     }
31 }

 

 

 

 

 

 

 

 

 

Binding-绑定

Exchange和Exchange、Queue之间的连接关系;

Binding中可以包含RoutingKey或者参数

 

Queue-消息队列

消息队列,实际存储消息数据

Durability:是否持久化

Durable:是,Transient:否

Auto delete:如选yes,代表当最后一个监听被移除之后,该Queue会自动被删除

 

 

Message-消息

服务器和应用程序之间传递的数据

本质上就是一段数据,由Properties和Payload(Body)组成

常用属性:delivery model、headers(自定义属性)

 

 

Message-其他属性

content_type、content_encoding、priority

correlation_id、reply_to、expiration、message_id

Timestamp、type、user_id、app_id、cluster_id

 

Virtual host-虚拟主机

虚拟地址,用于进行逻辑隔离,最上层的消息路由

一个Virtual Host里面可以有若干个Exchange和Queue

同一个Virtual Host里面不能有相同名称的Exchange或Queue

 

小结:

RabbitMQ的概念、安装与使用、管控台操作;

结合RabbitMQ的特性、Exchange、Queue、Binding、RoutingKey、Message进行核心API的讲解

 

 

RabbitMQ整合 SpringCloud实战

注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效

生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等

 

消费端核心配置

u 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理

u 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况

 

@RabbitListener注解的使用

消费端监听@RabbitListener注解,这个对于在实际工作中非常的好用

u @RabbitListener是一个组合注解,里面可以注解配置(@QueueBinding@Queue@Exchange)直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等

相关代码

rabbitmq-common子项目

 1 package com.yuan.rabbitmqcommon.entity;
 2 
 3 import java.io.Serializable;
 4 
 5 
 6 public class Order implements Serializable {
 7 
 8     private String id;
 9     private String name;
10 
11     public Order() {
12     }
13     public Order(String id, String name) {
14         super();
15         this.id = id;
16         this.name = name;
17     }
18     public String getId() {
19         return id;
20     }
21     public void setId(String id) {
22         this.id = id;
23     }
24     public String getName() {
25         return name;
26     }
27     public void setName(String name) {
28         this.name = name;
29     }
30 
31 
32 }

 

rabbitmq-springcloud-consumer子项目

Pom依赖

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 4     <modelVersion>4.0.0</modelVersion>
 5     <parent>
 6         <groupId>org.springframework.boot</groupId>
 7         <artifactId>spring-boot-starter-parent</artifactId>
 8         <version>2.2.2.RELEASE</version>
 9         <relativePath/> <!-- lookup parent from repository -->
10     </parent>
11     <groupId>com.yuan</groupId>
12     <artifactId>rabbitmq-springcloud-consumer</artifactId>
13     <version>0.0.1-SNAPSHOT</version>
14     <name>rabbitmq-springcloud-consumer</name>
15     <description>Demo project for Spring Boot</description>
16 
17     <properties>
18         <java.version>1.8</java.version>
19     </properties>
20 
21     <dependencies>
22         <dependency>
23             <groupId>org.springframework.boot</groupId>
24             <artifactId>spring-boot-starter</artifactId>
25         </dependency>
26 
27         <dependency>
28             <groupId>com.yuan</groupId>
29             <artifactId>rabbitmq-common</artifactId>
30             <version>0.0.1-SNAPSHOT</version>
31         </dependency>
32 
33         <dependency>
34             <groupId>org.springframework.boot</groupId>
35             <artifactId>spring-boot-starter-test</artifactId>
36             <scope>test</scope>
37             <exclusions>
38                 <exclusion>
39                     <groupId>org.junit.vintage</groupId>
40                     <artifactId>junit-vintage-engine</artifactId>
41                 </exclusion>
42             </exclusions>
43         </dependency>
44 
45         <dependency>
46             <groupId>org.springframework.boot</groupId>
47             <artifactId>spring-boot-starter-amqp</artifactId>
48         </dependency>
49         <dependency>
50             <groupId>junit</groupId>
51             <artifactId>junit</artifactId>
52             <version>4.12</version>
53             <scope>test</scope>
54         </dependency>
55     </dependencies>
56 
57     <build>
58         <plugins>
59             <plugin>
60                 <groupId>org.springframework.boot</groupId>
61                 <artifactId>spring-boot-maven-plugin</artifactId>
62             </plugin>
63         </plugins>
64     </build>
65 
66 </project>

 

Yml配置

 1 spring.rabbitmq.addresses=192.168.238.129:5672
 2 spring.rabbitmq.username=guest
 3 spring.rabbitmq.password=guest
 4 spring.rabbitmq.virtual-host=/
 5 spring.rabbitmq.connection-timeout=15000
 6 
 7 server.port=80
 8 server.servlet.context-path=/
 9 
以上是关于RabbitMQ交换机RabbitMQ整合springCloud的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ之交换机及spring整合

RabbitMQ基础组件和SpringBoot整合RabbitMQ简单示例

Spring Boot整合RabbitMQ

[Spring boot] Spring boot 整合RabbitMQ实现通过RabbitMQ进行项目的连接

SpringBoot学习—— springboot快速整合RabbitMQ

SpringBoot整合RabbitMQ