Spring AMQP杂记之Spring实现简述

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring AMQP杂记之Spring实现简述相关的知识,希望对你有一定的参考价值。

参考技术A 上一篇主要介绍了AMQP的一些知识,接下来开始正式步入Spring AMQP。

Message:在AMQP中并没有定义消息的模型,Spring为了方便我们理解与使用,新增了Message接口,在构建消息的时候Spring提供了builde API,MessageBuilder.xx.xx的形式使用起来很方便。

Exchange:这个接口和AMQP中定义的exchange基本相同,就不说了

Queue:同上。

Binding:一般叫他绑定关系,AMQP也有对其的抽象模型,只不过我认为他只不过相当于是附加在队列与交换机上的属性,所以在上篇关于AMQP的介绍中并没有详细说明。呃,其实spring对其的定义就是代表了队列与交换机的绑定关系。。。

spring提供了ConnectionFactory接口,当我们使用的时候会使用它的实现类CachingConnectionFactory,看名字也知道就是基于缓存的连接池,默认的池大小为25。Spring也提供了对于多个connectionFactory的支持接口例如SimpleRoutingConnectionFactory等。

我们使用SpringBoot进行测试,最小化的配置如下

这里先给出一个简单的例子然后再具体讲解。

如图,我们提前声明了一个名为hello的队列,浏览器访问/send时,可以看到控制台打印了相应的时间信息,即被@RabbitListener注解的方法被调用了。如果我们打开RabbitMq的webUI,会发现名为hello的队列中消息数量由0变为1再变为0。注意,这里我们并没有声明Exchange,MQ会为我们将队列绑定到默认的Exchange。

接下来就详细的说一下这个例子。对于操作RabbitMQ,Spring提供了 RabbitTemplate(对于batch操作,相应的是BatchingRabbitTemplate,在1.6版本以后,spring提供了异步的Template--AsyncRabbitTemplate)。我们使用它来发送与接收消息。当发送完消息的时候如何知道本次操作的成功或者失败呢?默认情况下不能被路由的消息将会被丢弃,这会导致消息丢失,不能保证消息可靠性(消息可靠性请参照上一篇AMQP介绍中的推荐)。发布确认机制是保证消息可靠性的第一步,发布确认保证我们知道消息是否成功到达队列中,返回ack则代表成功,nack则代表失败。要使用这个特性,我们需要将RabbitTemplate的mandatory属性和ConnectionFactory的publisherConfirms属性都设为true。这时我们可以在RabbitTemplate上设置setReturnCallback监听来接收MQ服务器返回的状态信息了。对于消息的确认,我们只需要设置RabbitTemplate.ConfirmCallback的回调方法即可。

当我们每次发送请求时,都会打印相应的ack,其中correlationData是生产者在发送数据时可以携带的相关信息。这里有个问题需要注意一下,RabbitTemplate只允许设置一个callback方法,这时你可以将RabbitTemplate的bean设为单例然后设置回调。

这样的缺点是所有使用这个template的地方都会使用这个回调,那么当我们想要为不同的操作定制callBack该怎么做?如果直接在别的地方继续设置会报"Only one ConfirmCallback is supported by each RabbitTemplate"异常,这时候我们就需要将RabbitTemplate的作用域设为@Scope,这样每个bean都是一个新的。难道这样就可以了么?我们的service类一般都是单例的,这意味着当service类生成后,注入的RabbitTemplate就已经不变了,这个就是Single域的bean中注入Scope域bean的问题。一种解决方法是实现ApplicationAware接口注入ApplicationContext,每次使用RabbitTemplate时调用其getBean方法。一个更好的解决方案是使用spring提供的lookup方法。

spring会帮我们代理lookup注解的方法,每次调用都会返回一个全新的bean。但其实平常使用一般都会将发送方单独抽取出来实现回调接口,不会涉及上面的问题,一般都如下配置,注意将template配置成scope即可。

RabbitTemplate可以添加消息转换器,作用就类似于mvc中配置的@ResponseBody消息转换器。

具体如何发送与接收消息感觉不用咋说了。。。就send,receive(x,x,x)这个用IDE看一下方法doc就知道咋用了。receive为拉模式,很少使用,关于接收方法我们更常使用的是异步接收,即推模式,一般使用@RabbitListener 实现

当hello队列中有消息时,方法会自动调用。

像我们平常做web开发,前端想要接受来自后台的消息无非俩个方法,前台请求和后台推送,前台轮询一般就是ajax定时器,推送一般使用WebSocket实现,MQ同样有两种模式:轮询请求队列看是否有消息即拉模式,队列中有消息即对消费者进行通知即推模式。

对于拉模式,Spring提供了receive,receiveAndConvert,和receiveAndReply方法。接收并回复的方法很有用,比如订单系统,下单消息被MQ处理完后再返回消息给其他队列,告诉她这个订单已经完成,可以进行付费操作了。接收并回复调用template.receiveAndReply实现自己的接收回调。对于推模式,项目中基本上使用@RabbitListener注解完成,该注解结合@SendTo注解完成receiveAndReply功能,若没有sendto,这个方法是不允许有返回值的。对于异常情况,配置@RabbitListener的errorHandler和returnExceptions即可。关于@RabbitListener注解的具体使用其实也挺复杂的,推荐直接看文档。使用监听器的过程中消息是默认经过消息转换器的,可以手动为其设置消息转换器。关于RabbitMQ LIstener的配置可以使用Config方式或者SpringBoot的配置文件方式。

上面只是官方文档的一部分,其实除了Listener大部分Config方式的配置都可以用配置文件方式替代。

声明队列与交换机:分为xml方式和Java Config方式(懒得写了,这个基本官网就是复制粘贴)

配置Broker:Spring对其的抽象为RabbitAdmin,也是官网。。

延时队列实现:设置交换机延时属性为true,通过convertAndSend中的MessagePostProcessor实现发送延时消息,这个方法需要安装延时交换机这样的一个插件(也可以通过死信队列实现)

好了。今天就先写这么多,因为实在是写的太乱了,以后有时间整理一下。。。

Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ

文章目录
Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ
RabbitMQ简介
引入依赖
编写配置
编写接口
启用Rabbit注解
消息监听
消息测试
Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ
Spring Boot 2 整合RabbitMQ案例。

RabbitMQ简介
简介
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
核心概念
Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组
成,这些属性包括routing-key(路由键)、 priority(相对于其他消息的优先权)、 delivery-mode(指出
该消息可能需要持久性存储)等。
Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Exchange有4种类型: direct(默认), fanout, topic, 和headers,不同类型的Exchange转发消息的策略有
所区别。
Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息
可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连
接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
Exchange 和Queue的绑定可以是多对多的关系。
Connection
网络连接,比如一个TCP连接。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚
拟连接, AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这
些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所
以引入了信道的概念,以复用一条 TCP 连接。
Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加
密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有
自己的队列、交换器、绑定和权限机制。 vhost 是 AMQP 概念的基础,必须在连接时指定,
RabbitMQ 默认的 vhost 是 / 。
Broker
表示消息队列服务器实体


引入依赖
利用Spring Initializr快速创建一个Spring Boot项目spring-boot-v2-amqp,主要依赖如下:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

编写配置
添加RabbitMQ的配置信息。RabbbitMQ安装请参阅 Docker安装RabbitMQ

spring:
rabbitmq:
host: 192.168.0.2 # RabbitMQ主机IP
port: 5672 # 默认5672,一致可不写
username: guest # 默认guest,一致可不写
password: guest # 默认guest,一致可不写
# virtual-host: / # 默认是"/",一致可不写,public void setVirtualHost(String virtualHost) {this.virtualHost = "".equals(virtualHost) ? "/": virtualHost;}


编写接口
新建一个AmqpController,用于发送消息

@RestController
public class AmqpController {

private final RabbitTemplate rabbitTemplate;

public AmqpController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

private final static String SUCCESS = "success";

/**
* 单点
* @param msg
* @return
*/
@GetMapping("/direct")
public String direct(String msg){
rabbitTemplate.convertAndSend("amq.direct", "xudc", msg);
return SUCCESS;
}

@GetMapping("/fanout")
public String fanout(String msg) {
rabbitTemplate.convertAndSend("amq.fanout", "", msg);
return SUCCESS;
}

@GetMapping("/topic")
public String topic(String msg){
rabbitTemplate.convertAndSend("amq.topic", "xudc.#", msg);
return SUCCESS;
}
}

启用Rabbit注解
在主启动类上加上注解@EnableRabbit

@SpringBootApplication
@EnableRabbit // 开启基于注解的RabbitMQ模式
public class SpringBootAmqpApplication {

public static void main(String[] args) {
SpringApplication.run(SpringBootAmqpApplication.class, args);
}
}

消息监听
编写消息监听者

@Component
public class AmqpListener {

@RabbitListener(queues = "xudc")
public void receive1(String message) {
System.err.println("xudc -- receive1接收到消息:" + message);
}

@RabbitListener(queues = "xudc.book")
public void receive2(String message) {
System.err.println("xudc.book -- receive2接收到消息:" + message);
}

@RabbitListener(queues = "andy")
public void receive3(String message) {
System.err.println("andy -- receive3接收到消息:" + message);
}
}

消息测试
direct
fanout
topic
至此,实现了Spring Boot和RabbitMQ的简单整合。
————————————————
版权声明:本文为CSDN博主「xudc」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/xudc0521/article/details/89362348

以上是关于Spring AMQP杂记之Spring实现简述的主要内容,如果未能解决你的问题,请参考以下文章

Spring杂记BeanFactory之getBean方法

Spring Boot 2.X - Spring Boot整合AMQP之RabbitMQ

Spring Boot异步消息之AMQP讲解及实战(附源码)

Spring消息之AMQP.

译: 1. RabbitMQ Spring AMQP 之 Hello World

Rabbitmq与spring整合之重要组件介绍——AMQP声明式配置&RabbitTemplate组件