Spring AMQP项目

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring AMQP项目相关的知识,希望对你有一定的参考价值。

Spring

Spring AMQP项目将Spring的核心概念应用于基于AMQP的消息传递解决方案的开发。 我们提供了一个“模板”作为发送和接收消息的高级抽象。 我们还为消息驱动的 POJO 提供支持。 这些库有助于管理 AMQP 资源,同时促进依赖关系注入和声明性配置的使用。 在所有这些情况下,你可以看到与 Spring 框架中的 JMS 支持的相似之处。 有关其他与项目相关的信息,请访问Spring AMQP项目主页。

2. 最新消息

2.1. 自 2.4 以来 3.0 的变化

2.1.1. Java 17, Spring Framework 6.0

此版本需要 Spring Framework 6.0 和 Java 17

2.1.2. 远程处理

不再支持远程处理功能(使用 RMI)。

2.1.3. 观察

现在支持使用千分尺启用计时器观察和跟踪。 有关更多信息,请参阅千分尺观察。

2.1.4. 原生镜像

提供了对创建本机映像的支持。 有关详细信息,请参阅本机映像。

2.1.5. 异步兔子模板

现在返回 s 而不是 s。 有关详细信息,请参阅异步兔子模板​。​​AsyncRabbitTemplate​​​​CompletableFuture​​​​ListenableFuture​

2.1.6. 流支持更改

​RabbitStreamOperations​​​并且方法现在返回而不是 .​​RabbitStreamTemplate​​​​CompletableFuture​​​​ListenableFuture​

现在支持超级流和单个活跃使用者。

有关更多信息,请参阅 使用 RabbitMQ 流插件。

2.1.7. 变更​​@RabbitListener​

批处理侦听器现在可以同时使用 . 批处理消息传递适配器现在可确保该方法适用于使用批处理。 将容器工厂设置为 时,该属性也设置为 。 有关详细信息,请参阅批处理@RabbitListener。​​Collection<?>​​​​List<?>​​​​consumerBatchEnabled​​​​true​​​​batchListener​​​​true​

​MessageConverter​​s 现在可以返回空值;这目前由 . 有关详细信息,请参阅从消息转换​​Optional.empty()​​​​Jackson2JsonMessageConverter​

现在,您可以通过容器工厂而不是通过 上的属性配置 。 有关详细信息,请参阅回复管理。​​ReplyPostProcessor​​​​@RabbitListener​

2.1.8. 连接工厂变更

中的默认值现在为 。这会导致在提供多个地址时连接到随机主机。 有关详细信息,请参阅连接到集群。​​addressShuffleMode​​​​AbstractConnectionFactory​​​​RANDOM​

不再使用 RabbitMQ 库来确定哪个节点是队列的领导者。 有关详细信息,请参阅队列相关性和 LocalizedQueueConnectionFactory。​​LocalizedQueueConnectionFactory​​​​http-client​

3. 简介

参考文档的第一部分是对Spring AMQP和基本概念的高级概述。 它包括一些代码片段,可帮助您尽快启动并运行。

3.1. 不耐烦的人快速浏览

3.1.1. 简介

这是开始春季AMQP的五分钟导览。

先决条件:安装并运行 RabbitMQ 代理 (https://www.rabbitmq.com/download.html)。 然后获取 spring-rabbit JAR 及其所有依赖项 - 最简单的方法是在构建工具中声明依赖项。 例如,对于 Maven,您可以执行类似于以下内容的操作:

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>3.0.0</version>
</dependency>

对于 Gradle,您可以执行类似以下步骤的操作:

compile org.springframework.amqp:spring-rabbit:3.0.0
兼容性

Spring 框架版本的最低依赖关系是 5.2.0。

Java 客户机库的最低版本为 5.7.0。​​amqp-client​

流队列的最小 Java 客户机库为 0.7.0。​​stream-client​

非常非常快

本节提供最快的介绍。

首先,添加以下语句以使本节后面的示例正常工作:​​import​

以下示例使用普通的命令式 Java 发送和接收消息:

ConnectionFactory connectionFactory = new CachingConnectionFactory();
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareQueue(new Queue("myqueue"));
AmqpTemplate template = new RabbitTemplate(connectionFactory);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

请注意,原生 Java Rabbit 客户端中还有一个。 我们在前面的代码中使用了 Spring 抽象。 它缓存通道(以及可选的连接)以供重用。 我们依赖于代理中的默认交换(因为在发送中没有指定任何交换),以及所有队列通过其名称默认绑定到默认交换(因此,我们可以在发送中使用队列名称作为路由密钥)。 这些行为在 AMQP 规范中定义。​​ConnectionFactory​

使用 XML 配置

以下示例与前面的示例相同,但将资源配置外部化为 XML:

ApplicationContext context =
new GenericXmlApplicationContext("classpath:/rabbit-context.xml");
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd">

<rabbit:connection-factory id="connectionFactory"/>

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

<rabbit:admin connection-factory="connectionFactory"/>

<rabbit:queue name="myqueue"/>

</beans>

缺省情况下,声明会自动查找 、 的 bean,并代表用户将它们声明给代理。 因此,您不需要在简单的 Java 驱动程序中显式使用该 Bean。 有很多选项可用于配置 XML 架构中组件的属性。 您可以使用 XML 编辑器的自动完成功能来浏览它们并查看其文档。​​<rabbit:admin/>​​​​Queue​​​​Exchange​​​​Binding​

使用爪哒配置

以下示例重复与前面的示例相同的示例,但使用了 Java 中定义的外部配置:

ApplicationContext context =
new AnnotationConfigApplicationContext(RabbitConfiguration.class);
AmqpTemplate template = context.getBean(AmqpTemplate.class);
template.convertAndSend("myqueue", "foo");
String foo = (String) template.receiveAndConvert("myqueue");

........

@Configuration
public class RabbitConfiguration

@Bean
public CachingConnectionFactory connectionFactory()
return new CachingConnectionFactory("localhost");


@Bean
public RabbitAdmin amqpAdmin()
return new RabbitAdmin(connectionFactory());


@Bean
public RabbitTemplate rabbitTemplate()
return new RabbitTemplate(connectionFactory());


@Bean
public Queue myQueue()
return new Queue("myqueue");

具有 Spring 引导自动配置和异步 POJO 侦听器

Spring 引导会自动配置基础结构 bean,如以下示例所示:

@SpringBootApplication
public class Application

public static void main(String[] args)
SpringApplication.run(Application.class, args);


@Bean
public ApplicationRunner runner(AmqpTemplate template)
return args -> template.convertAndSend("myqueue", "foo");


@Bean
public Queue myQueue()
return new Queue("myqueue");


@RabbitListener(queues = "myqueue")
public void listen(String in)
System.out.println(in);


4. 参考资料

参考文档的这一部分详细介绍了构成Spring AMQP的各种组件。 主要章节介绍了开发 AMQP 应用程序的核心类。 本部分还包括有关示例应用程序的章节。

4.1. 使用弹簧 AMQP

本章探讨了使用 Spring AMQP 开发应用程序的基本组件接口和类。

4.1.1. AMQP 抽象

Spring AMQP 由两个模块组成(每个模块在发行版中由一个 JAR 表示):和 。 “spring-amqp”模块包含该软件包。 在该包中,您可以找到表示核心 AMQP“模型”的类。 我们的目的是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。 最终用户代码可以跨供应商实现更具可移植性,因为它可以仅针对抽象层进行开发。 然后,这些抽象由特定于代理的模块实现,例如“spring-rabbit”。 目前只有一个 RabbitMQ 实现。 但是,除了RabbitMQ之外,这些抽象已经在.NET中使用Apache Qpid进行了验证。 由于 AMQP 在协议级别运行,原则上可以将 RabbitMQ 客户端与任何支持相同协议版本的代理一起使用,但我们目前不测试任何其他代理。​​spring-amqp​​​​spring-rabbit​​​​org.springframework.amqp.core​

本概述假定您已经熟悉 AMQP 规范的基础知识。 如果没有,请查看其他资源中列出的资源

​Message​

0-9-1 AMQP 规范未定义类或接口。 相反,在执行诸如 之类的操作时,内容将作为字节数组参数传递,其他属性将作为单独的参数传入。 Spring AMQP将类定义为更通用的AMQP域模型表示的一部分。 该类的目的是将主体和属性封装在单个实例中,以便 API 可以反过来更简单。 下面的示例演示类定义:​​Message​​​​basicPublish()​​​​Message​​​​Message​​​​Message​

public class Message 

private final MessageProperties messageProperties;

private final byte[] body;

public Message(byte[] body, MessageProperties messageProperties)
this.body = body;
this.messageProperties = messageProperties;


public byte[] getBody()
return this.body;


public MessageProperties getMessageProperties()
return this.messageProperties;

该接口定义了几个常见属性,例如“messageId”、“timestamp”、“内容类型”等。 您还可以通过调用该方法,使用用户定义的“标头”扩展这些属性。​​MessageProperties​​​​setHeader(String key, Object value)​

从版本 、、 和 开始,如果消息正文是序列化的 Java 对象,则在执行操作(例如在日志消息中)时不再反序列化(缺省情况下)。 这是为了防止不安全的反序列化。 默认情况下,只有 和 类被反序列化。 要恢复到以前的行为,可以通过调用 来添加允许的类/包模式。 支持简单的通配符,例如​​com.something。​ 无法反序列化的主体在日志消息中表示。​​1.5.7​​​​1.6.11​​​​1.7.4​​​​2.0.0​​​​Serializable​​​​toString()​​​​java.util​​​​java.lang​​​​Message.addAllowedListPatterns(…)​​​​, *.MyClass​​​​byte[<size>]​

交换

该接口表示 AMQP 交换,即消息生产者发送到的内容。 经纪商虚拟主机中的每个交易所都有一个唯一的名称以及一些其他属性。 以下示例显示了该接口:​​Exchange​​​​Exchange​

public interface Exchange 

String getName();

String getExchangeType();

boolean isDurable();

boolean isAutoDelete();

Map<String, Object> getArguments();

如您所见,an 也有一个由 中定义的常量表示的“类型”。 基本类型包括:、、 和 。 在核心包中,您可以找到每种类型的接口实现。 这些类型的行为在如何处理队列绑定方面有所不同。 例如,交换允许队列由固定路由密钥(通常是队列的名称)绑定。 交换支持具有路由模式的绑定,这些模式可能分别包含“*”和“#”通配符,分别表示“恰好一个”和“零个或多个”。 交易所发布到绑定到它的所有队列,而不考虑任何路由密钥。 有关这些和其他 Exchange 类型的详细信息,请参阅其他资源。​​Exchange​​​​ExchangeTypes​​​​direct​​​​topic​​​​fanout​​​​headers​​​​Exchange​​​​Exchange​​​​Direct​​​​Topic​​​​Fanout​

AMQP规范还要求任何经纪商提供没有名称的“默认”直接交易所。 声明的所有队列都绑定到该默认值,其名称作为路由密钥。 您可以在 AmqpTemplate​ 中了解有关默认 Exchange 在 Spring AMQP 中的用法的更多信息。​​Exchange​

队列

该类表示消息使用者从中接收消息的组件。 与各种类一样,我们的实现旨在成为此核心 AMQP 类型的抽象表示。 以下清单显示了该类:​​Queue​​​​Exchange​​​​Queue​

public class Queue  

private final String name;

private volatile boolean durable;

private volatile boolean exclusive;

private volatile boolean autoDelete;

private volatile Map<String, Object> arguments;

/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name)
this(name, true, false, false);


// Getters and Setters omitted for brevity

请注意,构造函数采用队列名称。 根据实现,管理模板可能会提供用于生成唯一命名队列的方法。 此类队列可用作“回复”地址或其他临时情况。 因此,自动生成队列的“独占”和“自动删除”属性都将设置为“true”。

有关使用命名空间支持(包括队列参数)声明队列的信息,请参阅配置代理中有关队列的部分。

捆绑

假设生产者发送到交易所,消费者从队列接收,将队列连接到交易所的绑定对于通过消息传递连接这些生产者和消费者至关重要。 在Spring AMQP中,我们定义了一个类来表示这些连接。 本节回顾将队列绑定到交易所的基本选项。​​Binding​

您可以使用固定路由密钥将队列绑定到 ,如以下示例所示:​​DirectExchange​

new Binding(someQueue, someDirectExchange, "foo.bar");

您可以使用路由模式将队列绑定到 ,如以下示例所示:​​TopicExchange​

new Binding(someQueue, someTopicExchange, "foo.*");

您可以将队列绑定到没有路由密钥的 ,如以下示例所示:​​FanoutExchange​

new Binding(someQueue, someFanoutExchange);

我们还提供了一个促进“流畅 API”样式,如以下示例所示:​​BindingBuilder​

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

为清楚起见,前面的示例显示了该类,但是当对“bind()”方法使用静态导入时,此样式效果很好。​​BindingBuilder​

就其本身而言,类的实例仅保存有关连接的数据。 换句话说,它不是一个“活动”组件。 但是,正如稍后将在配置代理中看到的那样,该类可以使用实例来实际触发代理上的绑定操作。 此外,正如您在同一部分中所看到的,您可以通过在类中使用 Spring 的注释来定义实例。 还有一个方便的基类,它进一步简化了生成与 AMQP 相关的 Bean 定义的方法,并识别队列、交换和绑定,以便在应用程序启动时在 AMQP 代理上声明它们。​​Binding​​​​AmqpAdmin​​​​Binding​​​​Binding​​​​@Bean​​​​@Configuration​

也在核心包中定义。 作为实际 AMQP 消息传递中涉及的主要组件之一,它在其自己的部分中进行了详细讨论(请参阅 AmqpTemplate)。​​AmqpTemplate​

4.1.2. 连接和资源管理

虽然我们在上一节中描述的 AMQP 模型是通用的并且适用于所有实现,但当我们进入资源管理时,细节特定于代理实现。 因此,在本节中,我们将重点介绍仅存在于“spring-rabbit”模块中的代码,因为此时,RabbitMQ 是唯一受支持的实现。

管理与 RabbitMQ 代理的连接的核心组件是接口。 实现的职责是提供 的实例,该实例是 的包装器。​​ConnectionFactory​​​​ConnectionFactory​​​​org.springframework.amqp.rabbit.connection.Connection​​​​com.rabbitmq.client.Connection​

选择连接工厂

有三家连接工厂可供选择

  • ​PooledChannelConnectionFactory​
  • ​ThreadChannelConnectionFactory​
  • ​CachingConnectionFactory​

前两个是在 2.3 版中添加的。

对于大多数用例,应使用 。 如果要确保严格的消息排序而不需要使用作用域内操作,则可以使用 。 如果要使用相关的发布者确认,或者希望通过其打开多个连接,则应使用 。​​PooledChannelConnectionFactory​​​​ThreadChannelConnectionFactory​​​​CachingConnectionFactory​​​​CacheMode​

所有三个工厂都支持简单的发布者确认。

将 配置为使用单独的连接时,现在可以从版本 2.3.2 开始,将发布连接工厂配置为其他类型。 默认情况下,发布工厂的类型相同,在主工厂上设置的任何属性也会传播到发布工厂。​​RabbitTemplate​

​PooledChannelConnectionFactory​

此工厂基于 Apache Pool2 管理单个连接和两个通道池。 一个池用于事务通道,另一个池用于非事务通道。 池是默认配置的;提供回调以配置池;有关更多信息,请参阅 Apache 文档。​​GenericObjectPool​

Apache jar 必须位于类路径上才能使用此工厂。​​commons-pool2​

@Bean
PooledChannelConnectionFactory pcf() throws Exception
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) ->
if (tx)
// configure the transactional pool

else
// configure the non-transactional pool

);
return pcf;
​ThreadChannelConnectionFactory​

此工厂管理单个连接和两个连接,一个用于事务通道,另一个用于非事务通道。 此工厂确保同一线程上的所有操作都使用相同的通道(只要它保持打开状态)。 这有助于严格的消息排序,而无需作用域内操作。 为避免内存泄漏,如果应用程序使用许多生存期较短的线程,则必须调用工厂的线程来释放通道资源。 从版本 2.3.7 开始,线程可以将其通道传输到另一个线程。 有关详细信息,请参阅多线程环境中的严格消息排序。​​ThreadLocal​​​​closeThreadChannel()​

​CachingConnectionFactory​

提供的第三个实现是 ,默认情况下,它建立可由应用程序共享的单个连接代理。 共享连接是可能的,因为使用 AMQP 进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中的连接和会话之间的关系)。 连接实例提供了一个方法。 该实现支持缓存这些通道,并根据通道是否为事务性为通道维护单独的缓存。 创建 的实例时,可以通过构造函数提供“主机名”。 您还应该提供“用户名”和“密码”属性。 若要配置通道缓存的大小(默认值为 25),可以调用该方法。​​CachingConnectionFactory​​​​createChannel​​​​CachingConnectionFactory​​​​CachingConnectionFactory​​​​setChannelCacheSize()​

从版本 1.3 开始,您可以配置缓存连接以及仅通道。 在这种情况下,每次调用 都会创建一个新连接(或从缓存中检索空闲连接)。 关闭连接会将其返回到缓存(如果尚未达到缓存大小)。 在此类连接上创建的通道也会被缓存。 在某些环境中使用单独的连接可能很有用,例如从 HA 群集使用,在 与负载均衡器配合使用,以连接到不同的集群成员等。 要缓存连接,请将 设置为 。​​CachingConnectionFactory​​​​createConnection()​​​​cacheMode​​​​CacheMode.CONNECTION​

这不会限制连接数。 相反,它指定允许多少个空闲的打开连接。

从版本 1.5.5 开始,提供了一个名为的新属性。 设置此属性时,它将限制允许的连接总数。 设置后,如果达到限制,则用于等待连接变为空闲状态。 如果超过该时间,则抛出 。​​connectionLimit​​​​channelCheckoutTimeLimit​​​​AmqpTimeoutException​


当缓存模式为 时,自动声明队列和其他 (请参阅交换、队列和绑定的自动声明​)不受支持。​​CONNECTION​



此外,在撰写本文时,库默认为每个连接创建一个固定的线程池(默认大小:线程)。 使用大量连接时,应考虑在 上设置自定义。 然后,所有连接都可以使用相同的执行器,并且可以共享其线程。 执行程序的线程池应不受限制或针对预期用途进行适当设置(通常,每个连接至少一个线程)。 如果在每个连接上创建了多个通道,则池大小会影响并发性,因此变量(或简单缓存)线程池执行器将是最合适的。​​amqp-client​​​​Runtime.getRuntime().availableProcessors() * 2​​​​executor​​​​CachingConnectionFactory​


重要的是要了解缓存大小(默认情况下)不是限制,而只是可以缓存的通道数。 如果缓存大小为 10,则实际上可以使用任意数量的通道。 如果使用的通道超过 10 个,并且它们都返回到缓存中,则缓存中将有 10 个通道。 其余部分是物理关闭的。

从版本 1.6 开始,默认通道缓存大小已从 1 增加到 25。 在高容量、多线程环境中,小缓存意味着以高速率创建和关闭通道。 增加默认缓存大小可以避免此开销。 您应该通过 RabbitMQ 管理 UI 监控正在使用的通道,并考虑进一步增加缓存大小,如果您 看到许多频道正在创建和关闭。 缓存仅按需增长(以满足应用程序的并发要求),因此此更改不会 影响现有的小批量应用程序。

从版本 1.4.2 开始,具有一个名为 的属性。 当此属性大于零时,将限制可在连接上创建的通道数。 如果达到限制,调用线程将阻塞,直到通道可用或达到此超时,在这种情况下,将抛出 a。​​CachingConnectionFactory​​​​channelCheckoutTimeout​​​​channelCacheSize​​​​AmqpTimeoutException​

框架内使用的通道(例如,)将可靠地返回到缓存中。 如果在框架外部创建通道,(例如, 通过直接访问连接并调用 ),您必须可靠地(通过关闭)返回它们,也许在一个块中,以避免通道不足。​​RabbitTemplate​​​​createChannel()​​​​finally​

以下示例演示如何创建新的 :​​connection​

CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection connection = connectionFactory.createConnection();

使用 XML 时,配置可能类似于以下示例:

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>

还有一个仅在框架的单元测试代码中可用的实现。 它比 更简单,因为它不缓存通道,但由于缺乏性能和弹性,它不适合在简单测试之外的实际使用。 如果出于某种原因需要实现自己的基类,基类可能会提供一个很好的起点。​​SingleConnectionFactory​​​​CachingConnectionFactory​​​​ConnectionFactory​​​​AbstractConnectionFactory​

通过使用 rabbit 命名空间可以快速方便地创建 A,如下所示:​​ConnectionFactory​

<rabbit:connection-factory id="connectionFactory"/>

在大多数情况下,此方法更可取,因为框架可以为您选择最佳默认值。 创建的实例是 . 请记住,通道的默认缓存大小为 25。 如果要缓存更多通道,请通过设置“channelCacheSize”属性来设置更大的值。 在 XML 中,它如下所示:​​CachingConnectionFactory​

<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>

此外,使用命名空间,您可以添加“channel-cache-size”属性,如下所示:

<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>

默认缓存模式为 ,但您可以将其配置为缓存连接。 在下面的示例中,我们使用:​​CHANNEL​​​​connection-cache-size​

<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>

可以使用命名空间提供主机和端口属性,如下所示:

<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>

或者,如果在群集环境中运行,则可以使用 addresses 属性,如下所示:

<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>

有关 的信息,请参阅连接到群集。​​address-shuffle-mode​

以下示例具有自定义线程工厂,该工厂的线程名称前缀为 :​​rabbitmq-​

<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />

<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
地址解析器

从版本 2.1.15 开始,您现在可以使用 解析连接地址。 这将覆盖 和 属性的任何设置。​​AddressResolver​​​​addresses​​​​host/port​

命名连接

从版本 1.7 开始,提供了用于注入 . 生成的名称用于目标 RabbitMQ 连接的特定于应用程序的标识。 如果 RabbitMQ 服务器支持连接名称,则连接名称将显示在管理 UI 中。 此值不必是唯一的,也不能用作连接标识符,例如,在 HTTP API 请求中。 此值应该是人类可读的,并且是键下的一部分。 您可以使用简单的 Lambda,如下所示:​​ConnectionNameStrategy​​​​AbstractionConnectionFactory​​​​ClientProperties​​​​connection_name​

connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");

该参数可用于通过某些逻辑区分目标连接名称。 默认情况下,的 、表示对象的十六进制字符串和内部计数器用于生成 . 命名空间组件也随属性一起提供。​​ConnectionFactory​​​​beanName​​​​AbstractConnectionFactory​​​​connection_name​​​​<rabbit:connection-factory>​​​​connection-name-strategy​

的实现将连接名称设置为应用程序属性。 可以将其声明为 并将其注入到连接工厂中,如以下示例所示:​​SimplePropertyValueConnectionNameStrategy​​​​@Bean​

@Bean
public SimplePropertyValueConnectionNameStrategy cns()
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");


@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns)
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;

该属性必须存在于应用程序上下文的 .​​Environment​

使用 Spring 引导及其自动配置的连接工厂时,只需声明 . 引导自动检测 bean 并将其连接到工厂。​​ConnectionNameStrategy​​​​@Bean​

阻止的连接和资源限制

连接可能会被阻止,无法与内存警报对应的代理进行交互。 从版本 2.0 开始,可以为 提供要通知连接阻止和取消阻止事件的实例。 此外,分别通过其内部实现发出 a 和 。 这些允许您提供应用程序逻辑,以便对代理上的问题做出适当的反应,并(例如)采取一些纠正措施。​​org.springframework.amqp.rabbit.connection.Connection​​​​com.rabbitmq.client.BlockedListener​​​​AbstractConnectionFactory​​​​ConnectionBlockedEvent​​​​ConnectionUnblockedEvent​​​​BlockedListener​

当应用程序配置了单个 ,就像默认情况下使用 Spring 引导自动配置一样,当连接被代理阻止时,应用程序将停止工作。 当它被经纪人阻止时,它的任何客户都会停止工作。 如果我们在同一个应用程序中有生产者和消费者,当生产者阻止连接(因为代理上不再有资源)并且消费者无法释放它们(因为连接被阻止)时,我们最终可能会

以上是关于Spring AMQP项目的主要内容,如果未能解决你的问题,请参考以下文章

Spring AMQP Java 客户端中的队列大小

Spring使用Spring和AMQP发送接收消息(上)

Spring AMQP 错误处理策略详解

spring-boot-starter-amqp 依赖导致 ***Error

SpringBoot整合rabbitMQ,spring-boot-starter-amqp 的使用

040 RabbitMq及数据同步02

(c)2006-2024 SYSTEM All Rights Reserved IT常识