java 服务总线与春天

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 服务总线与春天相关的知识,希望对你有一定的参考价值。

<!-- Snippets only -->

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.4.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <servicebus.version>2.1.6</servicebus.version>
</properties>

<!--
Use later version of azure-servicebus than that provided by
azure-servicebus-spring-boot-starter (optional)
-->
<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-servicebus</artifactId>
  <version>1.2.11</version>
</dependency>

<!--
Azure Boot Starter for Service Bus
https://mvnrepository.com/artifact/com.microsoft.azure/azure-servicebus-spring-boot-starter
-->
<dependency>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-servicebus-spring-boot-starter</artifactId>
  <version>${servicebus.version}</version>
</dependency>
# Topic
azure.servicebus.topic-name=x-events

# Queue
azure.servicebus.queue-name=x-mailer
azure.servicebus.queue-receive-mode=peeklock

# Subscribe to Topic
azure.servicebus.subscription-name=appian
azure.servicebus.subscription-receive-mode=peeklock

# Service Bus Connection String (without Key Vault)
azure.servicebus.connection-string=Endpoint=sb://sb-doubledecker-ric-eun.servicebus.windows.net/;SharedAccessKeyName=x-events-send-listen;SharedAccessKey=KmsawwhktZgY8gfb7g1rH6iXNJv9Kr3r1pLWol3o2Ik=;EntityPath=x-events
# & with Key Vault (making sure the key vault key name is unique)
azure.servicebus.connection-string=${events-servicebus-connectionstring}
@Component
@Slf4j
public class EventTopicPublisher {

    private TopicClient topicClient;

    private TelemetryClient telemetry;

    public EventTopicPublisher(TopicClient topicClient, TelemetryClient telemetry) {
        this.topicClient = topicClient;
        this.telemetry = telemetry;
    }

    boolean publishTopicMessage(String code, String messageBody) {
        log.debug("Sending event: {} of type '{}'", messageBody, code);
        final Message message = new Message(messageBody, MediaType.APPLICATION_JSON_VALUE);
        Map<String, String> props = new HashMap<>();
        props.put("X-Event-Code", code);
        message.setProperties(props);

        boolean success = false;
        long startTime = System.currentTimeMillis();

        try {
            topicClient.send(message);
            success = true;
        } catch (InterruptedException | ServiceBusException e) {
            telemetry.trackException(e);
        } finally {
            Duration delta = new Duration(System.currentTimeMillis() - startTime);
            RemoteDependencyTelemetry dependency = new RemoteDependencyTelemetry("x-events", "send", delta, success);
            dependency.setType("Azure Service Bus");
            telemetry.trackDependency(dependency);
        }
        return success;
    }

    @PreDestroy
    public void shutdown() throws ServiceBusException {
        log.debug("Shutting down, closing topic client {}", topicClient.getTopicName());
        topicClient.close();
    }

}
@Component
@Slf4j
public class AppianTopicSubscriber implements IMessageHandler {

    private TopicClient topicClient;

    private SubscriptionClient subscriptionClient;

    private TelemetryClient telemetry;

    public AppianTopicSubscriber(TopicClient topicClient, SubscriptionClient subscriptionClient, TelemetryClient telemetry) {
        this.topicClient = topicClient;
        this.subscriptionClient = subscriptionClient;
        this.telemetry = telemetry;
    }

    @PostConstruct
    public void startup() throws ServiceBusException, InterruptedException {
        // start receiving messages
        ExecutorService executorService = Executors.newCachedThreadPool();
        subscriptionClient.registerMessageHandler(this, new MessageHandlerOptions(1, false, ofMinutes(1)), executorService);
    }

    /**
     * The callback that the message pump uses to pass received Messages to the app.
     *
     * @param message received message
     */
    @Override
    public CompletableFuture<Void> onMessageAsync(IMessage message) {
        final String messageBody = new String(message.getBody(), UTF_8);
        boolean success = false;
        long startTime = System.currentTimeMillis();
        try {
            log.debug("Received message from topic: '{}'", messageBody);
            success = processMessageInSomeWay(messageBody);
            if (success) {
                subscriptionClient.complete(message.getLockToken());
            }
            else {
                subscriptionClient.abandon(message.getLockToken());
            }
        } catch (InterruptedException | ServiceBusException e) {
            telemetry.trackException(e);
        } finally {
            trackServiceBusTelemetry(success, startTime);
        }

        return completedFuture(null);
    }

    private void trackServiceBusTelemetry(boolean success, long startTime) {
        Duration delta = new Duration(System.currentTimeMillis() - startTime);
        RemoteDependencyTelemetry dependency = new RemoteDependencyTelemetry("x-events", "receive", delta, success);
        dependency.setType("Azure Service Bus");
        telemetry.trackDependency(dependency);
    }

    @Override
    public void notifyException(Throwable exception, ExceptionPhase phase) {
        ExceptionTelemetry et = new ExceptionTelemetry(exception);
        et.setSeverityLevel(SeverityLevel.Critical);
        telemetry.trackException(et);
    }

    @PreDestroy
    public void shutdown() throws ServiceBusException {
        topicClient.close();
    }
}

以上是关于java 服务总线与春天的主要内容,如果未能解决你的问题,请参考以下文章

使用 Java SDK 的 Azure 服务总线访问,连接模式

知道为什么叫spring吗,因为它java开发迎来了春天

OSChina 本周软件推荐 —— zbus 轻量级服务总线/消息队列

Azure 服务总线 http 与 websocket

SOA 项目与服务总线项目

在春天运行 javax.websocket 端点?