Atmosphere Jersey 资源调用永远不会触发 WebSocketTextListener 的 OnMessage 方法

Posted

技术标签:

【中文标题】Atmosphere Jersey 资源调用永远不会触发 WebSocketTextListener 的 OnMessage 方法【英文标题】:Atmosphere Jersey resource call never fires OnMessage method of WebSocketTextListener 【发布时间】:2012-07-19 16:57:43 【问题描述】:

我编写了两个 junit 方法来使用 Atmosphere 和 webSockets 测试我的 Jersey 资源。

问题是当我调用 Suspend 和 Broacast 时,只调用了我的 WebSocketTextListener 的 onOpen 方法。 OnError、OnMessage、OnClose 都没有被调用:(

知道为什么不调用 OnMessage 方法吗?

Atmosphere Jersey 资源:

@Path("/websocket")
    @Suspend
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public String suspend() 
        return "";
       

    @Path("/websocket")
    @Broadcast(writeEntity = false)
    @POST
    @Produces(MediaType.APPLICATION_JSON)
    public String broadcast(String message) 
        return "BROADCASTTT";
      

测试暂停 WEBSOCKET 调用:

 @Test
    public void testAddMealSubscriber() throws Exception 

        final CountDownLatch latch = new CountDownLatch(1);
        String restaurantId = "SalernoNapoliBarcelona";
        String mealId = "14b74bddc68d6f1b4c22e7f7b200067f";

        String url = "ws://localhost:8080/rest/" + "restaurants/" + restaurantId + "/meals/" + mealId + "/websocket/";

        AsyncHttpClient client = new AsyncHttpClient();

        try 
            final AtomicReference response = new AtomicReference(null);
            WebSocket websocket = client.prepareGet(url)
                    .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
                            new WebSocketTextListener() 

                                @Override
                                public void onMessage(String message) 
                                    System.out.println("WebSocketTextListener onMessage:" + message);
                                    response.set(message);
                                    latch.countDown();
                                

                                @Override
                                public void onFragment(String fragment, boolean last) 

                                    System.out.println("WebSocketTextListener onFragment:" + fragment);
                                

                                @Override
                                public void onOpen(WebSocket websocket) 
                                    System.out.println("WebSocketTextListener onOpen");
                                

                                @Override
                                public void onClose(WebSocket websocket) 
                                    System.out.println("WebSocketTextListener onClose");
                                    latch.countDown();
                                

                                @Override
                                public void onError(Throwable t) 
                                    System.out.println("WebSocketTextListener onError");
                                    t.printStackTrace();
                                
                            ).build()).get();

            try 
                latch.await(60, TimeUnit.SECONDS);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            assertNotNull(response.get());
            assertEquals(response.get(), "echo");
         catch (Exception e) 
            e.printStackTrace();
        

        client.close();

    

测试广播网络套接字调用:

 @Test
    public void testAddMealPublisher() throws Exception 

        final CountDownLatch latch = new CountDownLatch(1);
        String restaurantId = "SalernoNapoliBarcelona";
        String mealId = "14b74bddc68d6f1b4c22e7f7b200067f";

        String url = "ws://localhost:8080/rest/" + "restaurants/" + restaurantId + "/meals/" + mealId + "/websocket/";

        AsyncHttpClient c = new AsyncHttpClient();
        try 
            final AtomicReference response = new AtomicReference(null);

            WebSocket websocket = c.prepareGet(url)
                    .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
                            new WebSocketTextListener() 

                                @Override
                                public void onMessage(String message) 
                                    response.set(message);
                                    latch.countDown();
                                

                                @Override
                                public void onFragment(String fragment, boolean last) 

                                    System.out.println("WebSocketTextListener onFragment:" + fragment);
                                

                                @Override
                                public void onOpen(WebSocket websocket) 
                                    System.out.println("WebSocketTextListener onOpen");
                                

                                @Override
                                public void onClose(WebSocket websocket) 
                                    System.out.println("WebSocketTextListener onClose");
                                    latch.countDown();
                                

                                @Override
                                public void onError(Throwable t) 
                                    System.out.println("WebSocketTextListener onError");
                                    t.printStackTrace();
                                
                            ).build()).get().sendTextMessage("MESSSAGGGEEEE");


            try 
                latch.await(5, TimeUnit.SECONDS);
             catch (InterruptedException e) 
                e.printStackTrace();
            

            assertNotNull(response.get());
            assertEquals(response.get(), "echo");

         catch (Exception e) 
            e.printStackTrace();
        

        c.close();

    

执行第一个 SUSPEND CALL 然后 BRODCAST 调用时的球衣日志:

2012 年 7 月 20 日下午 1:54:10 com.sun.jersey.api.container.filter.LoggingFilter 过滤器
INFO: 1 * 服务器入站请求
1 > 获取 http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/
1 > Sec-WebSocket-版本:13
1 > 升级:WebSocket
1 > Sec-WebSocket-Key: Wf7vyIGCD3Sa8StcdsGIkg==
1 > 主机:本地主机:8080
1 > 接受:*/*
1 > 用户代理:NING/1.0
1 > 连接:升级
1> 来源:http://localhost:8080
1 > X-Atmosphere-Transport:websocket
1 >

2012 年 7 月 20 日下午 1:54:31 com.sun.jersey.api.container.filter.LoggingFilter 过滤器
INFO: 2 * 服务器入站请求
2 > 获取 http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/
2 > Sec-WebSocket-版本:13
2 > 升级:WebSocket
2 > Sec-WebSocket-Key: RH/DbdkwQK1xBwhyhXLkAQ==
2 > 主机:本地主机:8080
2 > 接受:*/*
2 > 用户代理:NING/1.0
2 > 连接:升级
2> 来源:http://localhost:8080
2 > X-Atmosphere-Transport: websocket
2 >

2012 年 7 月 20 日下午 1:54:34 com.sun.jersey.api.container.filter.LoggingFilter 过滤器
INFO: 3 * 服务器入站请求
3 > 发布 http://localhost:8080/rest/restaurants/SalernoNapoliBarcelona/meals/14b74bddc68d6f1b4c22e7f7b200067f/websocket/
3 > X-Atmosphere-Transport: websocket
3 > X-Atmosphere-Transport: websocket
3 > 内容类型:application/json
3 >

2012 年 7 月 20 日下午 1:54:34 com.sun.jersey.api.container.filter.LoggingFilter$Adapter 完成
INFO: 3 * 服务器出站响应
3

【问题讨论】:

【参考方案1】:

问题是由于两个不同的 AsyncHttpClient 用于 SUBSCRIBE 和 BROADCAST 调用。

要解决此问题,请在 SetUp 方法中创建一个 AsyncHttpClient 并在两种测试方法中使用它:

 private AsyncHttpClient client;

    @Before
    public void setUp() throws Exception 

         client = new AsyncHttpClient();

    

【讨论】:

以上是关于Atmosphere Jersey 资源调用永远不会触发 WebSocketTextListener 的 OnMessage 方法的主要内容,如果未能解决你的问题,请参考以下文章

Java Atmosphere Jersey 从可广播返回 JSON 对象

带有 Jersey 2 的气氛 PubSub

如何使用 Atmosphere 框架调用传统的 REST-RPC 调用?

服务器使用 Atmosphere IO 定期推送

使用 java 和 Atmosphere 的基本聊天应用程序

何时在 Jersey 资源中使用 @Singleton