使响应顺序与 Netty 中的请求顺序匹配
Posted
技术标签:
【中文标题】使响应顺序与 Netty 中的请求顺序匹配【英文标题】:Make order of responses match order of requests in Netty 【发布时间】:2022-01-16 09:18:43 【问题描述】:上下文
我正在编写一个 Netty 应用程序 (Netty 4),其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个EchoHandler
,它会响应原始消息,尽管有时会在短暂的延迟之后:
public class EchoHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
var message = (String) msg;
if (message.equals("delay"))
ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
else
ctx.writeAndFlush(message);
问题
使用此代码,不能保证响应的顺序与传入请求的顺序相同。事实上,如果第一条消息由字符串“delay”和其他字符串中的第二条组成,那么响应的顺序将会颠倒!我写了一个测试来说明这一点:
public class Tests
@Test
public void test() throws ExecutionException, InterruptedException
var channel = new EmbeddedChannel(new EchoHandler());
channel.writeInbound("delay", "second message");
// Let some time pass and process any scheduled tasks
Thread.sleep(500);
channel.runPendingTasks();
// The response to the last message comes first!
var firstMessage = (String) channel.readOutbound();
Assertions.assertEquals("second message", firstMessage);
// The response to the first message comes second!
var secondMessage = (String) channel.readOutbound();
Assertions.assertEquals("delay", secondMessage);
问题
我正在 Netty 中寻找一种内置方式来保证传出响应的顺序与传入消息的顺序相匹配。使用库的虚构扩展,我将重写处理程序如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
var message = (String) msg;
ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued
if (message.equals("delay"))
ctx.executor().schedule(() ->
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
, 400, TimeUnit.MILLISECONDS);
else
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
Netty 是否提供开箱即用的类似功能?这听起来像是一个常见的用例,我更愿意依赖经过实战测试的代码,而不是编写自己的队列实现。
【问题讨论】:
【参考方案1】:虽然不是内置的,但这个问题的一个看似惯用的解决方案是自定义MessageToMessageCodec
。基于使用String
作为消息类型的示例,我编写了以下编解码器来为您处理排队:
public class QueuingCodec extends MessageToMessageCodec<String, String>
private final Queue<String> messageQueue = new ArrayDeque<>();
private boolean processingMessage = false;
@Override
protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception
// Pass the message on to output
list.add(s);
// Allow processing more messages
processingMessage = false;
// Send the next message in the queue if available
if (!messageQueue.isEmpty())
processingMessage = true;
var message = messageQueue.poll();
// Pass the message on to input
ctx.executor().execute(() ->
ctx.fireChannelRead(message);
);
@Override
protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception
if (processingMessage)
// Store message for later processing
messageQueue.add(s);
else
// Pass data on to input
list.add(s);
// Prevent processing of new messages
processingMessage = true;
【讨论】:
以上是关于使响应顺序与 Netty 中的请求顺序匹配的主要内容,如果未能解决你的问题,请参考以下文章