Netty 4.1:使用 ContinuationWebSocketFrame 发送多个 WebSocketFrame Fragments 和 Closer
Posted
技术标签:
【中文标题】Netty 4.1:使用 ContinuationWebSocketFrame 发送多个 WebSocketFrame Fragments 和 Closer【英文标题】:Netty 4.1: Send multiple WebSocketFrame Fragments and Closer with ContinuationWebSocketFrame 【发布时间】:2019-08-22 17:40:46 【问题描述】:我有一个Subscriber,它在下一个可用有效负载上接收sequential callbacks,在收到最后一个有效负载时有一个subsequent callback。 (以下简化代码)
我想做的是在每个有效负载回调上发送 WebSocketFrame 片段,并在订阅者完成时关闭整个逻辑帧。
我试过了:
-
以
new TextWebSocketFrame(false, 0, buf)
开头,以new TextWebSocketFrame(true, 0, buf)
结束。
以new ContinuationWebSocketFrame(false, 0, buf)
开头,以new TextWebSocketFrame(false, 0, buf)
继续,以new TextWebSocketFrame(true, 0, buf)
结束。
以new ContinuationWebSocketFrame(false, 0, buf)
开头,以new TextWebSocketFrame(false, 0, buf)
继续,以new ContinuationWebSocketFrame(true, 0, buf)
结束。
.... 以及更多组合,但我的 Chrome 浏览器客户端(版本 76.0.3809.100(官方构建)(64 位))要么报告完全违反 WebSock 协议,要么抱怨已收到多个 ContinuationWebSocketFrames。
您能否指出正确的方向,以使用正确配置的 WebSocketFrames 的正确序列正常工作?
非常感谢。
static class DataReceiver implements Subscriber<Object>
private Channel channel;
private final AtomicBoolean firstFrame = new AtomicBoolean(false);
public DataReceiver(Channel channel)
this.channel = channel;
@Override
public void onSubscribe(Subscription s)
s.request(Long.MAX_VALUE);
@Override
public void onNext(Object obj)
ByteBuf buf = JSONOps.serialize(obj, channel.alloc().ioBuffer());
if (firstFrame.compareAndSet(false, true))
channel.writeAndFlush(new ContinuationWebSocketFrame(false, 0, buf));
else
channel.writeAndFlush(new TextWebSocketFrame(false, 0, buf));
@Override
public void onError(Throwable t)
log.error("Subscriber Error", t);
// TODO: Handle error
@Override
public void onComplete()
// channel.writeAndFlush(new ContinuationWebSocketFrame(true, 0, Unpooled.EMPTY_BUFFER));
channel.writeAndFlush(new TextWebSocketFrame(true, 0, Unpooled.EMPTY_BUFFER));
【问题讨论】:
【参考方案1】:它现在正在工作。有2个问题。一个是我的管道中没有WebSocketFrameAggregator。其次,结尾ContinuationWebSocketFrame一定要有内容。当我使用 Unpooled.EMPTY_BUFFER 关闭聚合时它不起作用。
事实证明,无论如何我都需要在最后插入一个 JSON 数组。
工作(简化类)如下所示:
static class DataReceiver implements Subscriber<Object>
private Channel channel;
private final AtomicBoolean firstFrame = new AtomicBoolean(false);
public DataReceiver(Channel channel)
this.channel = channel;
@Override
public void onSubscribe(Subscription s)
s.request(Long.MAX_VALUE);
@Override
public void onNext(Object obj)
ByteBuf buf = JSONOps.serialize(obj, channel.alloc().ioBuffer(4196));
if (firstFrame.compareAndSet(false, true))
ByteBuf b = Unpooled.wrappedBuffer(Unpooled.wrappedBuffer("[".getBytes()), buf);
channel.writeAndFlush(new TextWebSocketFrame(false, 0, b));
else
ByteBuf b = Unpooled.wrappedBuffer(Unpooled.wrappedBuffer(",".getBytes()), buf);
channel.writeAndFlush(new ContinuationWebSocketFrame(false, 0, b));
@Override
public void onError(Throwable t)
log.error("Subscriber Error", t);
// TODO: Handle error
@Override
public void onComplete()
channel.writeAndFlush(new ContinuationWebSocketFrame(true, 0, Unpooled.wrappedBuffer("]".getBytes())));
【讨论】:
以上是关于Netty 4.1:使用 ContinuationWebSocketFrame 发送多个 WebSocketFrame Fragments 和 Closer的主要内容,如果未能解决你的问题,请参考以下文章
SWIFT TASK CONTINUATION MISUSE:泄露了它的延续