使用持久队列 STOMP 向离线用户发送通知

Posted

技术标签:

【中文标题】使用持久队列 STOMP 向离线用户发送通知【英文标题】:Send notification to offline user with persistent queue STOMP 【发布时间】:2017-01-25 13:41:52 【问题描述】:

我有这个代码:使用 javascript 的客户端:

   socket = new SockJS(context.backend + '/myWebSocketEndPoint');
   stompClient = Stomp.over(socket);
   stompClient.connect(,function (frame) 
           stompClient.subscribe('/queue/'+clientId+'/notification', function(response)
               alert(angular.fromJson(response.body));
           );
   );

在此代码中,客户端在连接时,使用'/queue/'+他的客户端ID +'/notification/订阅接收通知。所以我为每个客户排队。我将 stomp 与 sockjs 一起使用

在我的服务器(Java + spring boot)中,我有一个通知侦听器,当事件发布时,它会向所有客户端发送通知。所以我有:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer

 @Override
    public void configureMessageBroker(MessageBrokerRegistry config) 
    config.enableSimpleBroker("/queue");



@Override
public void registerStompEndpoints(StompEndpointRegistry registry) 
    registry.addEndpoint("/myWebSocketEndPoint")
            .setAllowedOrigins("*")
            .withSockJS();


调用 MenuItemNotificationSender 向用户发送通知的类 MenuItemNotificationChannel。

@Component
public class MenuItemNotificationChannel extends AbstractNotificationChannel 

@Autowired
private MenuItemNotificationSender menuItemNotificationSender;

@Autowired
private UserRepository userRepository;

@Override
public void sendNotification(KitaiEvent<?> event, Map<String, Object> notificationConfiguration) throws Exception 
    String menuItem = Optional.ofNullable((String) notificationConfiguration.get(MENU_ENTRY_KEY)).orElseThrow(IllegalArgumentException::new);
    List<User> userList = userRepository.findAll();
    for(User u: userList)
        menuItemNotificationSender.sendNotification(new MenuItemDto(menuItem),u.getId());
    

MenuItemNotificationSender 类是:

@Component
public class MenuItemNotificationSender 

@Autowired
private SimpMessagingTemplate messagingTemplate;

@Autowired
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate)
    this.messagingTemplate = messagingTemplate;


public void sendNotification(MenuItemDto menuItem,Long id) 
    String address = "/queue/"+id+"/notification";
    messagingTemplate.convertAndSend(address, menuItem);


此代码完美运行:通知会发送给每个用户。但是如果用户不在线,通知就会丢失。我的问题是:

我如何验证哪些订阅有效,哪些订阅无效? (如果我可以验证订阅是否处于活动状态,我会解决我的问题,因为我会离线保存用户通知,然后在他们登录时发送)

我可以使用持久队列吗? (我读过一些关于它的东西,但我不明白我是否只能将它与 stomp 和 sockjs 一起使用)

对不起我的英语! :D

【问题讨论】:

【参考方案1】:

你可以在会话连接事件和会话断开事件上放置一个spring事件监听器 我用 spring 4.3.4 测试了这个

@Component
public class WebSocketSessionListener

    private static final Logger logger = LoggerFactory.getLogger(WebSocketSessionListener.class.getName());
    private List<String> connectedClientId = new ArrayList<String>();

    @EventListener
    public void connectionEstablished(SessionConnectedEvent sce)
    
        MessageHeaders msgHeaders = sce.getMessage().getHeaders();
        Principal princ = (Principal) msgHeaders.get("simpUser");
        StompHeaderAccessor sha = StompHeaderAccessor.wrap(sce.getMessage());
        List<String> nativeHeaders = sha.getNativeHeader("userId");
        if( nativeHeaders != null )
        
            String userId = nativeHeaders.get(0);
            connectedClientId.add(userId);
            if( logger.isDebugEnabled() )
            
                logger.debug("Connessione websocket stabilita. ID Utente "+userId);
            
        
        else
        
            String userId = princ.getName();
            connectedClientId.add(userId);
            if( logger.isDebugEnabled() )
            
                logger.debug("Connessione websocket stabilita. ID Utente "+userId);
            
        
    

    @EventListener
    public void webSockectDisconnect(SessionDisconnectEvent sde)
    
        MessageHeaders msgHeaders = sde.getMessage().getHeaders();
        Principal princ = (Principal) msgHeaders.get("simpUser");
        StompHeaderAccessor sha = StompHeaderAccessor.wrap(sde.getMessage());
        List<String> nativeHeaders = sha.getNativeHeader("userId");
        if( nativeHeaders != null )
        
            String userId = nativeHeaders.get(0);
            connectedClientId.remove(userId);
            if( logger.isDebugEnabled() )
            
                logger.debug("Disconnessione websocket. ID Utente "+userId);
            
        
        else
        
            String userId = princ.getName();
            connectedClientId.remove(userId);
            if( logger.isDebugEnabled() )
            
                logger.debug("Disconnessione websocket. ID Utente "+userId);
            
        
    

    public List<String> getConnectedClientId()
    
        return connectedClientId;
    
    public void setConnectedClientId(List<String> connectedClientId)
    
        this.connectedClientId = connectedClientId;
    

当客户端连接时,您在客户端 ID 列表中添加客户端 ID;当它断开连接时,您将其删除

然后你可以在你想要检查客户端是否处于活动状态或更少的地方注入这个bean或其列表,然后你可以检查客户端ID是否在连接的客户端ID之间你可以发送消息,否则你必须保存它稍后再发送

在客户端,您可以执行以下操作:

var socket = new SockJS('/ws');
stompClient = Stomp.over(socket);
stompClient.connect(userId:"customUserId", function (frame) 
);

安杰洛

【讨论】:

Grazie, avevo pensato anche io qualcosa di simile :) di nulla.. è da perfezionare.. ma puoi quietlamente partire da questo【参考方案2】:

为什么不使用下面的一些事件,您可以将类导出到不同的文件并使用 SessionConnectedEventSessionDisconnectEventSessionSubscribeEventSessionUnsubscribeEvent。 在此处查看文档http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;

@Component
public class SessionConnectedListener extends SessionsListener implements ApplicationListener<SessionConnectedEvent> 

    @Override
    public void onApplicationEvent(SessionConnectedEvent event) 
        users.add(event.getUser().getName());
    



@Component
class SessionDisconnectListener extends SessionsListener implements ApplicationListener<SessionDisconnectEvent> 

    @Override
    public void onApplicationEvent(SessionDisconnectEvent event) 
        users.remove(event.getUser().getName());
    


@Component
class SessionSubscribeListener extends SessionsListener implements ApplicationListener<SessionSubscribeEvent> 

    @Override
    public void onApplicationEvent(SessionSubscribeEvent event) 
        users.add(event.getUser().getName());
    


@Component
class SessionUnsubscribeListener extends SessionsListener implements ApplicationListener<SessionUnsubscribeEvent> 

    @Override
    public void onApplicationEvent(SessionUnsubscribeEvent event) 
        users.remove(event.getUser().getName());
    


class SessionsListener 

    protected List<String> users = Collections.synchronizedList(new LinkedList<String>());

    public List<String> getUsers() 
        return users;
    

并更改您的代码:

@Autowired
private SessionsListener sessionsListener;

@Override
public void sendNotification(KitaiEvent<?> event, Map<String, Object> notificationConfiguration) throws Exception 
    String menuItem = Optional.ofNullable((String) notificationConfiguration.get(MENU_ENTRY_KEY)).orElseThrow(IllegalArgumentException::new);
    List<String> userList = sessionsListener.getUsers();
    for(String u: userList)
        menuItemNotificationSender.sendNotification(new MenuItemDto(menuItem),u);
    

【讨论】:

谢谢,我正在考虑类似的事情

以上是关于使用持久队列 STOMP 向离线用户发送通知的主要内容,如果未能解决你的问题,请参考以下文章

向离线用户键入消息会显示错误

QuickBlox 向离线用户推送通知问题

如何在 xmpp smack 或 asmack 中将文件发送给离线用户?

通过 spring websocket、sockJs 和 STOMP 向经过身份验证的用户发送通知

如果通知在脱机时发送,Android是否可以检索FCM通知

在设备离线时响应本机应用程序发送推送通知