使用持久队列 STOMP 向离线用户发送通知
Posted
技术标签:
【中文标题】使用持久队列 STOMP 向离线用户发送通知【英文标题】:Send notification to offline user with persistent queue STOMP 【发布时间】:2017-01-25 13:41:52 【问题描述】:我有这个代码:使用 javascript 的客户端:
socket = new SockJS(context.backend + '/myWebSocketEndPoint');
stompClient = Stomp.over(socket);
stompClient.connect(,function (frame)
stompClient.subscribe('/queue/'+clientId+'/notification', function(response)
alert(angular.fromJson(response.body));
);
);
在此代码中,客户端在连接时,使用'/queue/'+他的客户端ID +'/notification/订阅接收通知。所以我为每个客户排队。我将 stomp 与 sockjs 一起使用
在我的服务器(Java + spring boot)中,我有一个通知侦听器,当事件发布时,它会向所有客户端发送通知。所以我有:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer
@Override
public void configureMessageBroker(MessageBrokerRegistry config)
config.enableSimpleBroker("/queue");
@Override
public void registerStompEndpoints(StompEndpointRegistry registry)
registry.addEndpoint("/myWebSocketEndPoint")
.setAllowedOrigins("*")
.withSockJS();
调用 MenuItemNotificationSender 向用户发送通知的类 MenuItemNotificationChannel。
@Component
public class MenuItemNotificationChannel extends AbstractNotificationChannel
@Autowired
private MenuItemNotificationSender menuItemNotificationSender;
@Autowired
private UserRepository userRepository;
@Override
public void sendNotification(KitaiEvent<?> event, Map<String, Object> notificationConfiguration) throws Exception
String menuItem = Optional.ofNullable((String) notificationConfiguration.get(MENU_ENTRY_KEY)).orElseThrow(IllegalArgumentException::new);
List<User> userList = userRepository.findAll();
for(User u: userList)
menuItemNotificationSender.sendNotification(new MenuItemDto(menuItem),u.getId());
MenuItemNotificationSender 类是:
@Component
public class MenuItemNotificationSender
@Autowired
private SimpMessagingTemplate messagingTemplate;
@Autowired
public MenuItemNotificationSender(SimpMessagingTemplate messagingTemplate)
this.messagingTemplate = messagingTemplate;
public void sendNotification(MenuItemDto menuItem,Long id)
String address = "/queue/"+id+"/notification";
messagingTemplate.convertAndSend(address, menuItem);
此代码完美运行:通知会发送给每个用户。但是如果用户不在线,通知就会丢失。我的问题是:
我如何验证哪些订阅有效,哪些订阅无效? (如果我可以验证订阅是否处于活动状态,我会解决我的问题,因为我会离线保存用户通知,然后在他们登录时发送)
我可以使用持久队列吗? (我读过一些关于它的东西,但我不明白我是否只能将它与 stomp 和 sockjs 一起使用)
对不起我的英语! :D
【问题讨论】:
【参考方案1】:你可以在会话连接事件和会话断开事件上放置一个spring事件监听器 我用 spring 4.3.4 测试了这个
@Component
public class WebSocketSessionListener
private static final Logger logger = LoggerFactory.getLogger(WebSocketSessionListener.class.getName());
private List<String> connectedClientId = new ArrayList<String>();
@EventListener
public void connectionEstablished(SessionConnectedEvent sce)
MessageHeaders msgHeaders = sce.getMessage().getHeaders();
Principal princ = (Principal) msgHeaders.get("simpUser");
StompHeaderAccessor sha = StompHeaderAccessor.wrap(sce.getMessage());
List<String> nativeHeaders = sha.getNativeHeader("userId");
if( nativeHeaders != null )
String userId = nativeHeaders.get(0);
connectedClientId.add(userId);
if( logger.isDebugEnabled() )
logger.debug("Connessione websocket stabilita. ID Utente "+userId);
else
String userId = princ.getName();
connectedClientId.add(userId);
if( logger.isDebugEnabled() )
logger.debug("Connessione websocket stabilita. ID Utente "+userId);
@EventListener
public void webSockectDisconnect(SessionDisconnectEvent sde)
MessageHeaders msgHeaders = sde.getMessage().getHeaders();
Principal princ = (Principal) msgHeaders.get("simpUser");
StompHeaderAccessor sha = StompHeaderAccessor.wrap(sde.getMessage());
List<String> nativeHeaders = sha.getNativeHeader("userId");
if( nativeHeaders != null )
String userId = nativeHeaders.get(0);
connectedClientId.remove(userId);
if( logger.isDebugEnabled() )
logger.debug("Disconnessione websocket. ID Utente "+userId);
else
String userId = princ.getName();
connectedClientId.remove(userId);
if( logger.isDebugEnabled() )
logger.debug("Disconnessione websocket. ID Utente "+userId);
public List<String> getConnectedClientId()
return connectedClientId;
public void setConnectedClientId(List<String> connectedClientId)
this.connectedClientId = connectedClientId;
当客户端连接时,您在客户端 ID 列表中添加客户端 ID;当它断开连接时,您将其删除
然后你可以在你想要检查客户端是否处于活动状态或更少的地方注入这个bean或其列表,然后你可以检查客户端ID是否在连接的客户端ID之间你可以发送消息,否则你必须保存它稍后再发送
在客户端,您可以执行以下操作:
var socket = new SockJS('/ws');
stompClient = Stomp.over(socket);
stompClient.connect(userId:"customUserId", function (frame)
);
安杰洛
【讨论】:
Grazie, avevo pensato anche io qualcosa di simile :) di nulla.. è da perfezionare.. ma puoi quietlamente partire da questo【参考方案2】:为什么不使用下面的一些事件,您可以将类导出到不同的文件并使用 SessionConnectedEvent
和 SessionDisconnectEvent
或 SessionSubscribeEvent
和 SessionUnsubscribeEvent
。
在此处查看文档http://docs.spring.io/spring/docs/current/spring-framework-reference/html/websocket.html#websocket-stomp-appplication-context-events
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.web.socket.messaging.SessionUnsubscribeEvent;
@Component
public class SessionConnectedListener extends SessionsListener implements ApplicationListener<SessionConnectedEvent>
@Override
public void onApplicationEvent(SessionConnectedEvent event)
users.add(event.getUser().getName());
@Component
class SessionDisconnectListener extends SessionsListener implements ApplicationListener<SessionDisconnectEvent>
@Override
public void onApplicationEvent(SessionDisconnectEvent event)
users.remove(event.getUser().getName());
@Component
class SessionSubscribeListener extends SessionsListener implements ApplicationListener<SessionSubscribeEvent>
@Override
public void onApplicationEvent(SessionSubscribeEvent event)
users.add(event.getUser().getName());
@Component
class SessionUnsubscribeListener extends SessionsListener implements ApplicationListener<SessionUnsubscribeEvent>
@Override
public void onApplicationEvent(SessionUnsubscribeEvent event)
users.remove(event.getUser().getName());
class SessionsListener
protected List<String> users = Collections.synchronizedList(new LinkedList<String>());
public List<String> getUsers()
return users;
并更改您的代码:
@Autowired
private SessionsListener sessionsListener;
@Override
public void sendNotification(KitaiEvent<?> event, Map<String, Object> notificationConfiguration) throws Exception
String menuItem = Optional.ofNullable((String) notificationConfiguration.get(MENU_ENTRY_KEY)).orElseThrow(IllegalArgumentException::new);
List<String> userList = sessionsListener.getUsers();
for(String u: userList)
menuItemNotificationSender.sendNotification(new MenuItemDto(menuItem),u);
【讨论】:
谢谢,我正在考虑类似的事情以上是关于使用持久队列 STOMP 向离线用户发送通知的主要内容,如果未能解决你的问题,请参考以下文章
如何在 xmpp smack 或 asmack 中将文件发送给离线用户?