Mina自定义协议简单实现

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Mina自定义协议简单实现相关的知识,希望对你有一定的参考价值。

因公司需要做个电子秤自动称重系统,需要自定义协议实现,所以就用Mina简单实现了一下,因为比较简单,就直接上代码。有时间的话改成Netty版

服务端

package net.heartma.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Logger;
import net.heartma.protocol.CustomProtocolCodecFactory;
import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.SocketSessionConfig;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * Server entry point: starts a MINA TCP acceptor on port 8081 that decodes the
 * custom protocol and dispatches events to {@link BalanceServerHandler}.
 *
 * @author heartma
 */
public class BalanceServer {
    private static SocketAcceptor acceptor;
    private static DefaultIoFilterChainBuilder filter;
    private static SocketSessionConfig config;
    public static Logger logger = Logger.getLogger("BalanceServer");
    private static Executor threadPool = Executors.newFixedThreadPool(20);

    /**
     * Configures the acceptor (filter chain, handler, session options) and binds it.
     * If binding fails, the failure is logged with its cause and the acceptor is
     * disposed.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // 1. Create the server-side acceptor.
        acceptor = new NioSocketAcceptor();

        // 2. Build the filter chain: thread pool first, then logging, then the
        //    custom codec that turns raw bytes into Message objects.
        filter = acceptor.getFilterChain();
        filter.addLast("threadPool", new ExecutorFilter(threadPool));
        filter.addLast("logger", new LoggingFilter());
        filter.addLast("codec", new ProtocolCodecFilter(new CustomProtocolCodecFactory()));

        // 3. Attach the business handler.
        acceptor.setHandler(new BalanceServerHandler());

        // 4. Session-level socket settings.
        config = acceptor.getSessionConfig();
        // Read buffer size in bytes: 2048 bytes = 2 KB (not 2 MB).
        config.setReadBufferSize(2048);
        // Fire sessionIdle on the handler when a session has seen neither reads
        // nor writes for 10 seconds.
        config.setIdleTime(IdleStatus.BOTH_IDLE, 10);

        try {
            // Bind the listening port.
            acceptor.bind(new InetSocketAddress(8081));
            logger.info("服务已经启动... ...");
        } catch (IOException e) {
            // Log at SEVERE with the cause attached instead of INFO + printStackTrace.
            logger.log(java.util.logging.Level.SEVERE, "服务启动异常 :", e);
            // The server cannot run without its port; release the acceptor's resources.
            acceptor.dispose();
        }
    }
}

 服务端Handler

package net.heartma.server;
import net.heartma.pojo.Message;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

/**
 * Server-side I/O handler: traces every session life-cycle event to stdout and
 * prints each decoded {@link Message}.
 *
 * @author heartma
 */
public class BalanceServerHandler extends IoHandlerAdapter {

    /**
     * Reports a failure raised while processing the session.
     * The throwable is printed so the cause is not silently lost.
     */
    @Override
    public void exceptionCaught(IoSession ioSession, Throwable e)
            throws Exception {
        System.out.println("exceptionCaught");
        e.printStackTrace();
    }

    /**
     * Prints the received message.
     *
     * @param obj the received object; cast to {@link Message}, so any other
     *            type raises {@link ClassCastException}
     */
    @Override
    public void messageReceived(IoSession ioSession, Object obj) throws Exception {
        System.out.println("messageReceived");
        Message message = (Message)obj;
        System.out.println("服务器端接收到的消息:" +message);
    }

    /** Traces that a message was sent. */
    @Override
    public void messageSent(IoSession ioSession, Object obj) throws Exception {
        System.out.println("messageSent");
    }

    /** Traces that the session was closed. */
    @Override
    public void sessionClosed(IoSession ioSession) throws Exception {
        System.out.println("sessionClosed");
    }

    /** Traces that a session was created. */
    @Override
    public void sessionCreated(IoSession ioSession) throws Exception {
        System.out.println("sessionCreated");
    }

    /** Traces that the session became idle. */
    @Override
    public void sessionIdle(IoSession ioSession, IdleStatus arg1) throws Exception {
        System.out.println("sessionIdle");
    }

    /** Traces that the session was opened. */
    @Override
    public void sessionOpened(IoSession ioSession) throws Exception {
        System.out.println("sessionOpened");
    }

}

 客户端

23230E363932383834363630333738370D303030302E3434 为定义的协议报文

 

package net.heartma.client;

import java.net.InetSocketAddress;
import java.net.SocketAddress;

import net.heartma.protocol.CustomProtocolCodecFactory;

import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

/**
 * MINA client: connects to the server on localhost:8081, sends one sample
 * protocol frame (as a hex string) and waits for the session to close.
 *
 * @author Administrator
 */
public class BalanceClient {

	/**
	 * Runs the client once. The connector is always disposed, including when
	 * the connection cannot be established.
	 *
	 * @param args command-line arguments (unused)
	 */
	public static void main(String[] args) {
		// TCP/IP connector.
		NioSocketConnector connector = new NioSocketConnector();
		try {
			// Server address and port.
			SocketAddress address = new InetSocketAddress("localhost",8081);

			// Filter chain: logging first, then the custom codec.
			DefaultIoFilterChainBuilder chain = connector.getFilterChain();
			chain.addLast("logger", new LoggingFilter());
			chain.addLast("mycodec",new ProtocolCodecFilter(new CustomProtocolCodecFactory()));

			// Business handler.
			connector.setHandler(new BalanceClientHandler());

			// Connect and wait for the attempt to finish.
			ConnectFuture future = connector.connect(address);
			future.awaitUninterruptibly();

			// getSession() fails when the connection was not established, so check first.
			if (!future.isConnected()) {
				System.err.println("Unable to connect to " + address + ": " + future.getException());
				return;
			}

			// Send the sample frame; the encoder converts the hex string to bytes.
			future.getSession().write("23230E363932383834363630333738370D303030302E3434");

			// Wait up to 28000 ms for the session to close.
			future.getSession().getCloseFuture().awaitUninterruptibly(28000);
		} finally {
			// Release the connector on every exit path.
			connector.dispose();
		}
	}

}

 

客户端Handler

package net.heartma.client;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

/**
 * 客户端Handler
 * @author heartma
 *
 */
public class BalanceClientHandler extends IoHandlerAdapter {

    public void exceptionCaught(IoSession ioSession, Throwable e)
            throws Exception {
        System.out.println("exceptionCaught");
    }

    public void messageReceived(IoSession ioSession, Object obj) throws Exception {
        System.out.println("messageReceived");

    }

    public void messageSent(IoSession ioSession, Object obj) throws Exception {
        System.out.println("客户端发送消息...");
        //super.messageSent(ioSession, obj);
    }

    public void sessionClosed(IoSession ioSession) throws Exception {
        System.out.println("sessionClosed");

    }

    public void sessionCreated(IoSession ioSession) throws Exception {
        System.out.println("sessionCreated");

    }

    public void sessionIdle(IoSession ioSession, IdleStatus idle) throws Exception {

        System.out.println("sessionIdle");
    }

    public void sessionOpened(IoSession ioSession) throws Exception {
        System.out.println("sessionOpened");

    }

}

 实体消息体Message

package net.heartma.pojo;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;

/**
 * One frame of the custom weighing protocol.
 *
 * <p>Wire layout, as parsed by {@link #ReadFromBytes(byte[])}:
 * 2 header bytes, 1 card-length byte, then (only when the length is non-zero)
 * the card number followed by a 7-byte textual weight.
 *
 * @author heartma
 */
public class Message {
    /** Charset used to turn the textual fields into strings. */
    private static final Charset TEXT_CHARSET = Charset.forName("UTF-8");
    /** Number of header bytes at the start of a frame. */
    private static final int HEADER_SIZE = 2;
    /** Offset of the first card byte (header + 1 length byte). */
    private static final int CARD_OFFSET = HEADER_SIZE + 1;
    /** Number of bytes occupied by the textual weight. */
    private static final int WEIGHT_SIZE = 7;

    private String header;  // frame header
    private int length;     // card number length
    private String card;    // card number
    private double weight;  // weight

    public String getHeader() {
        return header;
    }
    public void setHeader(String header) {
        this.header = header;
    }
    public int getLength() {
        return length;
    }
    public void setLength(int length) {
        this.length = length;
    }
    public String getCard() {
        return card;
    }
    public void setCard(String card) {
        this.card = card;
    }
    public double getWeight() {
        return weight;
    }
    public void setWeight(double weight) {
        this.weight = weight;
    }

    /**
     * Populates this message from a raw frame.
     *
     * <p>The frame size is validated before any field is assigned, so a
     * truncated frame leaves the object untouched. The length byte is read as
     * an unsigned value (0..255); a signed read would turn lengths above 127
     * into negative numbers and misreport them as heartbeats.
     *
     * @param messageBytes the complete frame
     * @return {@code true} when the frame carries card and weight data,
     *         {@code false} when the card length is 0 (heartbeat frame)
     * @throws ArrayIndexOutOfBoundsException if the frame is shorter than its
     *         layout requires
     * @throws NumberFormatException if the weight field is not a valid number
     */
    public final boolean ReadFromBytes(byte[] messageBytes) {
        if (messageBytes.length < CARD_OFFSET) {
            throw new ArrayIndexOutOfBoundsException(
                    "frame too short: need at least " + CARD_OFFSET
                            + " bytes, got " + messageBytes.length);
        }
        // Mask to read the length byte as unsigned.
        int cardLength = messageBytes[HEADER_SIZE] & 0xFF;
        int required = cardLength == 0
                ? CARD_OFFSET
                : CARD_OFFSET + cardLength + WEIGHT_SIZE;
        if (messageBytes.length < required) {
            throw new ArrayIndexOutOfBoundsException(
                    "frame too short: need " + required
                            + " bytes, got " + messageBytes.length);
        }

        setHeader(new String(messageBytes, 0, HEADER_SIZE, TEXT_CHARSET));
        setLength(cardLength);
        // A zero card length marks a heartbeat; there is no payload to parse.
        if (cardLength == 0) {
            return false;
        }
        setCard(new String(messageBytes, CARD_OFFSET, cardLength, TEXT_CHARSET));
        String weightText =
                new String(messageBytes, CARD_OFFSET + cardLength, WEIGHT_SIZE, TEXT_CHARSET);
        setWeight(Double.parseDouble(weightText));
        return true;
    }

    @Override
    public String toString() {
        return "Message [header=" + header + ", length=" + length + ", card="
                + card + ", weight=" + weight + "]";
    }
}

 自定义协议工厂类

package net.heartma.protocol;
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

/**
 * Codec factory for the custom protocol. A single decoder and a single
 * encoder are created per factory and returned for every session.
 *
 * @author heartma
 */
public class CustomProtocolCodecFactory implements ProtocolCodecFactory{
    /** Charset passed to both the decoder and the encoder. */
    private static final Charset CODEC_CHARSET = Charset.forName("utf-8");

    private final CustomProtocolDecoder decoder = new CustomProtocolDecoder(CODEC_CHARSET);
    private final CustomProtocolEncoder encoder = new CustomProtocolEncoder(CODEC_CHARSET);

    /** Creates the factory; the codec instances are built by the field initializers. */
    public CustomProtocolCodecFactory(){
    }

    /** Returns the decoder held by this factory, regardless of the session. */
    @Override
    public ProtocolDecoder getDecoder(IoSession ioSession) throws Exception {
        return this.decoder;
    }

    /** Returns the encoder held by this factory, regardless of the session. */
    @Override
    public ProtocolEncoder getEncoder(IoSession ioSession) throws Exception {
        return this.encoder;
    }
}

 自定义协议编码

package net.heartma.protocol;
import java.nio.charset.Charset;
import net.heartma.tools.ByteUtil;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

/**
 * Encoder for the custom protocol: converts the outgoing message's hex-string
 * form into raw bytes and writes them to the output.
 *
 * @author heartma
 */
public class CustomProtocolEncoder extends ProtocolEncoderAdapter {
    private Charset charset;

    /**
     * @param charset charset stored by this encoder (not used by {@code encode})
     */
    public CustomProtocolEncoder(Charset charset) {
        this.charset = charset;
    }

    /**
     * Encodes {@code message.toString()}, interpreted as a hex string by
     * {@code ByteUtil.hexStr2ByteArray}, into a fixed-size buffer.
     *
     * @param session the session the message is written to (unused here)
     * @param message the outgoing message; its string form is the hex payload
     * @param out     receives the encoded buffer
     */
    @Override
    public void encode(IoSession session, Object message,
            ProtocolEncoderOutput out) throws Exception {
        byte[] hexStrToByte = ByteUtil.hexStr2ByteArray(message.toString());
        IoBuffer buf = IoBuffer.allocate(hexStrToByte.length).setAutoExpand(false);
        buf.put(hexStrToByte);
        buf.flip();
        out.write(buf);
        out.flush();
        // Do not call buf.free() here: the buffer now belongs to the output
        // queue and may not have been written yet, so releasing it could let an
        // allocator recycle memory that is still pending.
    }
}

 自定义协议解码

package net.heartma.protocol;

import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;

import net.heartma.pojo.Message;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
 * Decoder for the custom protocol: turns the bytes of one read into a
 * {@link Message}.
 *
 * <p>NOTE(review): each call treats the readable bytes as exactly one complete
 * frame. TCP may split or merge frames, so a cumulative decoder would be needed
 * for robust framing — confirm against the device's behaviour.
 *
 * @author heartma
 */
public class CustomProtocolDecoder  extends ProtocolDecoderAdapter {
    private CharsetDecoder decoder;

    /**
     * @param charset charset whose decoder is stored (not used by {@code decode})
     */
    public CustomProtocolDecoder(Charset charset){
        this.decoder = charset.newDecoder();
    }

    /**
     * Reads all readable bytes from {@code in}, parses them into a
     * {@link Message} and writes it to {@code out}. An empty buffer is ignored.
     *
     * @param session the session the data arrived on (unused here)
     * @param in      the received bytes
     * @param out     receives the decoded message
     */
    @Override
    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)
            throws Exception {
        // Nothing to parse; avoid handing an empty array to the parser.
        if (!in.hasRemaining()) {
            return;
        }
        // Size by remaining(), not limit(): get(byte[]) reads from the current
        // position and underflows if the array is larger than what is left.
        byte[] bytes = new byte[in.remaining()];
        in.get(bytes);
        Message message = new Message();
        message.ReadFromBytes(bytes);
        out.write(message);
    }
}

 

 代码下载:

http://download.csdn.net/download/cos18661062156/10141901

 

技术分享图片

追求卓越,成功就会在不经意间追上你!

以上是关于Mina自定义协议简单实现的主要内容,如果未能解决你的问题,请参考以下文章

自定义协议传输

自定义对话框片段

MINA原理详解

使用mina 基于tcp协议实现客户端和服务端进行通信

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介、创建消息生产者、创建消息消费者、自定义消息通道、分组与持久化、设置 RoutingKey)(代码片段)

Java TCP/IP Socket构建和解析自定义协议消息(含代码)