netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理相关的知识,希望对你有一定的参考价值。
参考技术A netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueueJava代码
public void eventSunk(
ChannelPipeline pipeline, ChannelEvent e) throws Exception
if (e instanceof ChannelStateEvent)
……
else if (e instanceof MessageEvent)
MessageEvent event = (MessageEvent) e;
NiosocketChannel channel = (NioSocketChannel) event.getChannel();
boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
assert offered;
channel.worker.writeFromUserCode(channel);
WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
Java代码
public boolean offer(MessageEvent e)
boolean success = queue.offer(e);
assert success;
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (newWriteBufferSize >= highWaterMark)
if (newWriteBufferSize - messageSize < highWaterMark)
highWaterMarkCounter.incrementAndGet();
if (!notifying.get())
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
return true;
fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
Java代码
public boolean setOption(String key, Object value)
if (super.setOption(key, value))
return true;
if ("writeBufferHighWaterMark".equals(key))
setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
else if ("writeBufferLowWaterMark".equals(key))
setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
else if ("writeSpinCount".equals(key))
setWriteSpinCount(ConversionUtil.toInt(value));
else if ("receiveBufferSizePredictorFactory".equals(key))
setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
else if ("receiveBufferSizePredictor".equals(key))
setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
else
return false;
return true;
然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
Java代码
public MessageEvent poll()
MessageEvent e = queue.poll();
if (e != null)
int messageSize = getMessageSize(e);
int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark)
if (newWriteBufferSize + messageSize >= lowWaterMark) //本次拉取,是的缓存数据大小掉到了低水位之下
highWaterMarkCounter.decrementAndGet();
if (isConnected() && !notifying.get())
notifying.set(Boolean.TRUE);
fireChannelInterestChanged(AbstractNioChannel.this);
notifying.set(Boolean.FALSE);
return e;
超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
Java代码
public boolean isWritable()
return (getInterestOps() & OP_WRITE) == 0;
public int getInterestOps()
if (!isOpen())
return Channel.OP_WRITE;
int interestOps = getRawInterestOps();
int writeBufferSize = this.writeBufferSize.get();
if (writeBufferSize != 0)
if (highWaterMarkCounter.get() > 0) //还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
if (writeBufferSize >= lowWaterMark) //缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
else
interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作
else //超过高水位counter<=0,意味着当前数据量小于高水位
int highWaterMark = getConfig().getWriteBufferHighWaterMark();
if (writeBufferSize >= highWaterMark) //这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
interestOps |= Channel.OP_WRITE;
else
interestOps &= ~Channel.OP_WRITE;
else
interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
return interestOps;
即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写
如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。
以上是关于netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理的主要内容,如果未能解决你的问题,请参考以下文章
把字符串的增删查改,插入以及删除各种操作封装为一个用c代码写的库,代码怎么写,我写不来求教
请教关于oracle中写存储过程时 select into 语句报错的问题