mina解决粘包,找不到解码器,数据帧重传的问题
Posted android狗儿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mina解决粘包,找不到解码器,数据帧重传的问题相关的知识,希望对你有一定的参考价值。
好久没写这个mina了,为了对之前的一篇博文Mina传输大数组,多路解码,粘包问题的处理 进行更进一步的补充,特此再来补说明。特别解决三个问题:
1,大数组粘包 在上篇的博文中提到用累积性解码器解决传输大数组的问题,还有可能出现粘包,解决方法是对decode方法进行了改进:
public MessageDecoderResult decode(iosession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
Context ctx =getContext(session);//获取session 的context
long matchCount=ctx.getMatchLength();//目前已获取的数据
long length=ctx.getLength();//数据总长度
IoBuffer buffer=ctx.getBuffer();//数据存入buffer
//第一次取数据
if(length==0){
length=in.getLong();
//保存第一次获取的长度
ctx.setLength(length);
matchCount=in.remaining();
}
else{
matchCount+=in.remaining();
}
ctx.setMatchLength(matchCount);
if (in.hasRemaining()) {// 如果buff中还有数
///改进的部分//
if(matchCount< length) {
buffer.put(in);// 添加到保存数据的buffer中
}
if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
final byte[] b = new byte[(int) length];
byte[] temp = new byte[(int) length];
in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
buffer.put(temp);
/
// 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
buffer.flip();
buffer.get(b);
<span style="font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;"> </span><span class="comment" style="margin: 0px; padding: 0px; border: none; color: rgb(0, 130, 0); font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;">自己解码的部分///</span><span style="margin: 0px; padding: 0px; border: none; font-family: Consolas, 'Courier New', Courier, mono, serif; line-height: 18px;"> </span>
ctx.reset();//清空
return MessageDecoderResult.OK;
} else {
ctx.setBuffer(buffer);
return MessageDecoderResult.NEED_DATA;
}
}
return MessageDecoderResult.NEED_DATA;
}
2,如果上传的数据有两种类型,也就是说有两个大数组,两个数组不停地轮流向通道中发数据,处理过程中也出现了不少问题。
首先,毋庸置疑肯定要用累积性解码器,在通信网速良好,信息无误的情况下,一切是美好的;
但是,在实际中,项目的要求是要针对两种数据的看重程度不一样的,将两种数组分别说成是数组A,数组B,看重程度如下:
数组A必须准确无误的收到。若成功收到回复一个成功应答标志,否则发一个失败标志;同时,服务器若收到成功标志发下一个数据组B,若收到失败标志则将当前数组再发一次;
而数组B,不管是否有没有成功收到,都回复一种标志,服务器收到该标志后直接发数组A;
在这种模式下,难点就在于数组B 的接收有丢失,这时还停留在B的解码器中,而这是服务器直接发来了数组A;在保证数据不丢失的情况下,如何实现数据的跳转?
具体实现是借助全局变量和通道空闲来实现,直接贴代码说明:
数组A的传输解码器:
public class ADecoder implements MessageDecoder {
private final AttributeKey CONTEXT = new AttributeKey(getClass(),
"context");
private int k;
private ReceiveRight mReceiveRight = new ReceiveRight();
private ReceiveWrong mReceiveWrong = new ReceiveWrong();
@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
if(Constants.flag ) {//如果在B解码器中,收到了一部分A数据,该部分数据有区分解码器的标志
Constants.buffer.flip();
Constants.buffer.limit(Constants.positionValue);
byte headtail = Constants.buffer.get();
byte functionCode = Constants.buffer.get();
if (functionCode == 0x51 || functionCode == 0x53) {
Constants.Isstop=false;
return MessageDecoderResult.OK;
} else {
Constants.Isstop=true;
return MessageDecoderResult.NOT_OK;
}
}else {
if (in.remaining() < 2) {
return MessageDecoderResult.NEED_DATA;
} else {
byte functionCode = in.get();
if (functionCode == 0x51 || functionCode == 0x53) {
Constants.Isstop=false;
return MessageDecoderResult.OK;
} else {
Constants.Isstop=true;
return MessageDecoderResult.NOT_OK;
}
}
}
}
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in,
final ProtocolDecoderOutput out) throws Exception {
Constants.ctx = getContext(session);//获取session 的context
long matchCount = Constants.ctx.getMatchLength();//目前已获取的数据
long length = Constants.ctx.getLength();//数据总长度
IoBuffer buffer = Constants.ctx.getBuffer();//数据存入buffer
if (Constants.flag) {//<span style="font-family: Arial, Helvetica, sans-serif;">如果在B解码器中,收到了一部分A数据,要先将这部分数据存入累积性解码器的buffer中,以拼凑完整</span>
Constants.buffer.flip();
Constants.buffer.limit(Constants.positionValue);
buffer.put(Constants.buffer);
matchCount=Constants.positionValue;
Constants.buffer.clear();
Constants.flag=false;
Constants.positionValue=0;
}
///
matchCount += in.remaining();
Log.d("abcd", "共收到字节:" + String.valueOf(matchCount));
Constants.ctx.setMatchLength(matchCount);
if (in.hasRemaining()) {// 如果in中还有数据
if(matchCount< length) {
buffer.put(in);// 添加到保存数据的buffer中
}
if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
final byte[] b = new byte[1614];
byte[] temp = new byte[1614];
in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
buffer.put(temp);
// 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
buffer.flip();
buffer.get(b);
//解码部分
Constants.NotFill = false;//收成功,NotFill表示没满的变量
k++;
Log.d("sucess", "成功次数:" + String.valueOf(k));
}
Constants.ctx.reset();
return MessageDecoderResult.OK;
} else {
Constants.ctx.setBuffer(buffer);
Constants.NotFill = true;
return MessageDecoderResult.NEED_DATA;
}
}
return MessageDecoderResult.NEED_DATA;
}
@Override
public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
}
数组B的解码器
public class BDecoder implements MessageDecoder {
private final AttributeKey CONTEXT = new AttributeKey(getClass(),
"context");
private ComputePara computePara=new ComputePara();
private ReceiveRight mReceiveRight = new ReceiveRight();
private ReceiveWrong mReceiveWrong = new ReceiveWrong();
@Override
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
if(Constants.flag ){//区分解码器的标志出现在上一次剩余数据中
Constants.buffer.limit(Constants.positionValue);
Constants.buffer.flip();
byte functionCode = Constants.buffer.get();
if (functionCode == 0x52) {
Constants.Isstop=false;
return MessageDecoderResult.OK;
} else {
Constants.Isstop=true;
return MessageDecoderResult.NOT_OK;
}
}else {
if (in.remaining() < 2) {
return MessageDecoderResult.NEED_DATA;
} else {
byte functionCode = in.get();
if (functionCode == 0x52) {
Constants.Isstop=false;
return MessageDecoderResult.OK;
} else {
Constants.Isstop=true;
return MessageDecoderResult.NOT_OK;
}
}
}
}
@Override
public MessageDecoderResult decode(IoSession session, IoBuffer in,
final ProtocolDecoderOutput out) throws Exception {
if(Constants.IsJump&&(!Constants.Backfail)){
Constants.IsJump=false;
Constants.flag = true;
Constants.positionValue=in.limit();
Constants.buffer.clear();
Constants.buffer.put(in);
Log.d("back", "jump");
return MessageDecoderResult.OK;
}
Constants.ctxBack = getContext(session);//获取session 的context
long matchCount = Constants.ctxBack .getMatchLength();//目前已获取的数据
long length = Constants.ctxBack .getLength();//数据总长度
IoBuffer buffer = Constants.ctxBack .getBuffer();//数据存入buffer
///
matchCount += in.remaining();
Log.d("back", "共收到字节:" + String.valueOf(matchCount));
Constants.ctxBack .setMatchLength(matchCount);
if (in.hasRemaining()) {// 如果in中还有数据
if(matchCount< length) {
buffer.put(in);// 添加到保存数据的buffer中
}
if (matchCount >= length) {// 如果已经发送的数据的长度>=目标数据的长度,则进行解码
final byte[] b = new byte[1561];
byte[] temp = new byte[1561];
in.get(temp,0, (int) (length-buffer.position()));//最后一次in的数据可能有多的
buffer.put(temp);
// 一定要添加以下这一段,否则不会有任何数据,因为,在执行in.put(buffer)时buffer的起始位置已经移动到最后,所有需要将buffer的起始位置移动到最开始
buffer.flip();
buffer.get(b);
//解码过程
Constants.ctxBack .reset();
return MessageDecoderResult.OK;
} else {
Constants.ctxBack .setBuffer(buffer);
Constants.Backfail=true;
return MessageDecoderResult.NEED_DATA;
}
}
return MessageDecoderResult.OK;
}
@Override
public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
}
/**
<pre name="code" class="java">public static boolean NotFill=false;//A数据没有收满
public static boolean Backfail=false;//B数据接收失败
public static boolean IsJump=false;/B谱数据接收失败h后让解码器跳转到A频谱
public static int positionValue=0;
public static boolean flag=false;
public static boolean Isstop=false;//判断是否有解码器
*/
<pre name="code" class="java">@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
super.sessionIdle(session, status);
final ReceiveWrong mReceiveWrong = new ReceiveWrong();
final ReceiveRight mReceiveRight = new ReceiveRight();
//A数据超时重传
if (Constants.NotFill) {
Constants.FPGAsession.write(mReceiveWrong);
Constants.NotFill = false;
Constants.ctx.reset();
Constants.failCount++;
Log.d("trans", "重传次数:" + Constants.failCount);
}
//B数据传输失败
if (Constants.Backfail) {
Constants.FPGAsession.write(mReceiveWrong);
Constants.Backfail = false;
Constants.IsJump = true;
Constants.ctxBack.reset();
}
if (Constants.Isstop) {
Constants.FPGAsession.write(mReceiveWrong);
Constants.Isstop = false;
}
}
</pre><pre code_snippet_id="1691756" snippet_file_name="blog_20160521_13_6359721" name="code" class="java">在处理上述问题的时候,又相继出现了一个问题,那就是找不到解码器,导致IObuffer中出现了大量垃圾数据,解码器的不断循环,于是专门写了一个用来清除垃圾书据的解码器,
<pre name="code" class="java">public class ClearDecoder implements MessageDecoder {
@Override
public MessageDecoderResult decodable(IoSession ioSession, IoBuffer ioBuffer) {
if(Constants.Isstop) {//<span style="font-family: Arial, Helvetica, sans-serif;">Isstop=false;//判断是否有解码器,该标志时在找不到解码器时置为true</span>
ioBuffer.sweep();
int n=ioBuffer.remaining();
Constants.buffer.sweep();//buffer中的是错误帧,此时也找不到解码器
Constants.flag=false;
Log.d("abcd", "clear解码器清空数据");
}
return MessageDecoderResult.OK;
}
@Override
public MessageDecoderResult decode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
return null;
}
@Override
public void finishDecode(IoSession ioSession, ProtocolDecoderOutput protocolDecoderOutput) throws Exception {
}
}
</pre><pre code_snippet_id="1691756" snippet_file_name="blog_20160521_16_3786210" name="code" class="java">
以上是关于mina解决粘包,找不到解码器,数据帧重传的问题的主要内容,如果未能解决你的问题,请参考以下文章