如果资源服务器应该是无状态的,如何使用 websocket 将消息发送到队列

Posted

技术标签:

【中文标题】如果资源服务器应该是无状态的,如何使用 websocket 将消息发送到队列【英文标题】:If resource server are supposed to be stateless, how to send message to queue with websocket 【发布时间】:2018-02-20 07:53:27 【问题描述】:

我目前在消息传递系统中工作,其中资源服务器使用 oAuth 2 是无状态的。现在,我必须使用队列向单个用户发送消息,但问题是 spring 消息传递需要另一个会话来发送消息在https://***.com/a/31577152/3076403 中描述。

我的问题是如何在无状态的restful服务中获取当前登录用户:

@MessageMapping("/messaging")
public void messaging( Message<Object> message) 
    Principal user=    
        message.getHeaders()
             .get(SimpMessageHeaderAccessor.USER_HEADER,Principal.class); 

        messageTemplate.convertAndSend("/topic/users", user.getName());

当我们使用 simpMessagingTemplate.convertAndSendToUser(...) 方法并传递与会话 ID 关联的用户名时,Spring 将使用队列。否则它将使用一个主题,所有订阅的客户端最终将读取从服务器返回的相同消息。

因为我在资源服务器中没有会话,需要排队向单个用户发送消息。任何 cmets 和想法表示赞赏

【问题讨论】:

【参考方案1】:

毕竟我得到了解决方案。通过解码用户名的 json web token 并为用户名提供身份验证解决了上述问题。 JwtAuthentication 是自定义类,负责解码 JWT 并为 JWT 的用户名提供身份验证

@Configuration
@EnableWebSocketMessageBroker
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer 

    @Autowired
    private SimpUserRegistry userRegistry;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) 
        config.enableSimpleBroker("/topic","/queue");
        // use the /app prefix for others
        config.setApplicationDestinationPrefixes("/app");
    

    @Autowired
    private JwtAuthentication jwtAuthentication;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) 
        // use the /messaging endpoint (prefixed with /app as configured above) for incoming requests
        registry.addEndpoint("/messaging").setAllowedOrigins("http://localhost:8080").withSockJS();
    

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) 
        registration.setInterceptors(new ChannelInterceptorAdapter() 
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) 
                StompHeaderAccessor accessor =
                        MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                List<String> tokenList = accessor.getNativeHeader("Authorization");
                String token = null;
                if(tokenList != null && tokenList.size() > 0) 
                  token = tokenList.get(0).replaceAll("Bearer", "").trim();
                
                if (StompCommand.CONNECT.equals(accessor.getCommand()) || StompCommand.SUBSCRIBE.equals(accessor.getCommand()) || StompCommand.SEND.equals(accessor.getCommand()) ) 
                    Authentication auth =  SecurityContextHolder.getContext().getAuthentication();
                    if(auth==null)
                         Authentication user = jwtAuthentication.getAuthentication(token); // access authentication header(s)
                         SecurityContextHolder.getContext().setAuthentication(user);
                          ((DefaultSimpUserRegistry) userRegistry).onApplicationEvent(new SessionConnectedEvent(this, (Message<byte[]>) message, auth));
                         accessor.setUser(user);
                     else 
                        accessor.setUser(auth);
                         ((DefaultSimpUserRegistry) userRegistry).onApplicationEvent(new SessionConnectedEvent(this, (Message<byte[]>) message, auth));
                    
                
                accessor.setLeaveMutable(true);
                return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
            
        );
    

在应用程序上下文中,我们需要注册 SimpUserRegistry @豆 @基本的 公共 SimpUserRegistry userRegistry() 返回新的 DefaultSimpUserRegistry();

      @Bean
      @Primary
      public UserDestinationResolver userDestinationResolver() 
        return new DefaultUserDestinationResolver(userRegistry());
      

现在我们可以向特定用户发送消息

public void handle(Exchange exchange) 
        Message camelMessage = exchange.getIn();
        com.livetalk.user.utils.Message message = camelMessage.getBody( com.livetalk.user.utils.Message.class);
        // send the message specifically to the destination user by using STOMP's user-directed messaging
        msgTemplate.convertAndSendToUser(message.getRecipient(), "/queue/messages", message, defaultHeaders);
    

【讨论】:

以上是关于如果资源服务器应该是无状态的,如何使用 websocket 将消息发送到队列的主要内容,如果未能解决你的问题,请参考以下文章

如果 oauth 是无状态的,谷歌如何销毁 oauth 令牌?

WebSocket简介

cookie是啥?

如果资源不可用于请求的操作,HTTP 状态代码应该是啥?

如何区分应用程序状态和资源状态

所有视图都应该是无状态组件吗?