Apache Mina

Posted JY_He

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Mina相关的知识,希望对你有一定的参考价值。

初步接触RPC通信框架,目前有很多优秀的RPC框架,今天我参考该博文:http://www.cnblogs.com/xuekyo/archive/2013/03/06/2945826.html 学习了Aapche Mina通信框架。博主介绍的非常详细,包括Mina的源码流程,这里通过阅读博主的文章进行了学习记录,方便以后需要时使用。

Apache Mina 是一个网络应用程序框架,用来帮助用户简单地开发高性能和高可扩展性的网络应用程序。它提供了一个通过Java NIO在不同的传输例如TCP/IP和UDP/IP上抽象的事件驱动的异步API。

在Mina的源码,整个框架最核心的几个包是 :
•org.apache.mina.core.service :ioservice、IoProcessor、IoHandler、IoAcceptor、IoConnector
•org.apache.mina.core.session
•org.apache.mina.core.polling
•org.apache.mina.transport.socket

其中介绍Mina中的几个重要的接口:
•IoServiece :这个接口在一个线程上负责套接字的建立,拥有自己的 Selector,监听是否有连接被建立。
•IoProcessor :这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是说它也拥有自己的 Selector,这是与我们使用 JAVA NIO 编码时的一个不同之处,通常在 JAVA NIO 编码中,我们都是使用一个 Selector,也就是不区分 IoService与 IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在 IoService 上的过滤器,并在过滤器链之后调用 IoHandler。
•IoAccepter :相当于网络应用程序中的服务器端
•IoConnector :相当于客户端
•IoSession :当前客户端到服务器端的一个连接实例
•IoHandler :这个接口负责编写业务逻辑,也就是接收、发送数据的地方。这也是实际开发过程中需要用户自己编写的部分代码。
•IoFilter :过滤器用于悬接通讯层接口与业务层接口,这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的 encode与 decode是最为重要的、也是你在使用 Mina时最主要关注的地方。




IoService 便是应用程序的入口,它是所有 IoAcceptor 和 IoConnector 的基接口:
1.底层的元数据信息 TransportMetadata,比如底层的网络服务提供者(NIO,ARP,RXTX等)。(除此方法之外,其它方法在 AbstractIoService 得以实现。)
2.通过这个服务创建一个新会话时,新会话的默认配置 IoSessionConfig。
3.此服务所管理的所有会话。
4.与这个服务相关所产生的事件所对应的监听者(IoServiceListener)。
5.处理这个服务所管理的所有连接的处理器(IoHandler)。
6.每个会话都有一个过滤器链(IoFilterChain),每个过滤器链通过其对应的 IoFilterChainBuilder来负责构建。
7.由于此服务管理了一系列会话,因此可以通过广播的方式向所有会话发送消息,返回结果是一个WriteFuture集,后者是一种表示未来预期结果的数据结构。
8.服务创建的会话(IoSession)相关的数据通过 IoSessionDataStructureFactory来提供。
9.发送消息时有一个写缓冲队列。
10.服务的闲置状态有三种:读端空闲,写端空闲,双端空闲。
11.还提供服务的一些统计信息,比如时间,数据量等。

IoAccepter 便是 IoService 的一个扩展接口。IoService 接口可以用来添加多个 IoFilter,这些 IoFilter 符合责任链模式 并由 IoProcessor 线程负责调用。而 IoAccepter 在 ioService 接口的基础上还提供绑定某个通讯端口以及取消绑定的接口。

IoAcceptor acceptor = new SocketAcceptor();
 

该代码相当于我们使用了 Socket 通讯方式作为服务的接入,当前版本的 MINA 还提供了除 SocketAccepter 外的基于数据报文通讯的DatagramAccepter 以及基于管道通讯的VmPipeAccepter。也可以自行实现IoService接口来使用自己的通讯方式。

 

图中最右端也就是 IoHandler,这便是业务处理模块。在业务处理类中不需要去关心实际的通讯细节,只管处理客户端传输过来的信息即可。编写 Handler 类就是使用 MINA 开发忘了应用程序的重心所 在,相当于 MINA 已经帮你处理了所有的通讯方面的细节问题。为了简化 Handler 类,MINA 提供了 IoHandlerAdapter 类,此类仅仅是实现了 IoHandler 接口,但并不做任何处理。

 

一个 IoHandler 接口中具有如下一些方法(摘自 MINA 的 API 文档):

•voidexceptionCaught(IoSession session, Throwable cause)

当接口中其他方法抛出异常未被捕获时触发此方法

•voidmessageReceived(IoSession session, Object message)

当接收到客户端的请求信息后触发此方法

•voidmessageSent(IoSession session, Object message)

当信息已经传送给客户端后触发此方法

•voidsessionClosed(IoSession session)

当连接被关闭时触发,例如客户端程序意外退出等等

•voidsessionCreated(IoSession session)

当一个新客户端连接后触发此方法

•voidsessionIdle(IoSession session, IdleStatus status)

当连接空闲时触发此方法

•voidsessionOpened(IoSession session)

当连接后打开时触发此方法,一般此方法与 sessionCreated 会被同时触发

IoService 是负责底层通讯接入,而 IoHandler 是负责业务处理的。那么 MINA 架构图中的 IoFilter 作何用途呢?答案是你想作何用途都可以。但是有一个用途却是必须的,那就是作为IoService 和IoHandler 之间的桥梁。 

 

IoHandler 接口中最重要的一个方法是messageReceived,这个方法的第二个参数是一个 Object 型的消息,总所周知,Object 是所有 Java 对象的基础,那到底谁来决定这消息到底是什么类型呢?答案也就在这个 IoFilter 中。在 前面使用的例子中,我们添加了一个 IoFilter 是 new ProtocolCodecFilter(newTextLineCodecFactory()),这个过滤器的作用是将来自客户端输入的信息转换成一行行的文本后传递给 IoHandler,因此我们可以在messageReceived 中直接将 msg 对象强制转换成 String 对象。

 

而如果我们不提供任何过滤器的话,那么在 messageReceived 方法中的第二个参数类型就是一个 byte 的缓冲区,对应的类是 org.apache.mina.core.buffer.IoBuffer。虽然可以将解析客户端信息放在IoHandler中来做,但这并不是推荐的做法,使原来清晰的模型又模糊起来,变得IoHandler不只是业务处理,还得充当协议解析的任务。

 

MINA自身带有一些常用的过滤器,例如LoggingFilter(日志记录)、BlackListFilter(黑名单过滤)、CompressionFilter(压缩)、SSLFilter(SSL加密)等。

 

简单地来讲,就分为三层:

1.I/O Service :负责处理I/O;

2.I/O FilterChain :负责编码处理,字节到数据结构或数据结构到字节的转换等,即非业务逻辑的操作;

3.I/O Handle :负责处理业务逻辑。

 

客户端的通信过程:

 

1.通过SocketConnector同服务器端建立连接,先是由Acceptor接收到请求连接的事件(即ACCEPT事件),并将此连接和一个I/O Processor关联,连接 分配器会均衡的将Socket和不同的I/O Processor绑定(轮流分配);

2.链接建立和绑定完成之后,会在I/OProcessor上进行读写事件的监听,当有读写事件发生时,就会通知到对应的Processor进行数据处理。I/O的读写交给了I/O Processor线程,I/O Processor是多线程的;

3.通过I/O Processor读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议;

4.最后IoFilter将数据交给Handler进行业务处理,完成了整个读取的过程;

5.写入过程也是类似,只是刚好倒过来,通过IoSession.write写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过I/O Processor将数据写出到socket通道。

 

IoFilterChain作为消息过滤链

客户端通信过程   IoSession贯穿整个通信过程的始终

下面使用Mina框架实现简单的客户端及服务端通信:

创建服务器:

创建IoAccptor对象->设置过滤器(日志,编码等)->设置IoHandler->配置->绑定到指定IP和端口

package mina.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import mina.server.handler.TimeServerHandler;

import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

public class MinaTimeServer {
	
	//定义监听端口号
	private static int PORT=6488;
	
	public static void main(String[] args) throws IOException {
		//创建服务端监控线程,使用了 Socket 通讯方式作为服务的接入
		IoAcceptor acceptor=new NioSocketAcceptor();
		//IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等
		acceptor.getSessionConfig().setReadBufferSize(2048);
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
		//设置日志记录器
		acceptor.getFilterChain().addLast("logger", new LoggingFilter());
		//设置编码过滤器
		acceptor.getFilterChain().addLast("code", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
		//设置业务逻辑处理器
		acceptor.setHandler(new TimeServerHandler());
		//设置端口号
		acceptor.bind(new InetSocketAddress(PORT));
		//启动监听线程
		acceptor.bind();
	}

}
package mina.server.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;

/**
 * @author heyongjian
 *  服务端业务逻辑
 */
public class TimeServerHandler extends IoHandlerAdapter{
	//连接创建事件
	@Override
	public void sessionCreated(IoSession session) throws Exception {
		//显示客户端ip和端口号
		System.out.println(session.getRemoteAddress().toString());
	}
	
	@Override
	public void exceptionCaught(IoSession session, Throwable cause)
			throws Exception {
		cause.printStackTrace();
	}
	//消息接收事件
	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		String strMsg=message.toString();
		if(strMsg.trim().equalsIgnoreCase("quit")){
			session.close(true);
			return;
		}
		//返回消息字符串
		session.write("Hi,client");
		// 打印客户端发送的消息
		System.out.println("Message written:"+strMsg);
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status)
			throws Exception {
		System.out.println("IDLE"+session.getIdleCount(status));
	}
	
}

创建客户端:

创建NioSocketConnector对象->设置过滤器(日志,编码等)->设置IoHandler->配置->ConnectFuture连接到指定IP和端口

package mina.client;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

import mina.client.handler.TimeClientHandler;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketConnector;

public class MinaTimeClient {

	public static void main(String[] args) {
		//创建客户端连接器
		NioSocketConnector connector=new NioSocketConnector();
		connector.getFilterChain().addLast("logger", new LoggingFilter());
		connector.getFilterChain().addLast("code", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("Utf-8"))));
		
		//设置超时检查时间
		connector.setConnectTimeoutCheckInterval(30);
		//设置业务逻辑处理器
		connector.setHandler(new TimeClientHandler());
		
		//建立连接
		ConnectFuture connectFuture=connector.connect(new InetSocketAddress("10.16.38.26",6488));
		//等待连接创建完成
		connectFuture.awaitUninterruptibly();
		
		connectFuture.getSession().write("Hi Server");
		connectFuture.getSession().write("quit");
		
		//等待连接断开
		connectFuture.getSession().getCloseFuture().awaitUninterruptibly();
		//释放连接
		connector.dispose();
		
	}

}
package mina.client.handler;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

//编写自己的业务逻辑类
public class TimeClientHandler extends IoHandlerAdapter{

	//接收消息
	@Override
	public void messageReceived(IoSession session, Object message)
			throws Exception {
		String content=message.toString();
		System.out.println("client received a message :"+content);
	}
	//发送消息
	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		System.out.println("sending message ->:"+message);
	}
	
}

关于Mina源码可以参考博主的第二篇文章,写的非常详细:http://www.cnblogs.com/xuekyo/archive/2013/03/08/2950644.html

上述若有不对,欢迎拍砖指出!

以上是关于Apache Mina的主要内容,如果未能解决你的问题,请参考以下文章

java socket 多线程通讯 使用mina作为服务端

springboot整合mina

Mina的客户端

Apache Mina

mina

Apache Mina:一个简单的tcp通信demo