弹簧+WebSocket+STOMP。给特定会话的消息(非用户)

Posted

技术标签:

【中文标题】弹簧+WebSocket+STOMP。给特定会话的消息(非用户)【英文标题】:Spring+WebSocket+STOMP. Message to specific session (NOT user) 【发布时间】:2017-02-19 13:28:19 【问题描述】:

我正在尝试在 Spring 框架上设置基本消息代理,使用我找到的配方 here

作者声称它运行良好,但我无法在客户端接收消息,尽管没有发现明显的错误。

目标:

我想要做的基本上是一样的——客户端连接到服务器并请求一些异步操作。操作完成后,客户端应该会收到一个事件。重要提示:客户端未通过 Spring 身份验证,但来自消息代理的异步后端部分的事件包含他的登录名,因此我认为存储 Login-SessionId 对的并发映射以将消息直接发送到特定会话就足够了.

客户端代码:

//app.js

var stompClient = null;
var subscription = '/user/queue/response';

//invoked after I hit "connect" button
function connect() 
//reading from input text form
var agentId = $("#agentId").val();

var socket = new SockJS('localhost:5555/cti');
stompClient = Stomp.over(socket);
stompClient.connect('Login':agentId, function (frame) 
    setConnected(true);
    console.log('Connected to subscription');
    stompClient.subscribe(subscription, function (response) 
        console.log(response);
    );
);



//invoked after I hit "send" button
function send() 

var cmd_str = $("#cmd").val();
var cmd = 
    'command':cmd_str
;
console.log("sending message...");
stompClient.send("/app/request", , JSON.stringify(cmd));
console.log("message sent");

这是我的配置。

//message broker configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer


@Override
public void configureMessageBroker(MessageBrokerRegistry config) 
    /** queue prefix for SUBSCRIPTION (FROM server to CLIENT)  */
    config.enableSimpleBroker("/topic");
    /** queue prefix for SENDING messages (FROM client TO server) */
    config.setApplicationDestinationPrefixes("/app");



@Override
public void registerStompEndpoints(StompEndpointRegistry registry) 

    registry
            .addEndpoint("/cti")
            .setAllowedOrigins("*")
            .withSockJS();



现在,在基本配置之后,我应该实现一个应用程序事件处理程序,以在客户端连接上提供与会话相关的信息。

//application listener

@Service
public class STOMPConnectEventListener implements ApplicationListener<SessionConnectEvent> 

@Autowired
//this is basically a concurrent map for storing pairs "sessionId - login"
WebAgentSessionRegistry webAgentSessionRegistry;

@Override
public void onApplicationEvent(SessionConnectEvent event) 
    StompHeaderAccessor sha = StompHeaderAccessor.wrap(event.getMessage());

    String agentId = sha.getNativeHeader("Login").get(0);
    String sessionId = sha.getSessionId();

    /** add new session to registry */
    webAgentSessionRegistry.addSession(agentId,sessionId);

    //debug: show connected to stdout
    webAgentSessionRegistry.show();



到目前为止一切顺利。在 IDE 中运行我的 spring webapp 并从两个浏览器选项卡连接我的“客户端”后,我在 IDE 控制台中得到了这个:

session_id / agent_id
-----------------------------
|kecpp1vt|user1|
|10g5e10n|user2|
-----------------------------

好的,现在让我们尝试实现消息机制。

//STOMPController


@Controller
public class STOMPController 

@Autowired
//our registry we have already set up earlier
WebAgentSessionRegistry webAgentSessionRegistry;
@Autowired
//a helper service which I will post below
MessageSender sender;

@MessageMapping("/request")
public void handleRequestMessage() throws InterruptedException 

    Map<String,String> params = new HashMap(1);
    params.put("test","test");
    //a custom object for event, not really relevant
    EventMessage msg = new EventMessage("TEST",params);

    //send to user2 (just for the sake of it)
    String s_id = webAgentSessionRegistry.getSessionId("user2");
    System.out.println("Sending message to user2. Target session: "+s_id);
    sender.sendEventToClient(msg,s_id);
    System.out.println("Message sent");



从应用程序的任何部分发送消息的服务:

//MessageSender

@Service
public class MessageSender implements IMessageSender

@Autowired
WebAgentSessionRegistry webAgentSessionRegistry;
@Autowired
SimpMessageSendingOperations messageTemplate;

private String qName = "/queue/response";

private MessageHeaders createHeaders(String sessionId) 
    SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
    headerAccessor.setSessionId(sessionId);
    headerAccessor.setLeaveMutable(true);
    return headerAccessor.getMessageHeaders();


@Override
public void sendEventToClient(EventMessage event,String sessionId) 
    messageTemplate.convertAndSendToUser(sessionId,qName,event,createHeaders(sessionId));


现在,让我们尝试测试一下。我运行我的 IDE,打开 Chrome 并创建了 2 个连接到服务器的选项卡表单。用户 1 和用户 2。结果控制台:

 session_id / agent_id
    -----------------------------
    |kecpp1vt|user1|
    |10g5e10n|user2|
    -----------------------------
Sending message to user2. Target session: 10g5e10n
Message sent

但是,正如我在开头提到的 - user2 完全没有得到任何东西,尽管他已连接并订阅了“/user/queue/response”。也没有错误。

一个问题是,我到底在哪里漏掉了重点?我已经阅读了许多有关该主题的文章,但无济于事。 SPR-11309 说这是可能的并且应该有效。也许,id-s 不是实际的会话 id-s? 好吧,也许有人知道如何监控消息是否真的已经发送,而不是被内部 Spring 机制丢弃?

解决方案更新:

配置错误的位:

//WebSocketConfig.java:
....
 @Override
public void configureMessageBroker(MessageBrokerRegistry config) 
    /** queue prefix for SUBSCRIPTION (FROM server to CLIENT)  */
    // + parameter "/queue"
    config.enableSimpleBroker("/topic","/queue");
    /** queue prefix for SENDING messages (FROM client TO server) */
    config.setApplicationDestinationPrefixes("/app");

....

我花了一天时间调试内部弹簧机制,以找出问题到底出在哪里:

//AbstractBrokerMessageHandler.java: 
....
protected boolean checkDestinationPrefix(String destination) 
    if ((destination == null) || CollectionUtils.isEmpty(this.destinationPrefixes)) 
        return true;
    
    for (String prefix : this.destinationPrefixes) 
        if (destination.startsWith(prefix)) 
//guess what? this.destinationPrefixes contains only "/topic". Surprise, surprise
            return true;
        
    
    return false;

....

尽管我不得不承认,我仍然认为文档中提到用户个人队列不会被显式配置,因为它们“已经存在”。也许是我弄错了。

【问题讨论】:

【参考方案1】:

总体上看起来不错,但你能改变一下

config.enableSimpleBroker("/topic");

config.enableSimpleBroker("/queue");

...看看这是否有效?希望对您有所帮助。

【讨论】:

我花了一天时间调试内部弹簧机制,以找出它到底哪里出了问题 我已经发布了一个新问题***.com/questions/43536507/…你能帮我解决这个问题吗?

以上是关于弹簧+WebSocket+STOMP。给特定会话的消息(非用户)的主要内容,如果未能解决你的问题,请参考以下文章

在 Stomp Disconnect 命令上拒绝 Spring WebSocket 访问

如何通过spring websocket STOMP向特定订阅发送消息?

如何使用 SockJs 通过 STOMP Java 客户端验证 Spring 非 Web Websocket?

Spring STOMP over Websocket - “私人”消息传递

在spring中获取websocket会话的关联HTTPSession

Springboot 整合 WebSocket ,使用STOMP协议+Redis 解决负载场景问题