mina解决粘包,找不到解码器,数据帧重传的问题

Posted android狗儿

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mina解决粘包,找不到解码器,数据帧重传的问题相关的知识,希望对你有一定的参考价值。

好久没写这个mina了,为了对之前的一篇博文Mina传输大数组,多路解码,粘包问题的处理  进行更进一步的补充,特此再来补说明。特别解决三个问题:

1,大数组粘包     在上篇的博文中提到用累积性解码器解决传输大数组的问题,还有可能出现粘包,解决方法是对decode方法进行了改进:

    public MessageDecoderResult decode(iosession session, IoBuffer in,
                                       ProtocolDecoderOutput out) throws Exception {

        Context ctx =getContext(session);//获取session  的context

        long matchCount=ctx.getMatchLength();//目前已获取的数据
        long length=ctx.getLength();//数据总长度
        IoBuffer buffer=ctx.getBuffer();//数据存入buffer

        //第一次取数据
        if(length==0){
            length=in.getLong();
            //保存第一次获取的长度
            ctx.setLength(length);
            matchCount=in.remaining();
        }
        else{
            matchCount+=in.remaining();
        }
        ctx.setMatchLength(matchCount);
        if (in.hasRemaining()) {// 如果buff中还有数
///改进的部分//
            if(matchCount< length) {
                buffer.put(in);// 添加到保存数据的buffer中
            }
            if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
                final byte[] b = new byte[(int) length];
                byte[] temp = new byte[(int) length];
                in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
                buffer.put(temp);
/
                // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
                buffer.flip();
                buffer.get(b);
                <span style="font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;">    </span><span class="comment" style="margin: 0px; padding: 0px; border: none; color: rgb(0, 130, 0); font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;">自己解码的部分///</span><span style="margin: 0px; padding: 0px; border: none; font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;"> </span>
       


                ctx.reset();//清空
                return MessageDecoderResult.OK;

            } else {
                ctx.setBuffer(buffer);
                return MessageDecoderResult.NEED_DATA;
            }
        }
        return MessageDecoderResult.NEED_DATA;
    }
2,如果上传的数据有两种类型,也就是说有两个大数组,两个数组不停地轮流向通道中发数据,处理过程中也出现了不少问题。

   首先,毋庸置疑肯定要用累积性解码器,在通信网速良好,信息无误的情况下,一切是美好的;

   但是,在实际中,项目的要求是要针对两种数据的看重程度不一样的,将两种数组分别说成是数组A,数组B,看重程度如下:

           数组A必须准确无误的收到。若成功收到回复一个成功应答标志,否则发一个失败标志;同时,服务器若收到成功标志发下一个数据组B,若收到失败标志则将当前数组再发一次;

           而数组B,不管是否有没有成功收到,都回复一种标志,服务器收到该标志后直接发数组A;

   在这种模式下,难点就在于数组B 的接收有丢失,这时还停留在B的解码器中,而这是服务器直接发来了数组A;在保证数据不丢失的情况下,如何实现数据的跳转?

   具体实现是借助全局变量和通道空闲来实现,直接贴代码说明:

数组A的传输解码器:

public class ADecoder implements MessageDecoder {

    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
            "context");
    private int k;
    private ReceiveRight mReceiveRight = new ReceiveRight();
    private ReceiveWrong mReceiveWrong = new ReceiveWrong();

    @Override
    public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
        if(Constants.flag ) {//如果在B解码器中,收到了一部分A数据,该部分数据有区分解码器的标志
            Constants.buffer.flip();
            Constants.buffer.limit(Constants.positionValue);
            byte headtail = Constants.buffer.get();
            byte functionCode = Constants.buffer.get();
            if (functionCode == 0x51 || functionCode == 0x53) {
                Constants.Isstop=false;
                return MessageDecoderResult.OK;

            } else {
                Constants.Isstop=true;
                return MessageDecoderResult.NOT_OK;
            }
        }else {
            if (in.remaining() < 2) {
                return MessageDecoderResult.NEED_DATA;
            } else {
                            
                byte functionCode = in.get();
                if (functionCode == 0x51 || functionCode == 0x53) {
                    Constants.Isstop=false;
                    return MessageDecoderResult.OK;
                } else {
                    Constants.Isstop=true;
                    return MessageDecoderResult.NOT_OK;
                }
            }
        }
    }

    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in,
                                       final ProtocolDecoderOutput out) throws Exception {
        Constants.ctx = getContext(session);//获取session  的context
        long matchCount = Constants.ctx.getMatchLength();//目前已获取的数据
        long length = Constants.ctx.getLength();//数据总长度
        IoBuffer buffer = Constants.ctx.getBuffer();//数据存入buffer

        if (Constants.flag) {//<span style="font-family: Arial, Helvetica, sans-serif;">如果在B解码器中,收到了一部分A数据,要先将这部分数据存入累积性解码器的buffer中,以拼凑完整</span>

            Constants.buffer.flip();
            Constants.buffer.limit(Constants.positionValue);
            buffer.put(Constants.buffer);
            matchCount=Constants.positionValue;
            Constants.buffer.clear();
            Constants.flag=false;
            Constants.positionValue=0;
        }

///
        matchCount += in.remaining();
        Log.d("abcd", "共收到字节:" + String.valueOf(matchCount));
        Constants.ctx.setMatchLength(matchCount);

        if (in.hasRemaining()) {// 如果in中还有数据
            if(matchCount< length) {
                buffer.put(in);// 添加到保存数据的buffer中
            }
            if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
                final byte[] b = new byte[1614];
                byte[] temp = new byte[1614];
                in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
                buffer.put(temp);
                // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
                buffer.flip();
                buffer.get(b);

               //解码部分
                        Constants.NotFill = false;//收成功,NotFill表示没满的变量
                        k++;
                        Log.d("sucess", "成功次数:" + String.valueOf(k));
              
              
                }
                Constants.ctx.reset();
                return MessageDecoderResult.OK;
            } else {
                Constants.ctx.setBuffer(buffer);
                Constants.NotFill = true;
                return MessageDecoderResult.NEED_DATA;
            }
        }
        return MessageDecoderResult.NEED_DATA;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

    }

数组B的解码器

public class BDecoder implements MessageDecoder {

    private final AttributeKey CONTEXT = new AttributeKey(getClass(),
            "context");
    private ComputePara computePara=new ComputePara();
    private ReceiveRight mReceiveRight = new ReceiveRight();
    private ReceiveWrong mReceiveWrong = new ReceiveWrong();


    @Override
    public MessageDecoderResult decodable(IoSession session, IoBuffer in) {

        if(Constants.flag ){//区分解码器的标志出现在上一次剩余数据中
            Constants.buffer.limit(Constants.positionValue);
            Constants.buffer.flip();
            byte functionCode = Constants.buffer.get();
        
            if (functionCode == 0x52) {
                Constants.Isstop=false;
                return MessageDecoderResult.OK;
            } else {
                Constants.Isstop=true;
                return MessageDecoderResult.NOT_OK;
            }
        }else {
            if (in.remaining() < 2) {
                return MessageDecoderResult.NEED_DATA;
            } else {
             
             
                byte functionCode = in.get();
           

                if (functionCode == 0x52) {
                    Constants.Isstop=false;
                    return MessageDecoderResult.OK;

                } else {
                    Constants.Isstop=true;
                    return MessageDecoderResult.NOT_OK;
                }
            }
        }
    }
    @Override
    public MessageDecoderResult decode(IoSession session, IoBuffer in,
                                       final ProtocolDecoderOutput out) throws Exception {

        if(Constants.IsJump&&(!Constants.Backfail)){
            Constants.IsJump=false;
            Constants.flag = true;
            Constants.positionValue=in.limit();
            Constants.buffer.clear();
            Constants.buffer.put(in);
            Log.d("back", "jump");
            return MessageDecoderResult.OK;
        }

        Constants.ctxBack = getContext(session);//获取session  的context
        long matchCount = Constants.ctxBack .getMatchLength();//目前已获取的数据
        long length = Constants.ctxBack .getLength();//数据总长度
        IoBuffer buffer = Constants.ctxBack .getBuffer();//数据存入buffer

///
        matchCount += in.remaining();
        Log.d("back", "共收到字节:" + String.valueOf(matchCount));
        Constants.ctxBack .setMatchLength(matchCount);
        if (in.hasRemaining()) {// 如果in中还有数据
            if(matchCount< length) {
                buffer.put(in);// 添加到保存数据的buffer中
            }
            if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
                final byte[] b = new byte[1561];
                byte[] temp = new byte[1561];
                in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
                buffer.put(temp);
                // 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
                buffer.flip();
                buffer.get(b);
                 //解码过程
                
                Constants.ctxBack .reset();
                return MessageDecoderResult.OK;
            } else {
                Constants.ctxBack .setBuffer(buffer);
                Constants.Backfail=true;
                return MessageDecoderResult.NEED_DATA;
            }
        }
        return MessageDecoderResult.OK;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

    }
/**
<pre name="code" class="java">public static boolean NotFill=false;//A数据没有收满
public static boolean Backfail=false;//B数据接收失败
public static boolean IsJump=false;/B谱数据接收失败h后让解码器跳转到A频谱
public static int positionValue=0;   
public static boolean flag=false;
public static boolean Isstop=false;//判断是否有解码器


 
*/
<pre name="code" class="java">@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    super.sessionIdle(session, status);
    final ReceiveWrong mReceiveWrong = new ReceiveWrong();
    final ReceiveRight mReceiveRight = new ReceiveRight();
    //A数据超时重传
    if (Constants.NotFill) {
        Constants.FPGAsession.write(mReceiveWrong);
        Constants.NotFill = false;
        Constants.ctx.reset();
        Constants.failCount++;
        Log.d("trans", "重传次数:" + Constants.failCount);
    }
//B数据传输失败
    if (Constants.Backfail) {
        Constants.FPGAsession.write(mReceiveWrong);
        Constants.Backfail = false;
        Constants.IsJump = true;
        Constants.ctxBack.reset();
    }
    if (Constants.Isstop) {
        Constants.FPGAsession.write(mReceiveWrong);
        Constants.Isstop = false;

    }
}
</pre><pre code_snippet_id="1691756" snippet_file_name="blog_20160521_13_6359721" name="code" class="java">在处理上述问题的时候,又相继出现了一个问题,那就是找不到解码器,导致IObuffer中出现了大量垃圾数据,解码器的不断循环,于是专门写了一个用来清除垃圾书据的解码器,
<pre name="code" class="java">public class ClearDecoder implements MessageDecoder {
    @Override
    public MessageDecoderResult decodable(IoSession ioSession, IoBuffer ioBuffer) {
        if(Constants.Isstop) {//<span style="font-family: Arial, Helvetica, sans-serif;">Isstop=false;//判断是否有解码器,该标志时在找不到解码器时置为true</span>

            ioBuffer.sweep();
           
           int n=ioBuffer.remaining();
            Constants.buffer.sweep();//buffer中的是错误帧,此时也找不到解码器
            Constants.flag=false;
            Log.d("abcd", "clear解码器清空数据");
        }
        return MessageDecoderResult.OK;
    }

    @Override
    public MessageDecoderResult decode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
        return null;
    }

    @Override
    public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {

    }
}


 
</pre><pre code_snippet_id="1691756" snippet_file_name="blog_20160521_16_3786210" name="code" class="java">

                

以上是关于mina解决粘包,找不到解码器,数据帧重传的问题的主要内容,如果未能解决你的问题,请参考以下文章

mina框架tcpt通讯接收数据断包粘包处理

mina websocket 粘包断包(丢包)解决心得

互联网协议 — TCP — 拥塞控制(网络质量保障)

TCP 的超时与重传

mina中的Tcp粘包,半包的问题

解决粘包和拆包问题