从应用程序的其他层发送 STOMP 消息

Posted

技术标签:

【中文标题】从应用程序的其他层发送 STOMP 消息【英文标题】:Sending STOMP messages from other layers of an application 【发布时间】:2017-11-12 14:40:10 【问题描述】:

我正在使用带有 RabbitMQ 代理的集群 tomcat 环境中使用 Spring Websockets 构建应用程序。我有一个 API 模块,它需要注册要收听的端点。我按照正常的例子,想出了这个配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer

    @Override
    public void configureMessageBroker(final MessageBrokerRegistry config)
    
        config.enableStompBrokerRelay("/topic/")
            .setRelayHost("localhost")
            .setRelayPort(61613)
            .setClientLogin("guest")
            .setClientPasscode("guest");
    

    @Override
    public void registerStompEndpoints(final StompEndpointRegistry registry)
    
        registry.addEndpoint("/updates")
            .setAllowedOrigins("*")
            .withSockJS();
    

虽然这可行,但它并没有解决我的问题,因为 WebSocket 和中继配置似乎都捆绑到 API 模块中,因此其他层无法重用代理。我需要在服务层进行 stomp 消息代理中继配置,以便我们应用程序的其他模块可以将消息推送到 RabbitMQ 中的主题,然后转身通知 API 模块更新所有打开的 websocket。

下面是我们应用程序中相关层的示例图,以及我想要完成的任务。我需要允许模块“Cron Message Sender”通过我们的其他 API 模块将消息推送给订阅消息主题的每个人。

【问题讨论】:

我现在正在探索的另一个可能的想法是将两者分解为完全独立的组件。我的意思是每个 API 模块都将使用一个简单的代理(无中继)运行它自己的独立 websocket 配置。然后,在服务层,我将通过 AMQP 直接与 rabbitmq 对话,将每个服务实例连接在一起。然后,每个 API 模块将侦听传入的 AMQP 消息并通过 Stomp WS 将它们转发给各自的用户。我对消息传递仍然很陌生,不确定这在生产环境中的性能方面是好是坏。 假设这种方法有效,如果我决定放弃 RabbitMQ 转而使用其他消息代理(如 AWS SQS),这将允许我解耦组件,从而提供更大的灵活性。 【参考方案1】:

所以第二种方法确实有效。我将 websocket 配置为独立运行(无中继),然后在服务层建立了一个单独的 AMQP 消息代理连接以允许服务之间的通信。在 API 模块中,我只是监听 AMQP 消息代理,然后手动将这些消息转发到通知 websocket 订阅者的 SimpMessagingTemplate。我不确定这在技术上是否是“正确”的方法,但它似乎工作得很好,我还没有看到实施有任何问题。事实上,我实际上认为我可能更喜欢这种方法,因为我现在只是让我的所有服务能够使用比我最初需要的 websockets 更多类型的消息相互通信。

这是新的配置:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer

    @Override
    public void configureMessageBroker(final MessageBrokerRegistry config)
    
        config.enableSimpleBroker("/topic");
    

    @Override
    public void registerStompEndpoints(final StompEndpointRegistry registry)
    
        registry.addEndpoint("/updates")
            .setAllowedOrigins("*")
            .withSockJS();
    

这里是我监听消息代理并将消息转发给 websocket 订阅者的地方:

@Component
public class SendWebSocketUpdates

    private static final Logger logger = LoggerFactory.getLogger(SendWebSocketUpdates.class);

    private final Gson gson;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    @Autowired
    private MessageBrokerConsumer<String> messageBrokerConsumer;

    public SendWebSocketUpdates()
    
        this.gson = new Gson();
    

    @PostConstruct
    public void init()
    
        //listen for incoming AMQP messages from the rabbitmq server and forward them to the websocket subscribers
        messageBrokerConsumer.addListener((message, topicName) -> 
            final String destination = "/topic/" + topicName;
            final String messageJson = gson.toJson(message.getBody());

            //check to see if trace logging is enabled
            if (logger.isTraceEnabled())
            
                logger.trace("Sending Message to \"\": ", destination, messageJson);
            

            //broadcast the via a STOMP message to subscribers of this topic
            messagingTemplate.convertAndSend(destination, messageJson);
        );
    

【讨论】:

您实际上可以使用 AMQP 协议从连接到相同(作为中继)RabbitMQ 实例的任何应用程序层向您的 websocket 会话发送消息(并且仍然在边缘层使用 StompBrokerRelay)。只需在/topic 之后使用等于目标子部分的路由密钥发送到amq.topic 交换。例如,如果您的 websocket 客户端订阅了/topic/chat,则使用chat 路由键将 AMQP 消息发送到amq.topic 交换,它将由 RabbitMQ 传递给所有订阅此 STOMP 主题的StompBrokerRelays,并且然后到 websockets。 谢谢!实际上,我们最终完全放弃了 RabbitMQ 和 STOMP,转而使用 MQTT 和 VerneMQ。 我能问你为什么选择放弃 RabbitMQ 和 STOMP 而选择使用 MQTT 和 VerneMQ 吗?吞吐量?使用方便吗?以及如何在 Browser 和 Broker 之间传递 MQTT 消息?你还在使用 Spring Webscoket 支持(实现自定义 SubProtocolHandler)还是从浏览器直接连接到 VerneMQ? 我们的服务主要针对移动设备。当我们第一次探索消息主题时,我们从这种方法开始,希望我们可以使用集群的 Tomcat 环境来分担所有活动连接的负载。在对我们的产品进行了一些研究和开发之后,我们发现 MQTT 比 STOMP 更适合移动设备。此外,由于我们现在必须直接连接到消息代理本身,因此我们需要一个高度可用且易于扩展的 MQTT 代理。 VerneMQ 最终满足了我们的需求,因为它的性能和水平扩展/集群能力。 我们不再使用 Spring Websockets。所有消息代理功能都是通过直接连接到 VerneMQ。【参考方案2】:

解决这个问题很容易。我浪费了一整天的时间来寻找解决方案。 Here 's my answer for the same problem.

关键是setUserDestinationBroadcastsetUserRegistryBroadcast

registry.enableStompBrokerRelay("/topic/", "/queue/", "/exchange/") 
        .setUserDestinationBroadcast("/topic/log-unresolved-user") 
        .setUserRegistryBroadcast("/topic/log-user-registry")

【讨论】:

以上是关于从应用程序的其他层发送 STOMP 消息的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 STOMP 从 Spring WebSocket 服务器向 WebSocket 客户端发送消息?

Spring stomp - 使用 SimpMessagingTemplate 从服务器发送消息

第18章-使用WebSocket和STOMP实现消息功能

如何使用 Spring WebSocket 向 STOMP 客户端发送错误消息?

无法使用 STOMP 向 activemq 发送消息

使用Spring STOMP websockets向单个人发送消息