自定义协议传输
Posted 根须
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义协议传输相关的知识,希望对你有一定的参考价值。
step1:协议格式
step2:根据协议定义出对应的模型
1 package com.superb.mina.entity; 2 3 import java.io.ByteArrayOutputStream; 4 import java.io.DataOutputStream; 5 import java.io.IOException; 6 import java.io.Serializable; 7 import java.nio.charset.Charset; 8 9 /** 10 * 协议包 11 * @author sundg 12 * 13 * 2018年3月30日上午10:55:33 14 */ 15 public class BaseMsg implements Serializable { 16 /** 序列号*/ 17 private static final long serialVersionUID = -4614096987747485330L; 18 public static int HEAD_SIZE =18; 19 /**报文总长度*/ 20 private Integer length; 21 /**序列号*/ 22 private Integer sequence; 23 /**命令号*/ 24 private Integer command; 25 /**保留字*/ 26 private Integer reserved; 27 /**数据段*/ 28 private byte[] data; 29 30 private Integer separator=0xFFFF; 31 32 33 public static long getSerialversionuid() { 34 return serialVersionUID; 35 } 36 37 public void setLength(Integer length) { 38 this.length = length; 39 } 40 41 public Integer getLength() { 42 return length; 43 } 44 45 public Integer getSequence() { 46 return sequence; 47 } 48 49 public void setSequence(Integer sequence) { 50 this.sequence = sequence; 51 } 52 53 public Integer getCommand() { 54 return command; 55 } 56 57 public void setCommand(Integer command) { 58 this.command = command; 59 } 60 61 public Integer getReserved() { 62 return reserved; 63 } 64 65 public void setReserved(Integer reserved) { 66 this.reserved = reserved; 67 } 68 69 public byte[] getData() { 70 return data; 71 } 72 73 public void setData(byte[] data) { 74 this.data = data; 75 if (data != null) { 76 this.length = HEAD_SIZE + data.length; 77 } else { 78 this.length = HEAD_SIZE; 79 } 80 } 81 82 public Integer getSeparator() { 83 return separator; 84 } 85 86 public void setSeparator(Integer separator) { 87 this.separator = separator; 88 } 89 90 /** 91 * 将要发送的信息保存为二进制流进行传输 92 * @return 93 */ 94 public byte[] toBytes() { 95 byte[] ret = null; 96 try { 97 ByteArrayOutputStream bos = new ByteArrayOutputStream(this.length); 98 DataOutputStream dos = new DataOutputStream(bos); 99 100 dos.writeInt(this.length); 101 dos.writeInt(this.sequence); 102 dos.writeInt(this.command); 103 if(this.reserved==null){ 104 dos.writeInt(0); 105 }else{ 106 dos.writeInt(this.reserved); 107 } 108 if (this.data != null) { 109 dos.write(this.data); 110 } 111 dos.writeShort(0xFFFF); 112 ret = bos.toByteArray(); 113 } catch (IOException e) { 114 e.printStackTrace(); 115 } 116 117 return ret; 118 } 119 120 }
step3:自定义编码解码器
1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.session.iosession; 4 import org.apache.mina.filter.codec.ProtocolCodecFactory; 5 import org.apache.mina.filter.codec.ProtocolDecoder; 6 import org.apache.mina.filter.codec.ProtocolEncoder; 7 8 public class SuperbProtocolCodecFactory implements ProtocolCodecFactory { 9 private ProtocolDecoder decoder; 10 private ProtocolEncoder encoder; 11 12 public SuperbProtocolCodecFactory() { 13 this.encoder = new SuperbProtocolEncoder(); 14 this.decoder = new SuperbProtocolDecoder(); 15 } 16 17 @Override 18 public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception { 19 return this.decoder; 20 } 21 22 @Override 23 public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception { 24 return this.encoder; 25 } 26 }
1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.buffer.IoBuffer; 4 import org.apache.mina.core.session.IoSession; 5 import org.apache.mina.filter.codec.ProtocolEncoder; 6 import org.apache.mina.filter.codec.ProtocolEncoderOutput; 7 8 import com.superb.mina.entity.BaseMsg; 9 10 public class SuperbProtocolEncoder implements ProtocolEncoder { 11 12 @Override 13 public void dispose(IoSession session) throws Exception { 14 // nothing to dispose 15 } 16 17 @Override 18 public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { 19 BaseMsg msg = (BaseMsg) message; 20 byte[] bys = msg.toBytes(); 21 22 IoBuffer buffer = IoBuffer.allocate(bys.length, false); 23 buffer.put(bys); 24 buffer.flip(); 25 out.write(buffer); 26 } 27 }
1 package com.superb.mina.coder; 2 3 import org.apache.mina.core.buffer.BufferDataException; 4 import org.apache.mina.core.buffer.IoBuffer; 5 import org.apache.mina.core.session.IoSession; 6 import org.apache.mina.filter.codec.CumulativeProtocolDecoder; 7 import org.apache.mina.filter.codec.ProtocolDecoderOutput; 8 9 import com.superb.mina.entity.BaseMsg; 10 11 12 public class SuperbProtocolDecoder extends CumulativeProtocolDecoder { 13 private boolean prefixedDataAvailable(IoBuffer in) { 14 if (in.remaining() < 4) { 15 return false; 16 } 17 18 int dataLength = in.getInt(in.position()); 19 20 if (dataLength < BaseMsg.HEAD_SIZE) { 21 throw new BufferDataException("dataLength: " + dataLength); 22 } 23 24 return in.remaining() >= dataLength; 25 } 26 27 @Override 28 protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { 29 if (!prefixedDataAvailable(in)) { 30 return false; 31 } 32 33 BaseMsg msg = new BaseMsg(); 34 int length = in.getInt(); 35 msg.setLength(length); 36 msg.setSequence(in.getInt()); 37 msg.setCommand(in.getInt()); 38 msg.setReserved(in.getInt()); 39 40 int body_len = length - BaseMsg.HEAD_SIZE; 41 42 if (body_len < 0 ) { 43 throw new Exception("body length error.(" + body_len + ")"); 44 } 45 46 if (body_len > 0) { 47 byte[] body = new byte[body_len]; 48 in.get(body); 49 msg.setData(body); 50 } 51 in.getShort(); 52 msg.setSeparator(0xFFFF); 53 out.write(msg); 54 return true; 55 } 56 }
step4:编写对应的客户端与服务器端
1 package com.superb.mina.clientServer; 2 3 import java.net.InetSocketAddress; 4 import java.util.Date; 5 import java.util.HashMap; 6 import java.util.Map; 7 8 import org.apache.mina.core.future.ConnectFuture; 9 import org.apache.mina.core.service.IoConnector; 10 import org.apache.mina.core.session.IoSession; 11 import org.apache.mina.filter.codec.ProtocolCodecFilter; 12 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 13 import org.apache.mina.transport.socket.nio.NioSocketConnector; 14 import org.slf4j.Logger; 15 import org.slf4j.LoggerFactory; 16 17 import com.superb.mina.clientServer.handler.ClientServerHandler; 18 import com.superb.mina.coder.MinaProtobufDecoder; 19 import com.superb.mina.coder.MinaProtobufEncoder; 20 import com.superb.mina.coder.SuperbProtocolCodecFactory; 21 import com.superb.mina.coder.SuperbProtocolDecoder; 22 import com.superb.mina.coder.SuperbProtocolEncoder; 23 import com.superb.mina.entity.BaseMsg; 24 import com.superb.mina.entity.Init; 25 26 27 public class ClientServer { 28 private static Logger logger=LoggerFactory.getLogger(ClientServer.class); 29 private static String HOST="192.168.1.115"; 30 private static int PORT=8003; 31 public static void main(String[] args) { 32 IoConnector connector=null; 33 connector=new NioSocketConnector(); 34 connector.setConnectTimeoutMillis(30000); 35 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SuperbProtocolCodecFactory())); 36 connector.setHandler(new ClientServerHandler()); 37 IoSession session=null; 38 try { 39 ConnectFuture future= connector.connect(new InetSocketAddress(HOST,PORT)); 40 future.awaitUninterruptibly(); 41 session=future.getSession(); 42 BaseMsg msg=new BaseMsg(); 43 Thread t=new Thread(); 44 int i=0; 45 while(true){ 46 i++; 47 Init init=new Init(); 48 init.setAc1(25); 49 init.setAc1FaltAlertEnable(1); 50 init.setcTime(new Date()); 51 byte[] bytes=init.getData(init); 52 msg.setSequence(i); 53 msg.setCommand(100); 54 msg.setData(bytes); 55 session.write(msg); 56 t.sleep(8000); 57 } 58 } catch (Exception e) { 59 logger.info("客户端连接异常....",e); 60 } 61 } 62 63 /** 64 * 尝试与服务器建立连接,如果连接成功返回IoSession 65 * @param ip 服务器IP地址 66 * @param port 服务器端口号 67 * @return 返回连接成功之后的IoSession 68 */ 69 public static IoSession connectToServer(String ip,int port){ 70 IoConnector connector=null; 71 connector=new NioSocketConnector(); 72 connector.setConnectTimeoutMillis(30000); 73 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SuperbProtocolCodecFactory())); 74 connector.setHandler(new ClientServerHandler()); 75 IoSession session=null; 76 try { 77 ConnectFuture future= connector.connect(new InetSocketAddress(ip,port)); 78 future.awaitUninterruptibly(); 79 session=future.getSession(); 80 logger.info("客户端连接成功"); 81 } catch (Exception e) { 82 logger.info("客户端连接异常....",e); 83 } 84 return session; 85 } 86 87 /** 88 * 发送心跳包 89 * @param session 与服务器连接成功之后的IoSession 90 * @param baseMsg 将要发送的心跳包 91 */ 92 public static void sendHeartBeat(IoSession session, BaseMsg baseMsg) { 93 logger.info("命令号为:"+baseMsg.getCommand()); 94 session.write(baseMsg); 95 } 96 }
1 package com.superb.mina.clientServer.handler; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ObjectInputStream; 5 import java.net.InetSocketAddress; 6 7 import org.apache.log4j.Logger; 8 import org.apache.mina.core.future.ConnectFuture; 9 import org.apache.mina.core.service.IoConnector; 10 import org.apache.mina.core.service.IoHandlerAdapter; 11 import org.apache.mina.core.session.IdleStatus; 12 import org.apache.mina.core.session.IoSession; 13 import org.apache.mina.filter.codec.ProtocolCodecFilter; 14 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 15 import org.apache.mina.transport.socket.nio.NioSocketConnector; 16 17 import com.superb.mina.entity.BaseMsg; 18 import com.superb.mina.helper.Helper; 19 20 21 public class ClientServerHandler extends IoHandlerAdapter{ 22 private static Logger logger=Logger.getLogger(ClientServerHandler.class); 23 @Override 24 public void sessionCreated(IoSession session) throws Exception { 25 // TODO Auto-generated method stub 26 super.sessionCreated(session); 27 logger.info("创建session"); 28 } 29 @Override 30 public void sessionIdle(IoSession session, IdleStatus status) 31 throws Exception { 32 // TODO Auto-generated method stub 33 super.sessionIdle(session, status); 34 logger.info("session进入空闲状态"); 35 } 36 @Override 37 public void messageReceived(IoSession session, Object message) 38 throws Exception { 39 BaseMsg msg=(BaseMsg) message; 40 logger.info("客户端接收到服务器的消息为:"+msg); 41 logger.info("报文的长度为:"+msg.getLength()); 42 logger.info("报文序列号为:"+msg.getSequence()); 43 logger.info("报文命令号为:"+msg.getCommand()); 44 logger.info("报文保留字为:"+msg.getReserved()); 45 ByteArrayInputStream bi=new ByteArrayInputStream(msg.getData()); 46 ObjectInputStream oi=new ObjectInputStream(bi); 47 Object obj=oi.readObject(); 48 49 if(0x80000000==msg.getCommand()){ 50 //心跳应答包 51 }else if(0x80000001==msg.getCommand()){ 52 //初始化应答包 53 54 }else if(0x80000002==msg.getCommand()){ 55 //设备动作上报应答包 56 }else if(0x80000003==msg.getCommand()){ 57 //报警上报应答包 58 }else if(0x00008000==msg.getCommand()){ 59 //状态查询请求 60 }else if(0x00008001==msg.getCommand()){ 61 //设备控制请求 62 }else if(0x00008002==msg.getCommand()){ 63 以上是关于自定义协议传输的主要内容,如果未能解决你的问题,请参考以下文章