Spring Integration对Redis的支持

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Integration对Redis的支持相关的知识,希望对你有一定的参考价值。

Spring

Spring Integration 2.1引入了对Redis​的支持:“一个开源的高级键值存储”。 这种支持以基于 Redis 以及发布-订阅消息传递适配器的形式出现,Redis 通过其 PUBLISH、SUBSCRIBE和 UNSUBSCRIBE​命令支持这些适配器。​​MessageStore​

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.0.0</version>
</dependency>

您还需要包含 Redis 客户端依赖项,例如 Lettuce。

要下载、安装和运行 Redis,请参阅 Redis 文档。

连接到 Redis

要开始与 Redis 交互,您首先需要连接到它。 Spring Integration 使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 构造:和 。 这些抽象简化了与多个 Redis 客户端 Java API 的集成。 目前,Spring Data Redis 支持 Jedis 和 Lettuce。​​ConnectionFactory​​​​Template​

用​​RedisConnectionFactory​

要连接到 Redis,您可以使用接口的实现之一。 以下清单显示了接口定义:​​RedisConnectionFactory​

public interface RedisConnectionFactory extends PersistenceExceptionTranslator 

/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();

以下示例显示了如何在 Java 中创建:​​LettuceConnectionFactory​

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();

下面的示例展示了如何在 Spring 的 XML 配置中创建:​​LettuceConnectionFactory​

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>

的实现提供了一组属性,例如端口和主机,您可以根据需要设置这些属性。 拥有 的实例后,可以创建 的实例并将其注入 .​​RedisConnectionFactory​​​​RedisConnectionFactory​​​​RedisTemplate​​​​RedisConnectionFactory​

用​​RedisTemplate​

与 Spring 中的其他模板类(例如 和 )一样,它是一个简化 Redis 数据访问代码的辅助类。 有关及其变体的更多信息(例如),请参阅 Spring Data Redis 文档。​​JdbcTemplate​​​​JmsTemplate​​​​RedisTemplate​​​​RedisTemplate​​​​StringRedisTemplate​

以下示例演示如何在 Java 中创建 的实例:​​RedisTemplate​

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);

下面的示例展示了如何在 Spring 的 XML 配置中创建 的实例:​​RedisTemplate​

<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

使用 Redis 发送消息

如简介中所述,Redis 通过其 、 和命令提供对发布-订阅消息传递的支持。 与JMS和AMQP一样,Spring Integration提供了消息通道和适配器,用于通过Redis发送和接收消息。​​PUBLISH​​​​SUBSCRIBE​​​​UNSUBSCRIBE​

瑞迪斯发布/订阅频道

与 JMS 类似,在某些情况下,生产者和使用者都打算成为同一应用程序的一部分,在同一进程中运行。 可以使用一对入站和出站通道适配器来实现此目的。 但是,与Spring Integration的JMS支持一样,有一种更简单的方法来解决这个用例。 您可以创建发布-订阅通道,如以下示例所示:

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

A 的行为很像 Spring 集成主命名空间中的普通元素。 它可以由任何终结点的 和属性引用。 不同之处在于,此通道由 Redis 主题名称支持:属性指定的值。 但是,与 JMS 不同的是,本主题不必提前创建,甚至不必由 Redis 自动创建。 在 Redis 中,主题是扮演地址角色的简单值。 生成者和使用者可以使用与其主题名称相同的值进行通信。 对此通道的简单订阅意味着可以在生产和消费终结点之间实现异步发布-订阅消息传递。 但是,与通过在简单的 Spring 集成元素中添加元素创建的异步消息通道不同,消息不会存储在内存中队列中。 相反,这些消息通过 Redis 传递,这使您可以依赖它对持久性和集群的支持以及与其他非 Java 平台的互操作性。​​publish-subscribe-channel​​​​<publish-subscribe-channel/>​​​​input-channel​​​​output-channel​​​​String​​​​topic-name​​​​String​​​​String​​​​<queue/>​​​​<channel/>​

Redis 入站通道适配器

Redis 入站通道适配器 () 以与其他入站适配器相同的方式将传入的 Redis 消息调整为 Spring 消息。 它接收特定于平台的消息(在本例中为 Redis),并使用策略将它们转换为 Spring 消息。 以下示例演示如何配置 Redis 入站通道适配器:​​RedisInboundChannelAdapter​​​​MessageConverter​

<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

前面的示例显示了 Redis 入站通道适配器的简单但完整的配置。 请注意,前面的配置依赖于熟悉的 Spring 范式,即自动发现某些 bean。 在这种情况下,将隐式注入适配器。 可以改用属性显式指定它。​​redisConnectionFactory​​​​connection-factory​

另请注意,上述配置为适配器注入了自定义 . 该方法类似于 JMS,其中实例用于在 Redis 消息和 Spring 集成消息有效负载之间进行转换。 默认值为 .​​MessageConverter​​​​MessageConverter​​​​SimpleMessageConverter​

入站适配器可以订阅多个主题名称,因此属性中的值集以逗号分隔。​​topics​

从 V3.0 开始,除了现有属性之外,入站适配器现在还具有该属性。 此属性包含一组以逗号分隔的 Redis 主题模式。 有关 Redis 发布-订阅的更多信息,请参阅 Redis 发布/订阅。​​topics​​​​topic-patterns​

入站适配器可以使用 对 Redis 消息正文进行反序列化。 的属性可以设置为空字符串,这将生成属性的值。 在这种情况下,Redis 消息的原始正文作为消息有效负载提供。​​RedisSerializer​​​​serializer​​​​<int-redis:inbound-channel-adapter>​​​​null​​​​RedisSerializer​​​​byte[]​

从 V5.0 开始,您可以使用 的属性向入站适配器提供实例。 此外,收到的 Spring 集成消息现在具有指示已发布消息来源的标头:主题或模式。 您可以将此下游用于路由逻辑。​​Executor​​​​task-executor​​​​<int-redis:inbound-channel-adapter>​​​​RedisHeaders.MESSAGE_SOURCE​

Redis 出站通道适配器

Redis 出站通道适配器将传出的 Spring 集成消息调整为 Redis 消息,方式与其他出站适配器相同。 它接收 Spring 集成消息,并使用策略将它们转换为特定于平台的消息(在本例中为 Redis)。 以下示例演示如何配置 Redis 出站通道适配器:​​MessageConverter​

<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

该配置与 Redis 入站通道适配器并行。 适配器隐式注入 ,该 定义为其 Bean 名称。 此示例还包括可选(和自定义)(bean)。​​RedisConnectionFactory​​​​redisConnectionFactory​​​​MessageConverter​​​​testConverter​

从 Spring Integration 3.0 开始,提供了该属性的替代方法:您可以使用该属性在运行时确定消息的 Redis 主题。 这些属性是互斥的。​​<int-redis:outbound-channel-adapter>​​​​topic​​​​topic-expression​

Redis 队列入站通道适配器

Spring Integration 3.0引入了一个队列入站通道适配器,用于从Redis列表中“弹出”消息。 默认情况下,它使用“右弹出”,但您可以将其配置为使用“左弹出”。 适配器是消息驱动的。 它使用内部侦听器线程,不使用轮询器。

以下清单显示了 的所有可用属性:​​queue-inbound-channel-adapter​

<int-redis:queue-inbound-channel-adapter id=""  
channel=""
auto-startup=""
phase=""
connection-factory=""
queue=""
error-channel=""
serializer=""
receive-timeout=""
recovery-interval=""
expect-message=""
task-executor=""
right-pop=""/>

组件 Bean 名称。 如果未提供该属性,则会在应用程序上下文中创建并注册 ,并将此属性作为 Bean 名称。 在这种情况下,端点本身使用 Bean 名称加 . (如果 Bean 名称为 ,则端点注册为 。​​channel​​​​DirectChannel​​​​id​​​​id​​​​.adapter​​​​thing1​​​​thing1.adapter​

要从此终端节点向其发送实例的实例。​​MessageChannel​​​​Message​

一个属性,用于指定此终结点是否应在应用程序上下文启动后自动启动。 默认为 .​​SmartLifecycle​​​​true​

一个属性,用于指定启动此终结点的阶段。 默认为 .​​SmartLifecycle​​​​0​

对 Bean 的引用。 默认为 .​​RedisConnectionFactory​​​​redisConnectionFactory​

执行基于队列的“pop”操作以获取 Redis 消息的 Redis 列表的名称。

从终端节点的侦听任务收到异常时向其发送实例的实例。 默认情况下,基础使用应用程序上下文中的默认值。​​MessageChannel​​​​ErrorMessage​​​​MessagePublishingErrorHandler​​​​errorChannel​

豆引用。 它可以是一个空字符串,这意味着“没有序列化程序”。 在这种情况下,来自入站 Redis 消息的原始消息将作为有效负载发送到 。 默认情况下,它是一个 .​​RedisSerializer​​​​byte[]​​​​channel​​​​Message​​​​JdkSerializationRedisSerializer​

“pop”操作等待队列中的 Redis 消息的超时(以毫秒为单位)。 默认值为 1 秒。

侦听器任务在重新启动侦听器任务之前,在“pop”操作出现异常后应休眠的时间(以毫秒为单位)。

指定此终端节点是否期望 Redis 队列中的数据包含整个实例。 如果此属性设置为 ,则不能为空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。 其默认值为 。​​Message​​​​true​​​​serializer​​​​false​

对 Spring (或标准 JDK 1.5+) bean 的引用。 它用于基础侦听任务。 它默认为 .​​TaskExecutor​​​​Executor​​​​SimpleAsyncTaskExecutor​

指定此端点应使用“右弹出”(当)还是“左弹出”(当)从 Redis 列表中读取消息。 如果 ,则 Redis 列表在与默认 Redis 队列出站通道适配器一起使用时充当队列。 将其设置为与通过“右推”写入列表的软件一起使用,或实现类似堆栈的消息顺序。 其默认值为 。 从版本 4.3 开始。​​true​​​​false​​​​true​​​​FIFO​​​​false​​​​true​

必须配置多个线程进行处理;否则,当 在出现错误后尝试重新启动侦听器任务时,可能会出现死锁。 可用于处理这些错误,以避免重新启动,但最好不要将应用程序暴露在可能的死锁情况下。 有关可能的实现,请参阅 Spring 框架参考手册​。​​task-executor​​​​RedisQueueMessageDrivenEndpoint​​​​errorChannel​​​​TaskExecutor​

Redis 队列出站通道适配器

Spring Integration 3.0引入了一个队列出站通道适配器,用于从Spring Integration消息“推送”到Redis列表。 默认情况下,它使用“左推”,但您可以将其配置为使用“右推”。 以下清单显示了 Redis 的所有可用属性:​​queue-outbound-channel-adapter​

<int-redis:queue-outbound-channel-adapter id=""  
channel=""
connection-factory=""
queue=""
queue-expression=""
serializer=""
extract-payload=""
left-push=""/>


组件 Bean 名称。 如果未提供该属性,则会在应用程序上下文中创建并注册 ,并将此属性作为 Bean 名称。 在这种情况下,端点注册的 Bean 名称为 加 。 (如果 Bean 名称为 ,则端点注册为 。​​channel​​​​DirectChannel​​​​id​​​​id​​​​.adapter​​​​thing1​​​​thing1.adapter​


此终结点从中接收实例的 。​​MessageChannel​​​​Message​


对 Bean 的引用。 默认为 .​​RedisConnectionFactory​​​​redisConnectionFactory​


执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表的名称。 此属性与 互斥。​​queue-expression​


用于确定 Redis 列表名称的 SpEL。 它使用运行时的传入作为变量。 此属性与 互斥。​​Expression​​​​Message​​​​#root​​​​queue​


豆类引用。 它默认为 . 但是,对于有效负载,如果未提供引用,则使用 a。​​RedisSerializer​​​​JdkSerializationRedisSerializer​​​​String​​​​StringRedisSerializer​​​​serializer​


指定此终端节点是应仅将有效负载发送还是将整个负载发送到 Redis 队列。 默认为 .​​Message​​​​true​


指定此端点应使用“左推”(当)还是“右推”(当)将消息写入 Redis 列表。 如果为 ,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当队列。 将其设置为与以“左弹出”从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。 默认为 . 从版本 4.3 开始。​​true​​​​false​​​​true​​​​FIFO​​​​false​​​​true​

瑞迪斯应用程序事件

从 Spring Integration 3.0 开始,Redis 模块提供了 的实现,而 又是一个 . 封装了 Redis 操作的异常(端点是事件的“源”)。 例如,在捕获操作中的异常后发出这些事件。 例外可以是任何泛型或 . 使用 处理这些事件对于确定后台 Redis 任务的问题和采取管理操作非常有用。​​IntegrationEvent​​​​org.springframework.context.ApplicationEvent​​​​RedisExceptionEvent​​​​<int-redis:queue-inbound-channel-adapter/>​​​​BoundListOperations.rightPop​​​​org.springframework.data.redis.RedisSystemException​​​​org.springframework.data.redis.RedisConnectionFailureException​​​​<int-event:inbound-channel-adapter/>​

红与红消息存储

企业集成模式 (EIP) 一书中所述,邮件存储允许您保留消息。 当考虑可靠性时,这在处理具有缓冲消息功能的组件(聚合器、重新排序器等)时非常有用。 在 Spring 集成中,该策略还为声明检查模式提供了基础,EIP 中也有描述。​​MessageStore​

Spring 集成的 Redis 模块提供了 . 以下示例演示如何将其与聚合器一起使用:​​RedisMessageStore​

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>

前面的示例是一个 Bean 配置,它需要 a 作为构造函数参数。​​RedisConnectionFactory​

缺省情况下,使用 Java 序列化来序列化消息。 但是,如果要使用其他序列化技术(如 JSON),可以通过设置 .​​RedisMessageStore​​​​valueSerializer​​​​RedisMessageStore​

从版本 4.3.10 开始,框架分别为实例和实例提供了 Jackson 序列化程序和解序列化程序实现 — 和 。 必须使用的选项配置它们。 此外,还应设置为每个序列化的复杂对象添加类型信息(如果您信任源)。 然后在反序列化期间使用该类型信息。 该框架提供了一个名为 的实用程序方法,该方法已随前面提到的所有属性和序列化程序一起提供。 此实用程序方法附带一个参数,用于限制 Java 包进行反序列化以避免安全漏洞。 默认的受信任软件包:、、。 若要在 中管理 JSON 序列化,必须以类似于以下示例的方式对其进行配置:​​Message​​​​MessageHeaders​​​​MessageJacksonDeserializer​​​​MessageHeadersJacksonSerializer​​​​SimpleModule​​​​ObjectMapper​​​​enableDefaultTyping​​​​ObjectMapper​​​​JacksonJsonUtils.messagingAwareMapper()​​​​trustedPackages​​​​java.util​​​​java.lang​​​​org.springframework.messaging.support​​​​org.springframework.integration.support​​​​org.springframework.integration.message​​​​org.springframework.integration.store​​​​RedisMessageStore​

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);

从版本 4.3.12 开始,支持允许区分同一 Redis 服务器上的存储实例的选项。​​RedisMessageStore​​​​prefix​

Redis 通道消息存储库

前面显示的内容将每个组维护为单个键(组 ID)下的值。 虽然您可以使用它来支持持久性,但为此目的提供了一个专用的(从版本 4.0 开始)。 此存储在每个通道、发送消息和接收消息时使用 。 默认情况下,此存储也使用 JDK 序列化,但您可以修改值序列化程序,如前所述。​​RedisMessageStore​​​​QueueChannel​​​​RedisChannelMessageStore​​​​LIST​​​​LPUSH​​​​RPOP​

我们建议使用此存储支持通道,而不是使用常规 . 以下示例定义了一个 Redis 消息存储库,并在具有队列的通道中使用它:​​RedisMessageStore​

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>

用于存储数据的键的格式为:(在前面的示例中,)。​​<storeBeanName>:<channelId>​​​​redisMessageStore:somePersistentQueueChannel​

此外,还提供了子类。 当您将其与 一起使用时,将按 (FIFO) 优先级顺序接收消息。 它使用标准标头并支持优先级值 ()。 具有其他优先级的消息(以及没有优先级的消息)将按FIFO顺序在任何具有优先级的消息之后检索。​​RedisChannelPriorityMessageStore​​​​QueueChannel​​​​IntegrationMessageHeaderAccessor.PRIORITY​​​​0 - 9​

这些存储仅实现而不实现 。 它们只能用于支持.​​BasicMessageGroupStore​​​​MessageGroupStore​​​​QueueChannel​

红地元数据存储

Spring Integration 3.0引入了一个新的基于Redis的MetadataStore(参见Metadata Store)实现。 可以使用 来维护跨应用程序重新启动的状态。 您可以将此新实现与适配器一起使用,例如:​​RedisMetadataStore​​​​MetadataStore​​​​MetadataStore​

  • 饲料
  • 文件
  • 邮票
  • 自来水龙

要指示这些适配器使用 new ,请声明一个名为 的 Spring Bean。 源入站通道适配器和源入站通道适配器都自动拾取并使用声明的 . 下面的示例演示如何声明这样的 Bean:​​RedisMetadataStore​​​​metadataStore​​​​RedisMetadataStore​

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

由RedisProperties支持。 与它的交互使用 BoundHashOperations,这反过来又需要整个存储区。 在 的情况下,这扮演了一个区域的角色,这在分布式环境中很有用,当多个应用程序使用相同的 Redis 服务器时。 缺省情况下,此值为 。​​RedisMetadataStore​​​​key​​​​Properties​​​​MetadataStore​​​​key​​​​key​​​​MetaData​

从版本 4.0 开始,此存储实现了 ,使其在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。​​ConcurrentMetadataStore​

您不能将 (例如,在 中)与 Redis 集群一起使用,因为当前不支持原子性命令。​​RedisMetadataStore.replace()​​​​AbstractPersistentAcceptOnceFileListFilter​​​​WATCH​

Redis 存储入站通道适配器

Redis 存储入站通道适配器是一个轮询使用者,它从 Redis 集合读取数据并将其作为有效负载发送。 以下示例演示如何配置 Redis 存储入站通道适配器:​​Message​

<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

前面的示例演示如何使用元素配置 Redis 存储入站通道适配器,为各种属性提供值,例如:​​store-inbound-channel-adapter​