java 服务总线与春天
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 服务总线与春天相关的知识,希望对你有一定的参考价值。
<!-- Snippets only -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath/>
</parent>
<properties>
<servicebus.version>2.1.6</servicebus.version>
</properties>
<!--
Use later version of azure-servicebus than that provided by
azure-servicebus-spring-boot-starter (optional)
-->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus</artifactId>
<version>1.2.11</version>
</dependency>
<!--
Azure Boot Starter for Service Bus
https://mvnrepository.com/artifact/com.microsoft.azure/azure-servicebus-spring-boot-starter
-->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-servicebus-spring-boot-starter</artifactId>
<version>${servicebus.version}</version>
</dependency>
# Topic
azure.servicebus.topic-name=x-events
# Queue
azure.servicebus.queue-name=x-mailer
azure.servicebus.queue-receive-mode=peeklock
# Subscribe to Topic
azure.servicebus.subscription-name=appian
azure.servicebus.subscription-receive-mode=peeklock
# Service Bus Connection String (without Key Vault)
azure.servicebus.connection-string=Endpoint=sb://sb-doubledecker-ric-eun.servicebus.windows.net/;SharedAccessKeyName=x-events-send-listen;SharedAccessKey=KmsawwhktZgY8gfb7g1rH6iXNJv9Kr3r1pLWol3o2Ik=;EntityPath=x-events
# & with Key Vault (making sure the key vault key name is unique)
azure.servicebus.connection-string=${events-servicebus-connectionstring}
@Component
@Slf4j
public class EventTopicPublisher {
private TopicClient topicClient;
private TelemetryClient telemetry;
public EventTopicPublisher(TopicClient topicClient, TelemetryClient telemetry) {
this.topicClient = topicClient;
this.telemetry = telemetry;
}
boolean publishTopicMessage(String code, String messageBody) {
log.debug("Sending event: {} of type '{}'", messageBody, code);
final Message message = new Message(messageBody, MediaType.APPLICATION_JSON_VALUE);
Map<String, String> props = new HashMap<>();
props.put("X-Event-Code", code);
message.setProperties(props);
boolean success = false;
long startTime = System.currentTimeMillis();
try {
topicClient.send(message);
success = true;
} catch (InterruptedException | ServiceBusException e) {
telemetry.trackException(e);
} finally {
Duration delta = new Duration(System.currentTimeMillis() - startTime);
RemoteDependencyTelemetry dependency = new RemoteDependencyTelemetry("x-events", "send", delta, success);
dependency.setType("Azure Service Bus");
telemetry.trackDependency(dependency);
}
return success;
}
@PreDestroy
public void shutdown() throws ServiceBusException {
log.debug("Shutting down, closing topic client {}", topicClient.getTopicName());
topicClient.close();
}
}
@Component
@Slf4j
public class AppianTopicSubscriber implements IMessageHandler {
private TopicClient topicClient;
private SubscriptionClient subscriptionClient;
private TelemetryClient telemetry;
public AppianTopicSubscriber(TopicClient topicClient, SubscriptionClient subscriptionClient, TelemetryClient telemetry) {
this.topicClient = topicClient;
this.subscriptionClient = subscriptionClient;
this.telemetry = telemetry;
}
@PostConstruct
public void startup() throws ServiceBusException, InterruptedException {
// start receiving messages
ExecutorService executorService = Executors.newCachedThreadPool();
subscriptionClient.registerMessageHandler(this, new MessageHandlerOptions(1, false, ofMinutes(1)), executorService);
}
/**
* The callback that the message pump uses to pass received Messages to the app.
*
* @param message received message
*/
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
final String messageBody = new String(message.getBody(), UTF_8);
boolean success = false;
long startTime = System.currentTimeMillis();
try {
log.debug("Received message from topic: '{}'", messageBody);
success = processMessageInSomeWay(messageBody);
if (success) {
subscriptionClient.complete(message.getLockToken());
}
else {
subscriptionClient.abandon(message.getLockToken());
}
} catch (InterruptedException | ServiceBusException e) {
telemetry.trackException(e);
} finally {
trackServiceBusTelemetry(success, startTime);
}
return completedFuture(null);
}
private void trackServiceBusTelemetry(boolean success, long startTime) {
Duration delta = new Duration(System.currentTimeMillis() - startTime);
RemoteDependencyTelemetry dependency = new RemoteDependencyTelemetry("x-events", "receive", delta, success);
dependency.setType("Azure Service Bus");
telemetry.trackDependency(dependency);
}
@Override
public void notifyException(Throwable exception, ExceptionPhase phase) {
ExceptionTelemetry et = new ExceptionTelemetry(exception);
et.setSeverityLevel(SeverityLevel.Critical);
telemetry.trackException(et);
}
@PreDestroy
public void shutdown() throws ServiceBusException {
topicClient.close();
}
}
以上是关于java 服务总线与春天的主要内容,如果未能解决你的问题,请参考以下文章
使用 Java SDK 的 Azure 服务总线访问,连接模式