mina之自定义编码和自定义解码
Posted 健康平安的活着
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mina之自定义编码和自定义解码相关的知识,希望对你有一定的参考价值。
一 协议
1.1 协议
比如QQ聊天工具,当输入完一个消息后,点击发送按钮向对方发送时,此时系统就会在在你的消息后添加一个文本换行符,接收方看到这个文本换行符就认为这是一个完整的消息,解析成字符串显示出来。而这个规则,就称之为协议!
1.2 几种定长的协议法
定长消息法:这种方式是使用长度固定的数据发送,一般适用于指令发送。譬如:数据发送端规定发送的数据都是双字节,AA 表示启动、BB 表示关闭等等。
字符定界法:这种方式是使用特殊字符作为数据的结束符,一般适用于简单数据的发送。譬如:在消息的结尾自动加上文本换行符(Windows使用\\r\\n,Linux使用\\n),接收方见到文本换行符就认为是一个完整的消息,结束接收数据开始解析。注意:这个标识结束的特殊字符一定要简单,常常使用ASCII码中的特殊字符来标识(会出现粘包、半包情况)。
定长报文头法:使用定长报文头,在报文头的某个域指明报文长度。该方法最灵活,使用最广。譬如:协议为– 协议编号(1字节)+数据长度(4个字节)+真实数据。请求到达后,解析协议编号和数据长度,根据数据长度来判断后面的真实数据是否接收完整。HTTP 协议的消息报头中的Content-Length 也是表示消息正文的长度,这样数据的接收端就知道到底读到多长的字节数就不用再读取数据了。
二 案例操作
本案例使用的就是:定长报文头法,也是实际中使用的最多的协议方法
协议格式:
- 包头:数据包的版本号,以及整个数据包的长度(包头+包体)
- 包体:实际数据
2.1 定义实体bean
数据长度(4个字节) + 协议编号(1字节)+ 真实数据。
package com.ljf.mina.demo.tcp.endcoder;
/**
* @ClassName: CustomPack
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/03 16:38:13
* @Version: V1.0
**/
public class CustomPack {
/**
* 0x00表示请求
*/
public static final byte REQUEST = 0x00;
/**
* 0x01表示回复
*/
public static final byte RESPONSE = 0x01;
// 总长度(编号字节 + 长度的字节 + 包体长度字节)
private int len;
// 版本号
private byte flag;
// 发送人,只是服务端-客户端,暂时无需发送人 接收人
// private long sender;
// 接收人
// private long receiver;
// 包体
private String content;
// 构造方法设置协议
public CustomPack(byte flag, String content) {
this.flag = flag;
this.content = content;
// 版本类型的长度1个字节, len的长度4个字节, 内容的字节数
this.len = 1 + 4 + (content == null ? 0 : content.getBytes().length);
}
public int getLen() {
return len;
}
public void setLen(int len) {
this.len = len;
}
public byte getFlag() {
return flag;
}
public void setFlag(byte flag) {
this.flag = flag;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "CustomPack{" +
"len=" + len +
", flag=" + flag +
", content='" + content + '\\'' +
'}';
}
}
2.2 客户端编码器
目标:将JAVA对象转换成二进制流
实现:继承ProtocolEncoderAdapter类或实现ProtocolEncoder接口
package com.ljf.mina.demo.tcp.endcoder;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.iosession;
import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
import java.nio.charset.Charset;
/**
* @ClassName: CustomProtocolEncoder
* @Description: TODO 自定义编码器 将JAVA对象转换成二进制流
* @Author: liujianfu
* @Date: 2021/08/03 16:41:29
* @Version: V1.0
**/
public class CustomProtocolEncoder implements ProtocolEncoder {
private final Charset charset;
public CustomProtocolEncoder() {
this.charset = Charset.defaultCharset();
}
// 构造方法注入编码格式
public CustomProtocolEncoder(Charset charset) {
this.charset = charset;
}
@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
// 转为自定义协议包
CustomPack customPack = (CustomPack) message;
// 初始化缓冲区
IoBuffer buffer = IoBuffer.allocate(customPack.getLen()) //超过了allocate()方法中设置的值,就开始扩充
.setAutoExpand(true); //自动扩充
// 设置长度、版本、内容
buffer.putInt(customPack.getLen());
buffer.put(customPack.getFlag());
if (customPack.getContent() != null) {
buffer.put(customPack.getContent().getBytes());
}
System.out.println("!!!!!!!!!!!!我是编码器========");
// 重置mask,发送buffer
buffer.flip();
out.write(buffer);
}
@Override
public void dispose(IoSession session) throws Exception {
}
}
2.3 服务端解码器
目标:将二进制流转换成JAVA对象
实现:实现ProtocolDecoder接口或继承ProtocolDecoderAdapter类(难以解决半包、粘包问题) 继承CumulativeProtocolDecoder类,重写doDecode方法(推荐使用此方法,完美解决半包、粘包问题)
package com.ljf.mina.demo.tcp.endcoder;
import com.ljf.mina.demo.tcp.socket.MinaServerCustom;
import org.apache.log4j.Logger;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
/**
* @ClassName: CustomProtocolDecoder
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/03 16:50:23
* @Version: V1.0
**/
public class CustomProtocolDecoder extends CumulativeProtocolDecoder {
private static Logger logger = Logger.getLogger(CustomProtocolDecoder.class);
private final Charset charset;
public CustomProtocolDecoder() {
this.charset = Charset.defaultCharset();
}
// 构造方法注入编码格式
public CustomProtocolDecoder(Charset charset) {
this.charset = charset;
}
/**
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
logger.info("进入服务端的解码器中.....");
in.mark();
// final int PACK_HEAD_LEN = 1;
// // 获取总长度
// int length = in.getInt(in.position());
// System.out.println(">>>>>>>>>>>length:"+length);
// logger.info("服务端获取数据长度....."+length);
in.reset();
byte[] bytes = new byte[5];
// 获取长度4个字节、版本1个字节、内容
in.get(bytes, 0, 5);
// byte flag = bytes[4];
byte[] tbytes = Arrays.copyOfRange(bytes,0,1);
String dataType=new String(tbytes);
System.out.println("类型============"+dataType );
byte[] sbytes = Arrays.copyOfRange(bytes,1,3);
System.out.println("序号============"+ bytes2Short(sbytes)+" 序号:"+new String(sbytes));
byte[] cbytes = Arrays.copyOfRange(bytes,3,5);
System.out.println("总数============"+ bytes2Short(cbytes)+" 总数:"+new String(cbytes));
short count = bytes2Short(cbytes);
if("A".equals(dataType)){
for (int i = 0 ; i < count; i++){
byte[] tmps = new byte[4];
// 获取长度4个字节、版本1个字节、内容
in.get(tmps, 0, 4);
System.out.println("读取内容为模拟量数据包A:["+i+"]====================="+new String(tmps) +" || "+bytesToLong(tmps));
// System.out.println("内容["+i+"]====================="+bytes2Long(tmps));
}
}
if("B".equals(dataType)){
for (int i = 0 ; i < count; i++){
byte[] tmps = new byte[1];
// 获取长度4个字节、版本1个字节、内容
in.get(tmps, 0, 1);
String value = new String(tmps);
System.out.println("bytes to Char >>>>>>>>>"+value);
System.out.println("读取内容为数字量数据包B: ["+i+"]====================="+new String(tmps) +" || "+ String.valueOf(getChars(tmps)));
// System.out.println("内容["+i+"]====================="+bytes2Long(tmps));
}
}
// byte[] conbytes = Arrays.copyOfRange(bytes,9,length);
//String content = new String(bytes, PACK_HEAD_LEN, length - PACK_HEAD_LEN, charset);
//logger.info("服务端读取到的内容为:..."+content);
// 封装为自定义的java对象
//CustomPack pack = new CustomPack(flag, content);
// out.write(content);
// 如果读取一条记录后,还存在数据(粘包),则再次进行调用
return in.remaining() > 0;
}
**/
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
logger.info("进入服务端的解码器中.....");
// 包头的长度
final int PACK_HEAD_LEN = 5;
// 拆包时,如果可读数据的长度小于包头的长度,就不进行读取
if (in.remaining() < PACK_HEAD_LEN) {
logger.info(" 拆包时,如果可读数据的长度小于包头的长度,就不进行读取....."+in.remaining());
return false;
}
if (in.remaining() > 1) {
logger.info(" 拆包时,如果可读数据的长度大于包的同步长度....."+in.remaining());
// 标记设为当前
in.mark();
// 获取总长度
int length = in.getInt(in.position());
// 如果可读取数据的长度 小于 总长度 - 包头的长度 ,则结束拆包,等待下一次
if (in.remaining() < (length - PACK_HEAD_LEN)) {
logger.info(" 如果可读取数据的长度 小于 总长度 - 包头的长度 ,则结束拆包,等待下一次....."+in.remaining());
in.reset();
return false;
} else {
logger.info(" 重置,并读取一条完整记录....."+in.remaining());
// 重置,并读取一条完整记录
in.reset();
System.out.println(">>>>>>>>>>>length:"+length);
logger.info(" 重置,并读取一条完整记录,数据长度....."+length);
byte[] bytes = new byte[length];
// 获取长度4个字节、版本1个字节、内容
in.get(bytes, 0, length);
byte flag = bytes[4];
String content = new String(bytes, PACK_HEAD_LEN, length - PACK_HEAD_LEN, charset);
// 封装为自定义的java对象
CustomPack pack = new CustomPack(flag, content);
out.write(pack);
// 如果读取一条记录后,还存在数据(粘包),则再次进行调用
return in.remaining() > 0;
}
}
return false;
}
private char bytes2Char(byte[] b){
ByteBuffer buffer = ByteBuffer.wrap(b);
buffer.order(ByteOrder.LITTLE_ENDIAN);
return buffer.getChar();
// return (short) (b[1] << 8 | (b[0] & 0xff));
}
private short bytes2Short(byte[] b){
ByteBuffer buffer = ByteBuffer.wrap(b);
buffer.order(ByteOrder.LITTLE_ENDIAN);
return buffer.getShort();
// return (short) (b[1] << 8 | (b[0] & 0xff));
}
private long bytes2Long(byte[] b){
// ByteBuffer buffer = ByteBuffer.wrap(b);
// buffer.order(ByteOrder.LITTLE_ENDIAN);
// return buffer.getLong();
return ((((long) b[0] & 0xff) << 0)
| (((long) b[1] & 0xff) << 8)
| (((long) b[2] & 0xff) << 16)
| (((long) b[3] & 0xff) << 24)
);
}
public static long bytesToLong(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.allocate(8);
buffer.put(bytes, 0, bytes.length);
buffer.flip();//need flip
return buffer.getLong();
}
private char[] getChars (byte[] bytes) {
Charset cs = Charset.forName ("UTF-8");
ByteBuffer bb = ByteBuffer.allocate (bytes.length);
bb.put (bytes);
bb.flip ();
CharBuffer cb = cs.decode (bb);
return cb.array();
}
}
doDecode()方法的解释:
你的doDecode()方法返回true 时,CumulativeProtocolDecoder 的decode()方法会首先判断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果验证过通过,那么CumulativeProtocolDecoder 会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()方法的调用,直到有新的数据被缓冲。
当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区。
简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。也就是说返回true,那么CumulativeProtocolDecoder会再次调用decoder,并把剩余的数据发下来返回false就不处理剩余的,当有新数据包来的时候把剩余的和新的拼接在一起然后再调用decoder。
2.4 自定义编解码工厂
目标:为filter中注入编解码工厂,通过工厂类获取编解码器
实现:实现ProtocolCodecFactory接口,获取编解码器
package com.ljf.mina.demo.tcp.endcoder;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
import java.nio.charset.Charset;
/**
* @ClassName: CustomProtocolCodecFactory
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/03 16:57:20
* @Version: V1.0
**/
public class CustomProtocolCodecFactory implements ProtocolCodecFactory {
private final ProtocolEncoder encoder;
private final ProtocolDecoder decoder;
public CustomProtocolCodecFactory() {
this(Charset.forName("UTF-8"));
}
// 构造方法注入编解码器
public CustomProtocolCodecFactory(Charset charset) {
this.encoder = new CustomProtocolEncoder(charset);
this.decoder = new CustomProtocolDecoder(charset);
}
public ProtocolEncoder getEncoder(IoSession session) throws Exception {
return encoder;
}
public ProtocolDecoder getDecoder(IoSession session) throws Exception {
return decoder;
}
}
2.5 客户端的handler
package com.ljf.mina.demo.tcp.handler;
import com.ljf.mina.demo.socket.MinaServer;
import com.ljf.mina.demo.tcp.endcoder.CustomPack;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
/**
* @ClassName: MyClientHandler
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/03 16:30:33
* @Version: V1.0
**/
public class MyClientHandler extends IoHandlerAdapter {
private static Logger logger = Logger.getLogger(MyClientHandler.class);
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
CustomPack pack = (CustomPack) message;
logger.info("客户端接收消息成功:" + pack);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
CustomPack pack = (CustomPack) message;
logger.info("客户端发送消息成功:" + pack);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
cause.printStackTrace();
logger.error("客户端处理消息异常:" + cause.getMessage());
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
// 空闲时,关闭session
if (status == IdleStatus.BOTH_IDLE) {
logger.info("session进入空闲,准备关闭session");
session.closeNow();
}
}
}
2.6 服务端的handler
package com.ljf.mina.demo.tcp.handler;
import com.ljf.mina.demo.tcp.endcoder.CustomPack;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import java.util.Date;
/**
* @ClassName: MyServerHandler
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/03 17:01:54
* @Version: V1.0
**/
public class MyServerHandler extends IoHandlerAdapter {
private static Logger logger = Logger.getLogger(MyServerHandler.class);
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
logger.info("服务端接收消息成功message:【" + message+" 】");
CustomPack pack = (CustomPack) message;
logger.info("服务端接收消息成功:【" + pack+" 】");
CustomPack sendClientMessage = (CustomPack) message;
String s= "服务端收到你客户端发来的信息,感谢!"+new Date();
// String s= "thank you!"+new Date();
sendClientMessage.setLen(5+s.getBytes().length);
sendClientMessage.setFlag((byte)1);
sendClientMessage.setContent(s);
session.write(sendClientMessage);
}
@Override
public void messageSent(IoSession session, Object message) throws Exception {
CustomPack pack = (CustomPack) message;
logger.info("服务端发送消息成功:" + pack);
}
@Override
public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
cause.printStackTrace();
logger.error("服务端处理消息异常:" + cause);
}
}
2.6 客户端
package com.ljf.mina.demo.tcp.socket;
import com.ljf.mina.demo.socket.MinaServer;
import com.ljf.mina.demo.tcp.endcoder.CustomPack;
import com.ljf.mina.demo.tcp.endcoder.CustomProtocolCodecFactory;
import com.ljf.mina.demo.tcp.handler.MyClientHandler;
import org.apache.log4j.Logger;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.NioDatagramConnector;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
/**
* @ClassName: MinaClientCustom
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/03 16:15:56
* @Version: V1.0
**/
public class MinaClientCustom {
private static final String MINA_HOST = "127.0.0.1";
private static final int MINA_PORT = 7080;
private static Logger logger = Logger.getLogger(MinaServer.class);
public static void main(String[] args) {
dealData();
}
public static void dealData(){
// 获取当前系统时间戳
long start = System.currentTimeMillis();
// 创建一个非阻塞的客户端
// IoConnector connector = new NioSocketConnector();
IoConnector connector=new NioDatagramConnector();
// 设置编码过滤器
connector.getFilterChain().addLast("mycoder", new ProtocolCodecFilter(new CustomProtocolCodecFactory(Charset.forName("UTF-8"))));
// 设置缓冲区大小
connector.getSessionConfig().setReadBufferSize(1024);
// 设置空闲时间
connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
// 绑定逻辑处理Handler
connector.setHandler(new MyClientHandler());
// 创建连接
ConnectFuture future = connector.connect(new InetSocketAddress(MINA_HOST, MINA_PORT));
// 这里采用监听方式获取session
future.addListener(new IoFutureListener<IoFuture>() {
// 当连接创建完成
public void operationComplete(IoFuture future) {
IoSession session = future.getSession();
sendData(session);
}
});
}
// 发送数据的方法
public static void sendData(IoSession session) {
logger.info("----------------------------测试数据准备发送-----------------------------");
// 模拟发送100次数据
for (int i = 0; i < 1; i++) {
// String content = "Mina是一个基于NIO的网络框架,使用它编写程序时,可以专注于业务处理,而不用过于关心IO操作。不论应用程序采用什么协议(TCP、UDP)或者其它的,Mina提供了一套公用的接口:" + i;
String content = "Mina是一个基于NIO的网络框架,使用它编写程序时,可以专注于业务处理,而不用过于关心IO操作。不论应用程序采用什么协议(TCP、UDP)或者其它的,Mina提供了一套公用的接口:" + i;
CustomPack pack = new CustomPack((byte) i, content);
session.write(pack);
logger.info("----------------------------测试数据发送完毕"+i);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("----------------------------测试数据发送完毕-----------------------------");
}
}
2.7 服务端
package com.ljf.mina.demo.tcp.socket;
import com.ljf.mina.demo.socket.MinaServer;
import com.ljf.mina.demo.tcp.endcoder.CustomProtocolCodecFactory;
import com.ljf.mina.demo.tcp.handler.MyServerHandler;
import org.apache.log4j.Logger;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;
/**
* @ClassName: MinaServerCustom
* @Description: TODO
* @Author: liujianfu
* @Date: 2021/08/03 17:04:28
* @Version: V1.0
**/
public class MinaServerCustom {
// 端口
private static final int MINA_PORT = 7080;
private static Logger logger = Logger.getLogger(MinaServerCustom.class);
public static void main(String[] args) {
IoAcceptor acceptor;
try {
// 创建一个非阻塞的服务端server
//acceptor = new NioSocketAcceptor();
acceptor = new NioDatagramAcceptor();
// 设置编码过滤器(自定义)
acceptor.getFilterChain().addLast("mycoder", new ProtocolCodecFilter(new CustomProtocolCodecFactory(Charset.forName("UTF-8"))));
// 设置缓冲区大小
acceptor.getSessionConfig().setReadBufferSize(1024);
// 设置读写空闲时间
acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
// 绑定handler
acceptor.setHandler(new MyServerHandler());
// 绑定端口
acceptor.bind(new InetSocketAddress(MINA_PORT));
logger.info("创建Mina服务端成功,端口:" + MINA_PORT);
} catch (IOException e) {
logger.error("创建Mina服务端出错:" + e.getMessage());
}
}
}
2.8 测试
服务端:
客户端:
以上是关于mina之自定义编码和自定义解码的主要内容,如果未能解决你的问题,请参考以下文章
带有复选框和自定义适配器的 ListView,片段无法正常工作