使响应顺序与 Netty 中的请求顺序匹配

Posted

技术标签:

【中文标题】使响应顺序与 Netty 中的请求顺序匹配【英文标题】:Make order of responses match order of requests in Netty 【发布时间】:2022-01-16 09:18:43 【问题描述】:

上下文

我正在编写一个 Netty 应用程序 (Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个EchoHandler,它会响应原始消息,尽管有时会在短暂的延迟之后:

public class EchoHandler extends ChannelInboundHandlerAdapter 
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) 
    var message = (String) msg;

    if (message.equals("delay")) 
      ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
     else 
      ctx.writeAndFlush(message);
    
  

问题

使用此代码,不能保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”和其他字符串中的第二条组成,那么响应的顺序将会颠倒!我写了一个测试来说明这一点:

public class Tests 
  @Test
  public void test() throws ExecutionException, InterruptedException 
    var channel = new EmbeddedChannel(new EchoHandler());
    channel.writeInbound("delay", "second message");

    // Let some time pass and process any scheduled tasks
    Thread.sleep(500);
    channel.runPendingTasks();

    // The response to the last message comes first!
    var firstMessage = (String) channel.readOutbound();
    Assertions.assertEquals("second message", firstMessage);

    // The response to the first message comes second!
    var secondMessage = (String) channel.readOutbound();
    Assertions.assertEquals("delay", secondMessage);
  

问题

我正在 Netty 中寻找一种内置方式来保证传出响应的顺序与传入消息的顺序相匹配。使用库的虚构扩展,我将重写处理程序如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) 
  var message = (String) msg;

  ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
  
  if (message.equals("delay")) 
    ctx.executor().schedule(() -> 
      ctx.writeAndFlush(message);
      ctx.resumeMessageProcessing(); // After flushing we can resume message processing
    , 400, TimeUnit.MILLISECONDS);
   else 
    ctx.writeAndFlush(message);
    ctx.resumeMessageProcessing(); // After flushing we can resume message processing
  

Netty 是否提供开箱即用的类似功能?这听起来像是一个常见的用例,我更愿意依赖经过实战测试的代码,而不是编写自己的队列实现。

【问题讨论】:

【参考方案1】:

虽然不是内置的,但这个问题的一个看似惯用的解决方案是自定义MessageToMessageCodec。基于使用String 作为消息类型的示例,我编写了以下编解码器来为您处理排队:

public class QueuingCodec extends MessageToMessageCodec<String, String> 

  private final Queue<String> messageQueue = new ArrayDeque<>();
  private boolean processingMessage = false;

  @Override
  protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
      throws Exception 

    // Pass the message on to output
    list.add(s);

    // Allow processing more messages
    processingMessage = false;

    // Send the next message in the queue if available
    if (!messageQueue.isEmpty()) 
      processingMessage = true;
      var message = messageQueue.poll();

      // Pass the message on to input
      ctx.executor().execute(() -> 
        ctx.fireChannelRead(message);
      );
    
  

  @Override
  protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
      throws Exception 

    if (processingMessage) 
      // Store message for later processing
      messageQueue.add(s);
     else 
      // Pass data on to input
      list.add(s);

      // Prevent processing of new messages
      processingMessage = true;
    
  

【讨论】:

以上是关于使响应顺序与 Netty 中的请求顺序匹配的主要内容,如果未能解决你的问题,请参考以下文章

过滤器匹配规则执行顺序

nginx中server块的匹配顺序

使顺序 HTTP 请求成为节点中的阻塞操作吗?

Filter执行顺序

网络I/o编程模型23 netty的出站与入站中handler加载与执行顺序

如何获得顺序 MKDirection 请求响应 Swift