使用 Undertow WebSockets 高效发送大型数据集

Posted

技术标签:

【中文标题】使用 Undertow WebSockets 高效发送大型数据集【英文标题】:Send large data set using Undertow WebSockets efficiently 【发布时间】:2020-03-02 17:27:58 【问题描述】:

我有一个大的 ConcurrentHashMap (cache.getCache()),其中保存了我的所有数据(大约 500+ MB 大小,但随着时间的推移会增长)。客户端可以通过使用普通 java HttpServer 实现的 API 访问它。 这是简化的代码:

JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(new BufferedOutputStream(new GZIPOutputStream(exchange.getResponseBody())))));
new GsonBuilder().create().toJson(cache.getCache(), CacheContainer.class, jsonWriter);

还有一些客户端发送的过滤器,因此它们实际上并没有每次都获取所有数据,但是 HashMap 会不断更新,因此客户端必须经常刷新才能获得最新数据。这是低效的,所以我决定使用 WebSockets 将数据更新实时推送到客户端。

为此我选择了 Undertow,因为我可以简单地从 Maven 导入它,而且我不需要在服务器上进行额外的配置。

在 WS 连接上,我将通道添加到 HashSet 并发送整个数据集(客户端在获取初始数据之前发送带有一些过滤器的消息,但我从示例中删除了这部分):

public class MyConnectionCallback implements WebSocketConnectionCallback 
  CacheContainer cache;
  Set<WebSocketChannel> clients = new HashSet<>();
  BlockingQueue<String> queue = new LinkedBlockingQueue<>();

  public MyConnectionCallback(CacheContainer cache) 
    this.cache = cache;
    Thread pusherThread = new Thread(() -> 
      while (true) 
        push(queue.take());
      
    );
    pusherThread.start();
  

  public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) 
    webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() 
      protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) 
        clients.add(webSocketChannel);
        WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
      
    
  

  private void push(String message) 
    Set<WebSocketChannel> closed = new HashSet<>();
    clients.forEach((webSocketChannel) -> 
        if (webSocketChannel.isOpen()) 
            WebSockets.sendText(message, webSocketChannel, null);
         else 
            closed.add(webSocketChannel);
        
    
    closed.foreach(clients::remove);
  

  public void putMessage(String message) 
    queue.put(message);
  

每次更改缓存后,我都会获取新值并将其放入队列中(我不直接序列化 myUpdate 对象,因为在 updateCache 方法中还有其他逻辑)。只有一个线程负责更新缓存:

cache.updateCache(key, myUpdate);
Map<Key,Value> tempMap = new HashMap<>();
tempMap.put(key, cache.getValue(key));
webSocketServer.putMessage(gson.toJson(tempMap));

我用这种方法看到的问题:

    在初始连接时,整个数据集被转换为字符串,我担心过多的请求会导致服务器 OOM。 WebSockets.sendText 只接受 String 和 ByteBuffer 如果我先将通道添加到客户端设置然后发送数据,则可能在发送初始数据之前推送到客户端,客户端将处于无效状态 如果我先发送初始数据,然后将通道添加到客户端集合中,那么在发送初始数据过程中来的推送消息将丢失,客户端将处于无效状态

我为问题 #2 和 #3 提出的解决方案是将消息放入队列中(我会将 Set&lt;WebSocketChannel&gt; 转换为 Map&lt;WebSocketChannel,Queue&lt;String&gt;&gt; 并仅在客户端收到初始消息后将消息发送到队列中数据集,但我欢迎在这里提出任何其他建议。

至于问题 #1,我的问题是通过 WebSocket 发送初始数据的最有效方式是什么?例如,使用 JsonWriter 直接写入 WebSocket。

我意识到客户端可以使用 API 进行初始调用并订阅 WebSocket 以进行更改,但是这种方法使客户端负责拥有正确的状态(他们需要订阅 WS、排队 WS 消息、获取初始数据使用 API,然后在获取初始数据后将排队的 WS 消息应用到他们的数据集),我不想把控制权交给他们,因为数据很敏感。

【问题讨论】:

【参考方案1】:

#2 和#3 的问题似乎与不同的线程能够同时向客户端发送数据状态有关。因此,除了您的方法之外,您还可以考虑其他两种同步方法。

    使用互斥锁来保护对数据和客户端发送的访问。这将数据的读取和发送序列化到客户端,因此(伪)代码变为:
protected void onFullTextMessage(...) 
   LOCK 
     clients.add(webSocketChannel);
     WebSockets.sendText(gson.toJson(cache.getCache()), webSocketChannel, null);
   


void push(String message) 
    Set<WebSocketChannel> closed = new HashSet<>();
    LOCK 
      clients.forEach((webSocketChannel) -> 
          if (webSocketChannel.isOpen()) 
              WebSockets.sendText(message, webSocketChannel, null);
           else 
              closed.add(webSocketChannel);
          
      
    
    closed.foreach(clients::remove);

    创建一个新的类和服务线程,该线程单独负责管理对数据缓存的更改并将这些更改推送到客户端;它将使用内部同步队列来异步处理方法调用,并跟踪已连接的客户端,它将具有如下接口:
public void update_cache(....);
public void add_new_client(WebSocketChannel);

...这些调用中的每一个都将要在对象内部线程上完成的操作排入队列。这保证了初始快照和更新的顺序,因为只有一个线程负责更改缓存并将这些更改传播给订阅者。

至于 #1,如果您使用方法 #2,那么您可以缓存数据的序列化状态,以便在以后的快照中重用(前提是它同时没有被更改)。如评论中所述:这仅在以后的客户端具有相同的过滤器配置时才有效。

【讨论】:

只有一个线程可以发送消息,但为了简单起见,我将这部分放在我最初的问题之外。我现在加进去了。缓存我的数据的序列化状态是行不通的,因为客户端没有获得整个数据。它首先由自定义 Gson 序列化程序过滤,每个客户端都不同,并且基于客户端发送的一些参数。 过滤器的优点。我澄清了我的回答;实际上,我想到的服务线程将负责更改缓存并将消息发送给订阅者。【参考方案2】:

为了解决问题 #2 和 #3,我在每个客户端上设置了一个推送锁定标志,该标志仅在发送初始数据时解锁。设置推送锁后,到达的消息将放置在该客户端队列中。然后在任何新消息之前发送排队的消息。

我通过直接使用 ByteBuffer 而不是 String 来缓解问题 #1。这样我可以因为编码节省一些内存(字符串默认使用 UTF-16)

最终代码:

public class WebSocketClient 
  private boolean pushLock;
  private Gson gson;
  private Queue<CacheContainer> queue = new ConcurrentLinkedQueue<>();

  WebSocketClient(MyQuery query, CacheHandler cacheHandler) 
    pushLock = true;
    this.gson = GsonFactory.getGson(query, cacheHandler);
  

  public synchronized boolean isPushLock() 
    return pushLock;
  

  public synchronized void pushUnlock() 
    pushLock = false;
  

  public Gson getGson() 
    return gson;
  

  public Queue<CacheContainer> getQueue() 
    return queue;
  

  public boolean hasBackLog() 
    return !queue.isEmpty();
  


public class MyConnectionCallback implements WebSocketConnectionCallback 

  private final Map<WebSocketChannel, WebSocketClient> clients = new ConcurrentHashMap<>();
  private final BlockingQueue<CacheContainer> messageQueue = new LinkedBlockingQueue<>();

  private final Gson queryGson = new GsonBuilder().disablehtmlEscaping().create();

  private final CacheHandler cacheHandler;

  MyConnectionCallback(CacheHandler cacheHandler) 
    this.cacheHandler = cacheHandler;
    Thread pusherThread = new Thread(() -> 
      boolean hasPushLock = false;
      while (true) 
        if (messageQueue.isEmpty() && hasPushLock) hasPushLock = pushToAllClients(null);
        else hasPushLock = pushToAllClients(messageQueue.take());
      
    , "PusherThread");
    pusherThread.start();
  

  @Override
  public void onConnect(WebSocketHttpExchange webSocketHttpExchange, WebSocketChannel webSocketChannel) 
    webSocketChannel.getReceiveSetter().set(new AbstractReceiveListener() 
      @Override
      protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException 
        MyQuery query = new MyQuery(queryGson.fromJson(message.getData(), QueryJson.class));
        WebSocketClient clientConfig = new WebSocketClient(query, cacheHandler);
        clients.put(webSocketChannel, clientConfig);
        push(webSocketChannel, clientConfig.getGson(), cacheHandler.getCache());
        clientConfig.pushUnlock();
        
    );
    webSocketChannel.resumeReceives();
  

  void putMessage(CacheContainer message) 
    messageQueue.put(message);
  

  private synchronized void push(WebSocketChannel webSocketChannel, Gson gson, CacheContainer message) throws IOException 
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
      JsonWriter jsonWriter = new JsonWriter(new OutputStreamWriter(baos, StandardCharsets.UTF_8))) 
      gson.toJson(message, CacheContainer.class, jsonWriter);
      jsonWriter.flush();
      if (baos.size() > 2) 
        WebSockets.sendText(ByteBuffer.wrap(baos.toByteArray()), webSocketChannel, null);
      
    
  

  private synchronized boolean pushToAllClients(CacheContainer message) 
    AtomicBoolean hadPushLock = new AtomicBoolean(false);
    Set<WebSocketChannel> closed = new HashSet<>();

    clients.forEach((webSocketChannel, clientConfig) -> 
      if (webSocketChannel.isOpen()) 
        if (clientConfig.isPushLock()) 
          hadPushLock.set(true);
          clientConfig.getQueue().add(message);
         else 
          try 
            if (clientConfig.hasBackLog())
              pushBackLog(webSocketChannel, clientConfig);
            if (message != null)
              push(webSocketChannel, clientConfig.getGson(), message);
           catch (Exception e) 
            closeChannel(webSocketChannel, closed);
          
        
       else 
        closed.add(webSocketChannel);
      
    );

    closed.forEach(clients::remove);
    return hadPushLock.get();
  

  private void pushBackLog(WebSocketChannel webSocketChannel, WebSocketClient clientConfig) throws IOException 
    while (clientConfig.hasBackLog()) 
      push(webSocketChannel, clientConfig.getGson(), clientConfig.getQueue().poll());
    
  

  private void closeChannel(WebSocketChannel channel, Set<WebSocketChannel> closed) 
    closed.add(channel);
    channel.close();
  

【讨论】:

以上是关于使用 Undertow WebSockets 高效发送大型数据集的主要内容,如果未能解决你的问题,请参考以下文章

嵌入式Undertown同时部署websocket和servlet,不起作用

µWebSockets:一种WebSocket服务器实现

undertow简单入门

使用 Undertow 的基于文本的处理程序配置格式的示例

Spring Cloud 升级之路 - 2020.0.x - 2. 使用 Undertow 作为我们的 Web 服务容器

Undertow:使用现有的 Servlet 实例