如何使用 WebSocket 发送定期消息?

Posted

技术标签:

【中文标题】如何使用 WebSocket 发送定期消息?【英文标题】:How to send periodic messages using WebSocket? 【发布时间】:2015-03-17 16:09:55 【问题描述】:

我在Tomcat上使用WebSocket(实际实现是Tyrus,JSR 356的参考实现)。当我必须处理客户端消息并响应它们时,它的效果很好。但是,我想为我的几个客户端控件实现 push 解决方案。其实我需要两种解决方案:

以特定间隔推出数据, 在系统消息出现时将其推送出去。

对于第一个,我认为ScheduledExecutorService 可以是一个解决方案,我已经有一个或多或少的工作示例,但我在清理方面遇到了问题。对于第二个,我想我需要一个线程,它会触发 WebSocket 端点中的一个方法,但我也不知道如何干净地做到这一点。干净,我的意思是我希望只有在有连接到我的端点的会话时才能运行线程。

总结一下我的问题:您将如何正确使用 Java EE WebSocket API 实现推送消息解决方案?

ps.:我更喜欢“纯粹”的解决方案,但 Spring 也不受欢迎。


当前代码骨架

这是我当前解决第一个问题的方法:

@ServerEndpoint(...)
public class MyEndPoint 
    // own class, abstracting away session handling
    private static SessionHandler sessionHandler = new SessionHandler();
    private static ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private static boolean timerStarted = false;

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) 
        sessionHandler.addSession(session);
        if (!timerStarted) 
            timer.scheduleAtFixedRate(new Runnable() 
                @Override
                public void run() 
                    sessionHandler.sendToAllSession("foo");
                
            , 0, 3, TimeUnit.SECONDS);
            timerStarted = true;
        
    

    @OnClose
    public void onClose(Session session) 
        sessionHandler.removeSession(session);
        if (0 == sessionHandler.countSessions()) 
            // TODO: cleanup thread properly
            timer.shutdown();
            try 
                while (!timer.awaitTermination(10, TimeUnit.SECONDS));
             catch (InterruptedException e) 
                log.debug("Timer terminated.");
            
            timerStarted = false;
        
    

这或多或少有效,但在重新加载几页后,它会因RejectedExecutionException 而死,我不太确定如何处理这种情况。

【问题讨论】:

【参考方案1】:

很遗憾你在shutdown()之后不能使用任何ExecutorService;

所以在 OnClose() 方法之后下一个 OnOpen() 方法会崩溃。

只是一些演示代码:

public class TestThread 

    public static void main(String[] args) 

        final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        boolean timerStarted = false;
        //OnOpen - 1;  - OK
        if (!timerStarted) 
            timer.scheduleAtFixedRate(new Runnable() 
                @Override
                public void run() 
                    System.out.println("foo");
                
            , 0, 3, TimeUnit.SECONDS);
            timerStarted = true;
        

        //OnOpen - 2;  - OK
        if (!timerStarted) 
            timer.scheduleAtFixedRate(new Runnable() 
                @Override
                public void run() 
                    System.out.println("foo");
                
            , 0, 3, TimeUnit.SECONDS);
            timerStarted = true;
        

        //OnClose - 1  - OK
        timer.shutdown();
        timerStarted = false;

        //OnOpen - 2;  - NOT OK, because after stop you can't use timer,  RejectedExecutionException will thrown
        if (!timerStarted) 
            // will crash at this invocke
            timer.scheduleAtFixedRate(new Runnable() 
                @Override
                public void run() 
                    System.out.println("foo");
                
            , 0, 3, TimeUnit.SECONDS);
            timerStarted = true;
        
    

您也可以尝试将您的课程用作网络监听器http://docs.oracle.com/javaee/7/api/javax/servlet/annotation/WebListener.html 并在服务器启动和销毁时执行的方法中创建计时器

@WebListener
@ServerEndpoint(...)  
public class MyEndPoint implements ServletContextListener

    final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) 
        timer.scheduleWithFixedDelay(...)
    

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) 
        timer.shutdown();
    

    ...

【讨论】:

我喜欢这个主意。不过,我很可能会采用不同的方法。 Endpoint 将只管理会话,并且不同的定期服务将调用会话的发送方法。

以上是关于如何使用 WebSocket 发送定期消息?的主要内容,如果未能解决你的问题,请参考以下文章

如何限制龙卷风 websocket 消息大小

如何使用 STOMP 从 Spring WebSocket 服务器向 WebSocket 客户端发送消息?

如何使用 websocket 在具有相同令牌的用户之间发送消息?

如何仅将 websocket 消息发送给一个用户?

如何从 websocket 端点外部发出 websocket 消息?

如何使用java服务器将消息发送到特定的websocket连接