Spring WebSockets ActiveMQ convertAndSendToUser

Posted

技术标签:

【中文标题】Spring WebSockets ActiveMQ convertAndSendToUser【英文标题】: 【发布时间】:2018-10-02 12:41:48 【问题描述】:

我有一个 Spring Boot 应用程序 (Jhipster),它使用 STOMP over WebSockets 将信息从服务器传递给用户。

我最近添加了一个 ActiveMQ 服务器来处理应用程序的水平扩展,并带有一个 Amazon 自动扩展组/负载均衡器。

我使用convertAndSendToUser() 方法,该方法适用于应用程序的单个实例来定位经过身份验证的用户的“个人队列”,因此只有他们才能收到消息。

但是,当我在负载均衡器后面启动应用程序时,我发现消息只有如果在服务器上生成了他们的 websocket-proxy 连接(到经纪人)成立于?

我如何确保消息通过 ActiveMQ 到达用户实际“连接”的应用程序的任何实例,而不管哪个实例接收到,例如执行 convertAndSendToUser() 事件的 HTTP 请求?

这里是我的 StompBrokerRelayMessageHandler 供参考:

@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() 
    StompBrokerRelayMessageHandler handler = (StompBrokerRelayMessageHandler) super.stompBrokerRelayMessageHandler();
    handler.setTcpClient(new Reactor2TcpClient<>(
        new StompTcpFactory(orgProperties.getAws().getAmazonMq().getStompRelayHost(),
            orgProperties.getAws().getAmazonMq().getStompRelayPort(), orgProperties.getAws().getAmazonMq
            ().getSsl())
    ));

    return handler;


@Override
public void configureMessageBroker(MessageBrokerRegistry config) 
    config.enableStompBrokerRelay("/queue", "/topic")
        .setSystemLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
        .setSystemPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass())
        .setClientLogin(orgProperties.getAws().getAmazonMq().getStompRelayHostUser())
        .setClientPasscode(orgProperties.getAws().getAmazonMq().getStompRelayHostPass());

    config.setApplicationDestinationPrefixes("/app");

我通过检查 SessionSubscribeEvent 中的标头找到了与 ActiveMQ 上生成的队列对应的名称,该名称是在用户订阅用户队列时在侦听器中生成的,如 simpSessionId

@Override
@EventListener(SessionSubscribeEvent.class)
public void onSessionSubscribeEvent(SessionSubscribeEvent event) 
    log.debug("Session Subscribe Event:" +
        "", event.getMessage().getHeaders().toString());

对应的队列可以在ActiveMQ中找到,格式为:simpDestination-usersimpSessionId

我可以将 sessionId 保存在键值对中,然后将消息推送到该主题频道吗?


我还发现了一些possibilities 在 CONNECT/SUBSCRIBE 框架中设置 ActiveMQ 特定 STOMP 属性以创建 持久订阅者 如果我设置这些属性,Spring 会比理解路由吗?

client-id & subcriptionName

【问题讨论】:

我在使用 springboot 连接到 AWS ActiveMQ 实例时遇到了相关问题(嵌入式代理工作正常)。您用于配置的 orgProperties.getAws().getAmazonMq().* 部分的库是什么?它是适用于 java 的 AWS 开发工具包之一吗? @RyanZakariudakis 对orgProperties 的引用只是引用了连接端点,来自 spring 外部化配置的凭据字符串。您可能会遇到的一个问题是使用他们提供的stomp+ssl:// 连接字符串,我将整个前缀留在了连接字符串中。 【参考方案1】:

修改MessageBrokerReigstry config 解决了这个问题:

config.enableStompBrokerRelay("/queue", "/topic")
            .setUserDestinationBroadcast("/topic/registry.broadcast")

基于documentation section 4.4.13中的这一段:

在多应用服务器场景中,用户目的地可能会保留 未解决,因为用户连接到不同的服务器。在 在这种情况下,您可以将目标配置为广播未解析 消息,以便其他服务器有机会尝试。这可以是 通过 userDestinationBroadcast 的属性完成 Java 配置中的 MessageBrokerRegistry 和 message-broker 元素的 user-destination-broadcast 属性 XML

我没有看到任何关于“为什么”/topic/registry.broadcast 是正确“主题”目的地的文档,但我发现了它的各种迭代:

    websocket sessions sample doesn't cluster.. spring-session-1.2.2 What is MultiServerUserRegistry in spring websocket? Spring websocket - sendToUser from a cluster does not work from backup server

【讨论】:

谢谢你,我认为 RabbitMQ 默认是这样做的。

以上是关于Spring WebSockets ActiveMQ convertAndSendToUser的主要内容,如果未能解决你的问题,请参考以下文章

使用 spring 启用 WebSockets

对如何使用 Spring-websockets 进行 stomp 调用感到困惑

Spring 4 中 Websockets 的自定义对象映射器

Spring消息传递+安全如何通过websockets登录?

Spring-Websockets 4.2 中使用 SockJS 的部分消息

Spring 4 STOMP Websockets Heartbeat