Spring Cloud Azure 参考文档
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud Azure 参考文档相关的知识,希望对你有一定的参考价值。
14.1.5. 示例
有关更多详细信息,请参阅azure-spring-boot-samples。
14.2. Azure 服务总线的春季云流绑定器
14.2.1. 关键概念
适用于 Azure 服务总线的 Spring Cloud Stream Binder 提供 Spring Cloud Stream Framework 的绑定实现。 此实现在其基础上使用 Spring 集成服务总线通道适配器。
定时消息
此绑定器支持将消息提交到主题以进行延迟处理。用户可以发送带有标头的计划消息以毫秒为单位表示消息的延迟时间。消息将在毫秒后传递到相应的主题。x-delayx-delay
消费群体
服务总线主题提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。 此活页夹依赖于主题来充当使用者组。Subscription
14.2.2. 依赖设置
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>
或者,也可以使用 Spring Cloud Azure Stream Service Bus Startter,如以下 Maven 示例所示:
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>
14.2.3. 配置
绑定程序提供以下 2 个配置选项部分:
连接配置属性
本部分包含用于连接到 Azure 服务总线的配置选项。
如果选择使用安全主体对 Azure Active Directory 进行身份验证和授权以访问 Azure 资源,请参阅使用 Azure AD 授权访问,以确保已向安全主体授予访问 Azure 资源的足够权限。 |
Table 31. Connection configurable properties of spring-cloud-azure-stream-binder-servicebus
财产 | 类型 | 描述 |
spring.cloud.azure.servicebus.enabled | 布尔 | 是否启用了 Azure 服务总线。 |
spring.cloud.azure.servicebus.connection-string | 字符串 | 服务总线命名空间连接字符串值。 |
spring.cloud.azure.servicebus.namespace | 字符串 | 服务总线命名空间值,它是 FQDN 的前缀。FQDN 应由 <命名空间名称>.<域名>组成 |
spring.cloud.azure.servicebus.domain-name | 字符串 | Azure 服务总线命名空间值的域名。 |
常见的 Azure 服务 SDK 配置选项也可以为 Spring Cloud Azure Stream Service Bus 绑定器进行配置。支持的配置选项在“配置”页中介绍,可以使用统一前缀前缀进行配置。 |
默认情况下,绑定器还支持Spring Can Azure资源管理器。若要了解如何使用未授予相关角色的安全主体检索连接字符串,请参阅资源管理器示例以了解详细信息。Data
Azure 服务总线绑定配置属性
以下选项分为四个部分:消费者属性、高级消费者 配置、创建者属性和高级创建者配置。
消费者属性
这些属性通过以下方式公开。ServiceBusConsumerProperties
Table 32. Consumer configurable properties of spring-cloud-azure-stream-binder-servicebus
财产 | 类型 | 违约 | 描述 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-reject | 布尔 | 假 | 如果失败的消息被路由到 DLQ。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.max-concurrent-calls | 整数 | 1 | 服务总线处理器客户端应处理的最大并发消息数。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.max-concurrent-sessions | 整数 | 零 | 在任何给定时间要处理的最大并发会话数。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.session-enabled | 布尔 | 零 | 是否启用会话。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.prefetch-count | 整数 | 0 | 服务总线处理器客户端的预取计数。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.sub-queue | 子队列 | 没有 | 要连接到的子队列的类型。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.max-auto-lock-renew-duration | 期间 | 5米 | 继续自动续订锁的时间量。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.receive-mode | ServiceBusReceiveMode | peek_lock | 服务总线处理器客户端的接收模式。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.auto-complete | 布尔 | 真 | 是否自动结算邮件。如果设置为 false,则添加邮件头 使开发人员能够手动结算消息。 |
高级消费者配置
支持为每个绑定程序使用者自定义上述连接和常见的 Azure SDK 客户端配置,可以使用前缀进行配置。spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.
生产者属性
这些属性通过以下方式公开。ServiceBusProducerProperties
Table 33. Producer configurable properties of spring-cloud-azure-stream-binder-servicebus
财产 | 类型 | 违约 | 描述 |
spring.cloud.stream.servicebus.bindings.<binding-name>.producer.sync | 布尔 | 假 | 开关标志 用于制作人的同步。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.producer.send-timeout | 长 | 10000 | 超时 发送生产者的价值。 |
spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type | 服务总线实体类型 | 零 | 生成者的服务总线实体类型,绑定生成者需要。 |
使用绑定生产者时,需要配置属性 of。 |
高级生产者配置
支持为每个绑定器生成器自定义上述连接和常见的 Azure SDK 客户端配置,可以使用前缀进行配置。spring.cloud.stream.servicebus.bindings.<binding-name>.producer.
14.2.4. 基本用法
从/向服务总线发送和接收消息
第 1 步。使用凭据信息填充配置选项。
- 对于作为连接字符串的凭据,请在 application.yml 中配置以下属性:
spring:
cloud:
azure:
servicebus:
connection-string: $SERVICEBUS_NAMESPACE_CONNECTION_STRING
stream:
function:
definition: consume;supply
bindings:
consume-in-0:
destination: $SERVICEBUS_ENTITY_NAME
# If you use Service Bus Topic, please add the following configuration
# group: $SUBSCRIPTION_NAME
supply-out-0:
destination: $SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE
servicebus:
bindings:
consume-in-0:
consumer:
auto-complete: false
supply-out-0:
producer:
entity-type: queue # set as "topic" if you use Service Bus Topic
- 对于作为服务主体的凭据,请在 application.yml 中配置以下属性:
spring:
cloud:
azure:
credential:
client-id: $AZURE_CLIENT_ID
client-secret: $AZURE_CLIENT_SECRET
profile:
tenant-id: $AZURE_TENANT_ID
servicebus:
namespace: $SERVICEBUS_NAMESPACE
stream:
function:
definition: consume;supply
bindings:
consume-in-0:
destination: $SERVICEBUS_ENTITY_NAME
# If you use Service Bus Topic, please add the following configuration
# group: $SUBSCRIPTION_NAME
supply-out-0:
destination: $SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE
servicebus:
bindings:
consume-in-0:
consumer:
auto-complete: false
supply-out-0:
producer:
entity-type: queue # set as "topic" if you use Service Bus Topic
- 对于作为托管标识的凭据,请在 application.yml 中配置以下属性:
spring:
cloud:
azure:
credential:
managed-identity-enabled: true
client-id: $MANAGED_IDENTITY_CLIENT_ID # Only needed when using a user-assigned managed identity
servicebus:
namespace: $SERVICEBUS_NAMESPACE
stream:
function:
definition: consume;supply
bindings:
consume-in-0:
destination: $SERVICEBUS_ENTITY_NAME
# If you use Service Bus Topic, please add the following configuration
# group: $SUBSCRIPTION_NAME
supply-out-0:
destination: $SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE
servicebus:
bindings:
consume-in-0:
consumer:
auto-complete: false
supply-out-0:
producer:
entity-type: queue # set as "topic" if you use Service Bus Topic
第 2 步。定义供应商和消费者。
@Bean
public Consumer<Message<String>> consume()
return message ->
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
LOGGER.info("New message received: ", message.getPayload());
checkpointer.success()
.doOnSuccess(success -> LOGGER.info("Message successfully checkpointed", message.getPayload()))
.doOnError(error -> LOGGER.error("Exception found", error))
.block();
;
@Bean
public Supplier<Message<String>> supply()
return () ->
LOGGER.info("Sending message, sequence " + i);
return MessageBuilder.withPayload("Hello world, " + i++).build();
;
分区键支持
绑定程序允许在消息标头中设置分区键和会话 ID,从而支持服务总线分区。本节介绍如何设置消息的分区键。
Spring Cloud Stream 提供分区键 SpEL 表达式属性。例如,将此属性设置为 并添加一个名为 <message-header-key> 的标头。Spring Cloud Stream 在计算上述表达式以分配分区键时将使用此标头的值。下面是一个示例生产者代码:spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression
"partitionKey- + headers[<message-header-key>]"
@Bean
public Supplier<Message<String>> generate()
return () ->
String value = “random payload”;
return MessageBuilder.withPayload(value)
.setHeader("<message-header-key>", value.length() % 4)
.build();
;
会话支持
绑定程序支持服务总线的消息会话。 可以通过消息标头设置消息的会话 ID。
@Bean
public Supplier<Message<String>> generate()
return () ->
String value = “random payload”;
return MessageBuilder.withPayload(value)
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session id")
.build();
;
根据服务总线分区,会话 ID 的优先级高于分区键。因此,当设置了两个 ofand(或) 标头时, 会话 ID 的值最终将用于覆盖分区键的值。 |
错误通道
- 使用者错误通道
默认情况下,此通道处于打开状态,并且默认的使用者错误通道处理程序用于在启用时将失败的消息发送到死信队列,否则将放弃失败的消息。spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected
若要自定义使用者错误通道处理程序,可以通过以下方式将自己的错误处理程序注册到相关的使用者错误通道:
// Replace destination with spring.cloud.stream.bindings.input.destination
// Replace group with spring.cloud.stream.bindings.input.group
@ServiceActivator(inputChannel = "destination.group.errors")
public void consumerError(Message<?> message)
LOGGER.error("Handling customer ERROR: " + message);
- 生产者错误通道
默认情况下,此通道不打开。您需要在 application.properties 中添加配置以启用它,如下所示:
spring.cloud.stream.default.producer.errorChannelEnabled=true
您可以通过以下方式处理错误消息:
// Replace destination with spring.cloud.stream.bindings.output.destination
@ServiceActivator(inputChannel = "destination.errors")
public void producerError(Message<?> message)
LOGGER.error("Handling Producer ERROR: " + message);
- 全局默认错误通道
默认情况下,Spring 集成会创建一个名为“errorChannel”的全局错误通道,该通道允许用户为其订阅许多端点。
@ServiceActivator(inputChannel = "errorChannel")
public void producerError(Message<?> message)
LOGGER.error("Handling ERROR: " + message);
服务总线消息标头
有关支持的基本消息标头,请参阅服务总线消息标头。
设置 partiton 键时,消息头的优先级高于 Spring Cloud 流属性。因此,只有在未配置 ,,的任何标头时才会生效。 |
多种粘合剂支持
使用多个绑定程序还支持连接到多个服务总线命名空间。此示例以连接字符串为例。还支持服务主体和托管标识的凭据,用户可以在每个绑定程序的环境设置中设置相关属性。
第 1 步。若要使用服务总线的多个绑定程序,我们需要在 application.yml 中配置以下属性
spring:
cloud:
stream:
function:
definition: consume1;supply1;consume2;supply2
bindings:
consume1-in-0:
destination: $SERVICEBUS_TOPIC_NAME
group: $SUBSCRIPTION_NAME
supply1-out-0:
destination: $SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE
consume2-in-0:
binder: servicebus-2
destination: $SERVICEBUS_QUEUE_NAME
supply2-out-0:
binder: servicebus-2
destination: $SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE
binders:
servicebus-1:
type: servicebus
default-candidate: true
environment:
spring:
cloud:
azure:
servicebus:
connection-string: $SERVICEBUS_NAMESPACE_01_CONNECTION_STRING
servicebus-2:
type: servicebus
default-candidate: false
environment:
spring:
cloud:
azure:
servicebus:
connection-string: $SERVICEBUS_NAMESPACE_02_CONNECTION_STRING
servicebus:
bindings:
consume1-in-0:
consumer:
auto-complete: false
supply1-out-0:
producer:
entity-type: topic
consume2-in-0:
consumer:
auto-complete: false
supply2-out-0:
producer:
entity-type: queue
poller:
initial-delay: 0
fixed-delay: 1000
第 2 步。我们需要定义两个供应商和两个消费者
@Bean
public Supplier<Message<String>> supply1()
return () ->
LOGGER.info("Sending message1, sequence1 " + i);
return MessageBuilder.withPayload("Hello world1, " + i++).build();
;
@Bean
public Supplier<Message<String>> supply2()
return () ->
LOGGER.info("Sending message2, sequence2 " + j);
return MessageBuilder.withPayload("Hello world2, " + j++).build();
;
@Bean
public Consumer<Message<String>> consume1()
return message ->
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
LOGGER.info("New message1 received: ", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message successfully checkpointed", message.getPayload()))
.doOnError(e -> LOGGER.error("Error found", e))
.block();
;
@Bean
public Consumer<Message<String>> consume2()
return message ->
Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
LOGGER.info("New message2 received: ", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message successfully checkpointed", message.getPayload()))
.doOnError(e -> LOGGER.error("Error found", e))
.block();
;
资源提供
服务总线绑定程序支持预配队列、主题和订阅,用户可以使用以下属性来启用预配。
spring:
cloud:
azure:
credential:
tenant-id: $AZURE_TENANT_ID
profile:
subscription-id: $AZURE_SUBSCRIPTION_ID
servicebus:
resource:
resource-group: $AZURE_SERVICEBUS_RESOURECE_GROUP
stream:
servicebus:
bindings:
<binding-name>:
consumer:
entity-type: $SERVICEBUS_CONSUMER_ENTITY_TYPE
14.2.5. 示例
有关更多详细信息,请参阅azure-spring-boot-samples。
15. 春季 JMS 支持
通过集成到 Spring JMS 框架中的 JMS API 使用 Azure 服务总线。 必须提供 Azure 服务总线连接字符串,该字符串将解析为 AMQP 代理的登录用户名、密码和远程 URI。
15.1. 依赖设置
如果要迁移 Spring JMS 应用程序以使用 Azure 服务总线,请添加以下依赖项。
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId>
</dependency>
15.2. 配置
Table 34. Configurable properties when using Spring JMS support
财产 | 描述 |
spring.jms.servicebus.connection-string | Azure 服务总线连接字符串。当想要直接提供连接字符串时,应提供。 |
spring.jms.servicebus.topic-client-id | JMS 客户机标识。仅适用于 topicJmsListenerContainerFactory 的 bean。 |
spring.jms.servicebus.idle-timeout | 空闲的持续时间。 |
spring.jms.servicebus.pricing-tier | Azure 服务总线价格层。 |
spring.jms.servicebus.listener.reply-pub-sub-domain | 回复目标类型是否为主题。 |
spring.jms.servicebus.listener.phase | 指定应在其中启动和停止此容器的阶段。 |
spring.jms.servicebus.listener.reply-qos-settings | 配置发送答复时要使用的 Qos设置。 |
spring.jms.servicebus.listener.subscription-durable | 是否使订阅持久。仅适用于 topicJmsListenerContainerFactory 的 bean。 |
spring.jms.servicebus.listener.subscription-shared | 是否共享订阅。仅适用于 topicJmsListenerContainerFactory 的 bean。 |
spring.jms.servicebus.password | AMQP经纪商的登录密码 |
spring.jms.servicebus.pool.block-if-full | 在请求连接且池已满时是否阻止。 |
spring.jms.servicebus.pool.block-if-full-timeout | 如果池仍已满,则引发异常之前的阻塞期。 |
spring.jms.servicebus.pool.enabled | 是否应该创建 JmsPoolConnectionFactory,而不是常规的连接工厂。 |
spring.jms.servicebus.pool.idle-timeout | 连接空闲超时。 |
spring.jms.servicebus.pool.max-connections | 池连接的最大数量。 |
spring.jms.servicebus.pool.max-sessions-per-connection | 池中每个连接的最大池会话数。 |
spring.jms.servicebus.pool.time-between-expiration-check | 空闲连接逐出线程运行之间的睡眠时间。 |
spring.jms.servicebus.pool.use-anonymous-producer | 是否只使用一个匿名的“消息生产者”实例。 |
spring.jms.servicebus.prefetch-policy.all | 此服务总线命名空间中预提取选项的回退值。 |
spring.jms.servicebus.prefetch-policy.durable-topic-prefetch | 持久主题的预取数。 |
spring.jms.servicebus.prefetch-policy.queue-browser-prefetch | 队列浏览器的预取数。 |
Spring Cloud Azure 4.0 GA – 实现 Spring 框架与 Azure 服务的无缝集成 |