netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理相关的知识,希望对你有一定的参考价值。

参考技术A   netty写数据的时候,会先放到一个缓存队列AbstractNioChannel.writeBufferQueue中,这个队列是WriteRequestQueue
  Java代码
  public void eventSunk(
  ChannelPipeline pipeline, ChannelEvent e) throws Exception
  if (e instanceof ChannelStateEvent)
  ……
   else if (e instanceof MessageEvent)
  MessageEvent event = (MessageEvent) e;
  NiosocketChannel channel = (NioSocketChannel) event.getChannel();
  boolean offered = channel.writeBufferQueue.offer(event);//写到channel的writeBufferQueue
  assert offered;
  channel.worker.writeFromUserCode(channel);
  
  
  WriteRequestQueue的offer方法中会根据缓存消息的总大小(字节数)判断是否超过了高水位线highWaterMark,如果第一次超过了超过高水位线,就会fireChannelInterestChanged;后边如果仍然一直往队列放数据,缓存的消息的大小持续超过高水位线的时候,不会再fireChannelInterestChanged。
  Java代码
  public boolean offer(MessageEvent e)
  boolean success = queue.offer(e);
  assert success;
  
  int messageSize = getMessageSize(e);
  int newWriteBufferSize = writeBufferSize.addAndGet(messageSize);
  int highWaterMark = getConfig().getWriteBufferHighWaterMark();
  
  if (newWriteBufferSize >= highWaterMark)
  if (newWriteBufferSize - messageSize < highWaterMark)
  highWaterMarkCounter.incrementAndGet();
  if (!notifying.get())
  notifying.set(Boolean.TRUE);
  fireChannelInterestChanged(AbstractNioChannel.this);
  notifying.set(Boolean.FALSE);
  
  
  
  return true;
  
  fireChannelInterestChanged这个会调到SimpleChannelUpstreamHandler.handleUpstream,触发SimpleChannelUpstreamHandler.channelInterestChanged,可以通过继承这个方法来自定义做些事情。高水位的值可以通过Bootstrap设置,最终会调到DefaultNioSocketChannelConfig.setOption。writeBufferHighWaterMark默认值为64K
  Java代码
  public boolean setOption(String key, Object value)
  if (super.setOption(key, value))
  return true;
  
  if ("writeBufferHighWaterMark".equals(key))
  setWriteBufferHighWaterMark0(ConversionUtil.toInt(value));
   else if ("writeBufferLowWaterMark".equals(key))
  setWriteBufferLowWaterMark0(ConversionUtil.toInt(value));
   else if ("writeSpinCount".equals(key))
  setWriteSpinCount(ConversionUtil.toInt(value));
   else if ("receiveBufferSizePredictorFactory".equals(key))
  setReceiveBufferSizePredictorFactory((ReceiveBufferSizePredictorFactory) value);
   else if ("receiveBufferSizePredictor".equals(key))
  setReceiveBufferSizePredictor((ReceiveBufferSizePredictor) value);
   else
  return false;
  
  return true;
  
  然后在write0的时候会从队列拉数据,拉数据的时候,如果发现本次拉的数据会导致缓存的数据大小(字节)从低水位writeBufferLowWaterMark之上,掉到了低水位之下,即跨过了低水位,会再次触发fireChannelInterestChanged事件。writeBufferLowWaterMark默认值为32K
  Java代码
  public MessageEvent poll()
  MessageEvent e = queue.poll();
  if (e != null)
  int messageSize = getMessageSize(e);
  int newWriteBufferSize = writeBufferSize.addAndGet(-messageSize);
  int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
  
  
  if (newWriteBufferSize == 0 || newWriteBufferSize < lowWaterMark)
  if (newWriteBufferSize + messageSize >= lowWaterMark) //本次拉取,是的缓存数据大小掉到了低水位之下
  highWaterMarkCounter.decrementAndGet();
  if (isConnected() && !notifying.get())
  notifying.set(Boolean.TRUE);
  fireChannelInterestChanged(AbstractNioChannel.this);
  notifying.set(Boolean.FALSE);
  
  
  
  
  return e;
  
  超过高水位和低于低水位都会触发fireChannelInterestChanged,怎么区分呢?通过AbstractChannel. isWritable(),如果channel的interestOps里边有注册过OP_WRITE,则是不可写的,否则是可写的
  Java代码
  public boolean isWritable()
  return (getInterestOps() & OP_WRITE) == 0;
  
  public int getInterestOps()
  if (!isOpen())
  return Channel.OP_WRITE;
  
  
  int interestOps = getRawInterestOps();
  int writeBufferSize = this.writeBufferSize.get();
  if (writeBufferSize != 0)
  if (highWaterMarkCounter.get() > 0) //还记得这个值,放数据到发送队列的时候值+=1,从队列拉数据出来的时候值-=1
  int lowWaterMark = getConfig().getWriteBufferLowWaterMark();
  if (writeBufferSize >= lowWaterMark) //缓存队列数据量,超过高水位,也超过了低水位,意味着高水位>低水位,此时等于注册写操作
  interestOps |= Channel.OP_WRITE;
   else
  interestOps &= ~Channel.OP_WRITE;//缓存队列数据量,超过高水位但是低于低水位,意味着低水位>高水位,此时等于没有注册写操作
  
   else //超过高水位counter<=0,意味着当前数据量小于高水位
  int highWaterMark = getConfig().getWriteBufferHighWaterMark();
  if (writeBufferSize >= highWaterMark) //这里,缓存数据量仍然高于高水位.....并发?按道理说channel的处理是单线程处理的,此时等于注册写操作
  interestOps |= Channel.OP_WRITE;
   else
  interestOps &= ~Channel.OP_WRITE;
  
  
   else
  interestOps &= ~Channel.OP_WRITE;//写队列没数据,没有注册写操作
  
  
  return interestOps;
  
  即,如果超过高水位isWritable()==false,低于低水位isWritable()==true,低水位优先级高于高水位,即如果 当前水位>低水位 则不可写,否则可写

  如果在通过netty向某机器写数据,但是写很缓慢,则会导致数据都缓存到netty的发送队列中,如果不做控制,可能会导致full gc/cms gc频繁,甚至最终OOM。所以可以考虑用高水位和低水位的值来控制netty的缓存队列,即用AbstractChannel.isWritable来控制是否继续写,如果AbstractChannel.isWritable==false,则丢弃数据,或者记录发送数据的状态,待后续缓存数据队列水位下降到安全水位后再发送。

以上是关于netty3.6.2中写数据的过程,以及写数据写不出去后怎么处理的主要内容,如果未能解决你的问题,请参考以下文章

把字符串的增删查改,插入以及删除各种操作封装为一个用c代码写的库,代码怎么写,我写不来求教

请教关于oracle中写存储过程时 select into 语句报错的问题

HBase写数据的异常问题以及优化

FIFO写满之后继续写数据,新数据会覆盖原来的数据吗,还是说新数据根本写不进去直接溢出?

python定义类()中写object和不写的区别

python定义类()中写object和不写的区别